4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement ExpandNames
53 - implement CheckPrereq
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements:
58 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60 Note that all commands require root permissions.
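As an illustrative sketch only (the class name, hook path and parameter
below are hypothetical, not part of Ganeti), a minimal concurrent LU
could look like::

  class LUHypotheticalOp(LogicalUnit):
    HPATH = "hypothetical-op"
    HTYPE = constants.HTYPE_CLUSTER
    _OP_REQP = ["some_parameter"]

    def ExpandNames(self):
      self.needed_locks = {}   # concurrent, but needs no locks

    def CheckPrereq(self):
      pass                     # no cluster-state prerequisites

    def BuildHooksEnv(self):
      return {}, [], []        # empty env, no pre/post hook nodes

    def Exec(self, feedback_fn):
      feedback_fn("nothing to do")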
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
71 This needs to be overridden in derived classes in order to check op
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
93 for attr_name in self._OP_REQP:
94 attr_val = getattr(op, attr_name, None)
96 raise errors.OpPrereqError("Required parameter '%s' missing" %
101 """Returns the SshRunner object
105 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
108 ssh = property(fget=__GetSSH)
110 def CheckArguments(self):
111 """Check syntactic validity for the opcode arguments.
113 This method is for doing a simple syntactic check and ensuring the
114 validity of opcode parameters, without any cluster-related
115 checks. While the same can be accomplished in ExpandNames and/or
116 CheckPrereq, doing these separately is better because:
118 - ExpandNames is left as a purely lock-related function
119 - CheckPrereq is run after we have acquired locks (and possible
122 The function is allowed to change the self.op attribute so that
123 later methods need not worry about missing parameters.
128 def ExpandNames(self):
129 """Expand names for this LU.
131 This method is called before starting to execute the opcode, and it should
132 update all the parameters of the opcode to their canonical form (e.g. a
133 short node name must be fully expanded after this method has successfully
134 completed). This way locking, hooks, logging, etc. can work correctly.
136 LUs which implement this method must also populate the self.needed_locks
137 member, as a dict with lock levels as keys, and a list of needed lock names
140 - use an empty dict if you don't need any lock
141 - if you don't need any lock at a particular level omit that level
142 - don't put anything for the BGL level
143 - if you want all locks at a level use locking.ALL_SET as a value
145 If you need to share locks (rather than acquire them exclusively) at one
146 level you can modify self.share_locks, setting a true value (usually 1) for
147 that level. By default locks are not shared.
151 # Acquire all nodes and one instance
152 self.needed_locks = {
153 locking.LEVEL_NODE: locking.ALL_SET,
154 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156 # Acquire just two nodes
157 self.needed_locks = {
158 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
161 self.needed_locks = {} # No, you can't leave it to the default value None
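# Acquire all node locks in shared (read-only) mode; an illustrative
# variant of the share_locks usage described above
self.share_locks[locking.LEVEL_NODE] = 1
self.needed_locks = {
  locking.LEVEL_NODE: locking.ALL_SET,
}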
164 # The implementation of this method is mandatory only if the new LU is
165 # concurrent, so that old LUs don't need to be changed all at the same
168 self.needed_locks = {} # Exclusive LUs don't need locks.
170 raise NotImplementedError
172 def DeclareLocks(self, level):
173 """Declare LU locking needs for a level
175 While most LUs can just declare their locking needs at ExpandNames time,
176 sometimes there's the need to calculate some locks after having acquired
177 the ones before. This function is called just before acquiring locks at a
178 particular level, but after acquiring the ones at lower levels, and permits
179 such calculations. It can be used to modify self.needed_locks, and by
180 default it does nothing.
182 This function is only called if you have something already set in
183 self.needed_locks for the level.
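A typical implementation mirrors the _LockInstancesNodes helper documented
below (sketch)::

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()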
185 @param level: Locking level which is going to be locked
186 @type level: member of ganeti.locking.LEVELS
190 def CheckPrereq(self):
191 """Check prerequisites for this LU.
193 This method should check that the prerequisites for the execution
194 of this LU are fulfilled. It can do internode communication, but
195 it should be idempotent - no cluster or system changes are
198 The method should raise errors.OpPrereqError in case something is
199 not fulfilled. Its return value is ignored.
201 This method should also update all the parameters of the opcode to
202 their canonical form if it hasn't been done by ExpandNames before.
205 raise NotImplementedError
207 def Exec(self, feedback_fn):
210 This method should implement the actual work. It should raise
211 errors.OpExecError for failures that are somewhat dealt with in
215 raise NotImplementedError
217 def BuildHooksEnv(self):
218 """Build hooks environment for this LU.
220 This method should return a three-element tuple consisting of: a dict
221 containing the environment that will be used for running the
222 specific hook for this LU, a list of node names on which the hook
223 should run before the execution, and a list of node names on which
224 the hook should run after the execution.
226 The keys of the dict must not be prefixed with 'GANETI_', as this will
227 be handled by the hooks runner. Also note that additional keys will be
228 added by the hooks runner. If the LU doesn't define any
229 environment, an empty dict (and not None) should be returned.
231 The absence of nodes should be represented as an empty list (and not None).
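For example (illustrative values), an LU that only touches the master node
could end its BuildHooksEnv with::

  mn = self.cfg.GetMasterNode()
  return {"OP_TARGET": self.cfg.GetClusterName()}, [mn], [mn]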
233 Note that if the HPATH for a LU class is None, this function will
237 raise NotImplementedError
239 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240 """Notify the LU about the results of its hooks.
242 This method is called every time a hooks phase is executed, and notifies
243 the Logical Unit about the hooks' result. The LU can then use it to alter
244 its result based on the hooks. By default the method does nothing and the
245 previous result is passed back unchanged but any LU can define it if it
246 wants to use the local cluster hook-scripts somehow.
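A sketch of an override that only reacts to post-phase hooks (the failure
handling shown is illustrative, not the actual default behaviour)::

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    if phase == constants.HOOKS_PHASE_POST and not hook_results:
      feedback_fn("  - ERROR: general communication failure in hooks")
    return lu_result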
248 @param phase: one of L{constants.HOOKS_PHASE_POST} or
249 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250 @param hook_results: the results of the multi-node hooks rpc call
251 @param feedback_fn: function used to send feedback back to the caller
252 @param lu_result: the previous Exec result this LU had, or None
254 @return: the new Exec result, based on the previous result
260 def _ExpandAndLockInstance(self):
261 """Helper function to expand and lock an instance.
263 Many LUs that work on an instance take its name in self.op.instance_name
264 and need to expand it and then declare the expanded name for locking. This
265 function does it, and then updates self.op.instance_name to the expanded
266 name. It also initializes needed_locks as a dict, if this hasn't been done
270 if self.needed_locks is None:
271 self.needed_locks = {}
273 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274 "_ExpandAndLockInstance called with instance-level locks set"
275 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276 if expanded_name is None:
277 raise errors.OpPrereqError("Instance '%s' not known" %
278 self.op.instance_name)
279 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280 self.op.instance_name = expanded_name
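  # Illustrative usage (hypothetical LU): an instance-level LU can simply do
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  # and then compute the node locks in DeclareLocks via _LockInstancesNodes.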
282 def _LockInstancesNodes(self, primary_only=False):
283 """Helper function to declare instances' nodes for locking.
285 This function should be called after locking one or more instances to lock
286 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287 with all primary or secondary nodes for instances already locked and
288 present in self.needed_locks[locking.LEVEL_INSTANCE].
290 It should be called from DeclareLocks, and for safety only works if
291 self.recalculate_locks[locking.LEVEL_NODE] is set.
293 In the future it may grow parameters to just lock some instance's nodes, or
294 to just lock primaries or secondary nodes, if needed.
296 It should be called in DeclareLocks in a way similar to::
298 if level == locking.LEVEL_NODE:
299 self._LockInstancesNodes()
301 @type primary_only: boolean
302 @param primary_only: only lock primary nodes of locked instances
305 assert locking.LEVEL_NODE in self.recalculate_locks, \
306 "_LockInstancesNodes helper function called with no nodes to recalculate"
308 # TODO: check if we've really been called with the instance locks held
310 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311 # future we might want to have different behaviors depending on the value
312 # of self.recalculate_locks[locking.LEVEL_NODE]
314 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315 instance = self.context.cfg.GetInstanceInfo(instance_name)
316 wanted_nodes.append(instance.primary_node)
318 wanted_nodes.extend(instance.secondary_nodes)
320 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325 del self.recalculate_locks[locking.LEVEL_NODE]
328 class NoHooksLU(LogicalUnit):
329 """Simple LU which runs no hooks.
331 This LU is intended as a parent for other LogicalUnits which will
332 run no hooks, in order to reduce duplicate code.
339 def _GetWantedNodes(lu, nodes):
340 """Returns list of checked and expanded node names.
342 @type lu: L{LogicalUnit}
343 @param lu: the logical unit on whose behalf we execute
345 @param nodes: list of node names or None for all nodes
347 @return: the list of nodes, sorted
348 @raise errors.OpPrereqError: if the nodes parameter is of wrong type
351 if not isinstance(nodes, list):
352 raise errors.OpPrereqError("Invalid argument type 'nodes'")
355 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356 " non-empty list of nodes whose name is to be expanded.")
360 node = lu.cfg.ExpandNodeName(name)
362 raise errors.OpPrereqError("No such node name '%s'" % name)
365 return utils.NiceSort(wanted)
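# Illustrative usage (sketch): LUs querying nodes typically expand the
# user-supplied names while declaring locks, e.g.
#   self.needed_locks[locking.LEVEL_NODE] = _GetWantedNodes(self, self.op.nodes)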
368 def _GetWantedInstances(lu, instances):
369 """Returns list of checked and expanded instance names.
371 @type lu: L{LogicalUnit}
372 @param lu: the logical unit on whose behalf we execute
373 @type instances: list
374 @param instances: list of instance names or None for all instances
376 @return: the list of instances, sorted
377 @raise errors.OpPrereqError: if the instances parameter is wrong type
378 @raise errors.OpPrereqError: if any of the passed instances is not found
381 if not isinstance(instances, list):
382 raise errors.OpPrereqError("Invalid argument type 'instances'")
387 for name in instances:
388 instance = lu.cfg.ExpandInstanceName(name)
390 raise errors.OpPrereqError("No such instance name '%s'" % name)
391 wanted.append(instance)
394 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
398 def _CheckOutputFields(static, dynamic, selected):
399 """Checks whether all selected fields are valid.
401 @type static: L{utils.FieldSet}
402 @param static: static fields set
403 @type dynamic: L{utils.FieldSet}
404 @param dynamic: dynamic fields set
411 delta = f.NonMatching(selected)
413 raise errors.OpPrereqError("Unknown output fields selected: %s"
417 def _CheckBooleanOpField(op, name):
418 """Validates boolean opcode parameters.
420 This will ensure that an opcode parameter is either a boolean value,
421 or None (but that it always exists).
424 val = getattr(op, name, None)
425 if not (val is None or isinstance(val, bool)):
426 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428 setattr(op, name, val)
431 def _CheckNodeOnline(lu, node):
432 """Ensure that a given node is online.
434 @param lu: the LU on behalf of which we make the check
435 @param node: the node to check
436 @raise errors.OpPrereqError: if the node is offline
439 if lu.cfg.GetNodeInfo(node).offline:
440 raise errors.OpPrereqError("Can't use offline node %s" % node)
443 def _CheckNodeNotDrained(lu, node):
444 """Ensure that a given node is not drained.
446 @param lu: the LU on behalf of which we make the check
447 @param node: the node to check
448 @raise errors.OpPrereqError: if the node is drained
451 if lu.cfg.GetNodeInfo(node).drained:
452 raise errors.OpPrereqError("Can't use drained node %s" % node)
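# Illustrative usage (sketch): an LU's CheckPrereq would typically guard its
# target nodes with
#   _CheckNodeOnline(self, instance.primary_node)
#   _CheckNodeNotDrained(self, target_node)
# before doing any further checks.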
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456 memory, vcpus, nics, disk_template, disks,
457 bep, hvp, hypervisor):
458 """Builds instance related env variables for hooks
460 This builds the hook environment from individual variables.
463 @param name: the name of the instance
464 @type primary_node: string
465 @param primary_node: the name of the instance's primary node
466 @type secondary_nodes: list
467 @param secondary_nodes: list of secondary nodes as strings
468 @type os_type: string
469 @param os_type: the name of the instance's OS
470 @type status: boolean
471 @param status: the should_run status of the instance
473 @param memory: the memory size of the instance
475 @param vcpus: the count of VCPUs the instance has
477 @param nics: list of tuples (ip, bridge, mac) representing
478 the NICs the instance has
479 @type disk_template: string
480 @param disk_template: the disk template of the instance
482 @param disks: the list of (size, mode) pairs
484 @param bep: the backend parameters for the instance
486 @param hvp: the hypervisor parameters for the instance
487 @type hypervisor: string
488 @param hypervisor: the hypervisor for the instance
490 @return: the hook environment for this instance
499 "INSTANCE_NAME": name,
500 "INSTANCE_PRIMARY": primary_node,
501 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
502 "INSTANCE_OS_TYPE": os_type,
503 "INSTANCE_STATUS": str_status,
504 "INSTANCE_MEMORY": memory,
505 "INSTANCE_VCPUS": vcpus,
506 "INSTANCE_DISK_TEMPLATE": disk_template,
507 "INSTANCE_HYPERVISOR": hypervisor,
511 nic_count = len(nics)
512 for idx, (ip, bridge, mac) in enumerate(nics):
515 env["INSTANCE_NIC%d_IP" % idx] = ip
516 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
517 env["INSTANCE_NIC%d_MAC" % idx] = mac
521 env["INSTANCE_NIC_COUNT"] = nic_count
524 disk_count = len(disks)
525 for idx, (size, mode) in enumerate(disks):
526 env["INSTANCE_DISK%d_SIZE" % idx] = size
527 env["INSTANCE_DISK%d_MODE" % idx] = mode
531 env["INSTANCE_DISK_COUNT"] = disk_count
533 for source, kind in [(bep, "BE"), (hvp, "HV")]:
534 for key, value in source.items():
535 env["INSTANCE_%s_%s" % (kind, key)] = value
540 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
541 """Builds instance related env variables for hooks from an object.
543 @type lu: L{LogicalUnit}
544 @param lu: the logical unit on whose behalf we execute
545 @type instance: L{objects.Instance}
546 @param instance: the instance for which we should build the
549 @param override: dictionary with key/values that will override
552 @return: the hook environment dictionary
555 cluster = lu.cfg.GetClusterInfo()
556 bep = cluster.FillBE(instance)
557 hvp = cluster.FillHV(instance)
559 'name': instance.name,
560 'primary_node': instance.primary_node,
561 'secondary_nodes': instance.secondary_nodes,
562 'os_type': instance.os,
563 'status': instance.admin_up,
564 'memory': bep[constants.BE_MEMORY],
565 'vcpus': bep[constants.BE_VCPUS],
566 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
567 'disk_template': instance.disk_template,
568 'disks': [(disk.size, disk.mode) for disk in instance.disks],
571 'hypervisor': instance.hypervisor,
574 args.update(override)
575 return _BuildInstanceHookEnv(**args)
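# Illustrative usage (sketch): an instance LU's BuildHooksEnv usually builds
# on this helper, e.g. (assuming self.instance was set in CheckPrereq)
#   env = _BuildInstanceHookEnvByObject(self, self.instance)
#   nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
#   return env, nl, nl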
578 def _AdjustCandidatePool(lu):
579 """Adjust the candidate pool after node operations.
582 mod_list = lu.cfg.MaintainCandidatePool()
584 lu.LogInfo("Promoted nodes to master candidate role: %s",
585 ", ".join(node.name for node in mod_list))
586 for name in mod_list:
587 lu.context.ReaddNode(name)
588 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
590 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
594 def _CheckInstanceBridgesExist(lu, instance):
595 """Check that the brigdes needed by an instance exist.
598 # check bridges existance
599 brlist = [nic.bridge for nic in instance.nics]
600 result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
603 raise errors.OpPrereqError("One or more target bridges %s do not"
604 " exist on destination node '%s'" %
605 (brlist, instance.primary_node))
608 class LUDestroyCluster(NoHooksLU):
609 """Logical unit for destroying the cluster.
614 def CheckPrereq(self):
615 """Check prerequisites.
617 This checks whether the cluster is empty.
619 Any errors are signalled by raising errors.OpPrereqError.
622 master = self.cfg.GetMasterNode()
624 nodelist = self.cfg.GetNodeList()
625 if len(nodelist) != 1 or nodelist[0] != master:
626 raise errors.OpPrereqError("There are still %d node(s) in"
627 " this cluster." % (len(nodelist) - 1))
628 instancelist = self.cfg.GetInstanceList()
630 raise errors.OpPrereqError("There are still %d instance(s) in"
631 " this cluster." % len(instancelist))
633 def Exec(self, feedback_fn):
634 """Destroys the cluster.
637 master = self.cfg.GetMasterNode()
638 result = self.rpc.call_node_stop_master(master, False)
641 raise errors.OpExecError("Could not disable the master role")
642 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
643 utils.CreateBackup(priv_key)
644 utils.CreateBackup(pub_key)
648 class LUVerifyCluster(LogicalUnit):
649 """Verifies the cluster status.
652 HPATH = "cluster-verify"
653 HTYPE = constants.HTYPE_CLUSTER
654 _OP_REQP = ["skip_checks"]
657 def ExpandNames(self):
658 self.needed_locks = {
659 locking.LEVEL_NODE: locking.ALL_SET,
660 locking.LEVEL_INSTANCE: locking.ALL_SET,
662 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
664 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
665 node_result, feedback_fn, master_files,
667 """Run multiple tests against a node.
671 - compares ganeti version
672 - checks vg existence and size > 20G
673 - checks config file checksum
674 - checks ssh to other nodes
676 @type nodeinfo: L{objects.Node}
677 @param nodeinfo: the node to check
678 @param file_list: required list of files
679 @param local_cksum: dictionary of local files and their checksums
680 @param node_result: the results from the node
681 @param feedback_fn: function used to accumulate results
682 @param master_files: list of files that only masters should have
683 @param drbd_map: the used DRBD minors for this node, in the
684 form of minor: (instance, must_exist), which correspond to instances
685 and their running status
686 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
691 # main result, node_result should be a non-empty dict
692 if not node_result or not isinstance(node_result, dict):
693 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
696 # compares ganeti version
697 local_version = constants.PROTOCOL_VERSION
698 remote_version = node_result.get('version', None)
699 if not (remote_version and isinstance(remote_version, (list, tuple)) and
700 len(remote_version) == 2):
701 feedback_fn(" - ERROR: connection to %s failed" % (node))
704 if local_version != remote_version[0]:
705 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
706 " node %s %s" % (local_version, node, remote_version[0]))
709 # node seems compatible, we can actually try to look into its results
713 # full package version
714 if constants.RELEASE_VERSION != remote_version[1]:
715 feedback_fn(" - WARNING: software version mismatch: master %s,"
717 (constants.RELEASE_VERSION, node, remote_version[1]))
719 # checks vg existence and size > 20G
720 if vg_name is not None:
721 vglist = node_result.get(constants.NV_VGLIST, None)
723 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
727 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
728 constants.MIN_VG_SIZE)
730 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
733 # checks config file checksum
735 remote_cksum = node_result.get(constants.NV_FILELIST, None)
736 if not isinstance(remote_cksum, dict):
738 feedback_fn(" - ERROR: node hasn't returned file checksum data")
740 for file_name in file_list:
741 node_is_mc = nodeinfo.master_candidate
742 must_have_file = file_name not in master_files
743 if file_name not in remote_cksum:
744 if node_is_mc or must_have_file:
746 feedback_fn(" - ERROR: file '%s' missing" % file_name)
747 elif remote_cksum[file_name] != local_cksum[file_name]:
748 if node_is_mc or must_have_file:
750 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
752 # not candidate and this is not a must-have file
754 feedback_fn(" - ERROR: file '%s' should not exist on non master"
755 " candidates (and the file is outdated)" % file_name)
757 # all good, except non-master/non-must have combination
758 if not node_is_mc and not must_have_file:
759 feedback_fn(" - ERROR: file '%s' should not exist on non master"
760 " candidates" % file_name)
764 if constants.NV_NODELIST not in node_result:
766 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
768 if node_result[constants.NV_NODELIST]:
770 for node in node_result[constants.NV_NODELIST]:
771 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
772 (node, node_result[constants.NV_NODELIST][node]))
774 if constants.NV_NODENETTEST not in node_result:
776 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
778 if node_result[constants.NV_NODENETTEST]:
780 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
782 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
783 (node, node_result[constants.NV_NODENETTEST][node]))
785 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
786 if isinstance(hyp_result, dict):
787 for hv_name, hv_result in hyp_result.iteritems():
788 if hv_result is not None:
789 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
790 (hv_name, hv_result))
792 # check used drbd list
793 if vg_name is not None:
794 used_minors = node_result.get(constants.NV_DRBDLIST, [])
795 if not isinstance(used_minors, (tuple, list)):
796 feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
799 for minor, (iname, must_exist) in drbd_map.items():
800 if minor not in used_minors and must_exist:
801 feedback_fn(" - ERROR: drbd minor %d of instance %s is"
802 " not active" % (minor, iname))
804 for minor in used_minors:
805 if minor not in drbd_map:
806 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
812 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
813 node_instance, feedback_fn, n_offline):
814 """Verify an instance.
816 This function checks to see if the required block devices are
817 available on the instance's node.
822 node_current = instanceconfig.primary_node
825 instanceconfig.MapLVsByNode(node_vol_should)
827 for node in node_vol_should:
828 if node in n_offline:
829 # ignore missing volumes on offline nodes
831 for volume in node_vol_should[node]:
832 if node not in node_vol_is or volume not in node_vol_is[node]:
833 feedback_fn(" - ERROR: volume %s missing on node %s" %
837 if instanceconfig.admin_up:
838 if ((node_current not in node_instance or
839 not instance in node_instance[node_current]) and
840 node_current not in n_offline):
841 feedback_fn(" - ERROR: instance %s not running on node %s" %
842 (instance, node_current))
845 for node in node_instance:
846 if node != node_current:
847 if instance in node_instance[node]:
848 feedback_fn(" - ERROR: instance %s should not run on node %s" %
854 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
855 """Verify if there are any unknown volumes in the cluster.
857 The .os, .swap and backup volumes are ignored. All other volumes are
863 for node in node_vol_is:
864 for volume in node_vol_is[node]:
865 if node not in node_vol_should or volume not in node_vol_should[node]:
866 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
871 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
872 """Verify the list of running instances.
874 This checks what instances are running but unknown to the cluster.
878 for node in node_instance:
879 for runninginstance in node_instance[node]:
880 if runninginstance not in instancelist:
881 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
882 (runninginstance, node))
886 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
887 """Verify N+1 Memory Resilience.
889 Check that if one single node dies we can still start all the instances it
895 for node, nodeinfo in node_info.iteritems():
896 # This code checks that every node which is now listed as secondary has
897 # enough memory to host all instances it is supposed to, should a single
898 # other node in the cluster fail.
899 # FIXME: not ready for failover to an arbitrary node
900 # FIXME: does not support file-backed instances
901 # WARNING: we currently take into account down instances as well as up
902 # ones, considering that even if they're down someone might want to start
903 # them in the event of a node failure.
904 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
906 for instance in instances:
907 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
908 if bep[constants.BE_AUTO_BALANCE]:
909 needed_mem += bep[constants.BE_MEMORY]
910 if nodeinfo['mfree'] < needed_mem:
911 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
912 " failovers should node %s fail" % (node, prinode))
916 def CheckPrereq(self):
917 """Check prerequisites.
919 Transform the list of checks we're going to skip into a set and check that
920 all its members are valid.
923 self.skip_set = frozenset(self.op.skip_checks)
924 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
925 raise errors.OpPrereqError("Invalid checks to be skipped specified")
927 def BuildHooksEnv(self):
930 Cluster-Verify hooks are run in the post phase only, and their failure causes
931 the output to be logged in the verify output and the verification to fail.
934 all_nodes = self.cfg.GetNodeList()
936 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
938 for node in self.cfg.GetAllNodesInfo().values():
939 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
941 return env, [], all_nodes
943 def Exec(self, feedback_fn):
944 """Verify integrity of cluster, performing various test on nodes.
948 feedback_fn("* Verifying global settings")
949 for msg in self.cfg.VerifyConfig():
950 feedback_fn(" - ERROR: %s" % msg)
952 vg_name = self.cfg.GetVGName()
953 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
954 nodelist = utils.NiceSort(self.cfg.GetNodeList())
955 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
956 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
957 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
958 for iname in instancelist)
959 i_non_redundant = [] # Non-redundant instances
960 i_non_a_balanced = [] # Non-auto-balanced instances
961 n_offline = [] # List of offline nodes
962 n_drained = [] # List of nodes being drained
968 # FIXME: verify OS list
970 master_files = [constants.CLUSTER_CONF_FILE]
972 file_names = ssconf.SimpleStore().GetFileList()
973 file_names.append(constants.SSL_CERT_FILE)
974 file_names.append(constants.RAPI_CERT_FILE)
975 file_names.extend(master_files)
977 local_checksums = utils.FingerprintFiles(file_names)
979 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
980 node_verify_param = {
981 constants.NV_FILELIST: file_names,
982 constants.NV_NODELIST: [node.name for node in nodeinfo
983 if not node.offline],
984 constants.NV_HYPERVISOR: hypervisors,
985 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
986 node.secondary_ip) for node in nodeinfo
987 if not node.offline],
988 constants.NV_INSTANCELIST: hypervisors,
989 constants.NV_VERSION: None,
990 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
992 if vg_name is not None:
993 node_verify_param[constants.NV_VGLIST] = None
994 node_verify_param[constants.NV_LVLIST] = vg_name
995 node_verify_param[constants.NV_DRBDLIST] = None
996 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
997 self.cfg.GetClusterName())
999 cluster = self.cfg.GetClusterInfo()
1000 master_node = self.cfg.GetMasterNode()
1001 all_drbd_map = self.cfg.ComputeDRBDMap()
1003 for node_i in nodeinfo:
1005 nresult = all_nvinfo[node].data
1008 feedback_fn("* Skipping offline node %s" % (node,))
1009 n_offline.append(node)
1012 if node == master_node:
1014 elif node_i.master_candidate:
1015 ntype = "master candidate"
1016 elif node_i.drained:
1018 n_drained.append(node)
1021 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1023 if all_nvinfo[node].failed or not isinstance(nresult, dict):
1024 feedback_fn(" - ERROR: connection to %s failed" % (node,))
1029 for minor, instance in all_drbd_map[node].items():
1030 if instance not in instanceinfo:
1031 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1033 # ghost instance should not be running, but otherwise we
1034 # don't give double warnings (both ghost instance and
1035 # unallocated minor in use)
1036 node_drbd[minor] = (instance, False)
1038 instance = instanceinfo[instance]
1039 node_drbd[minor] = (instance.name, instance.admin_up)
1040 result = self._VerifyNode(node_i, file_names, local_checksums,
1041 nresult, feedback_fn, master_files,
1045 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1047 node_volume[node] = {}
1048 elif isinstance(lvdata, basestring):
1049 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1050 (node, utils.SafeEncode(lvdata)))
1052 node_volume[node] = {}
1053 elif not isinstance(lvdata, dict):
1054 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1058 node_volume[node] = lvdata
1061 idata = nresult.get(constants.NV_INSTANCELIST, None)
1062 if not isinstance(idata, list):
1063 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1068 node_instance[node] = idata
1071 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1072 if not isinstance(nodeinfo, dict):
1073 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1079 "mfree": int(nodeinfo['memory_free']),
1082 # dictionary holding all instances this node is secondary for,
1083 # grouped by their primary node. Each key is a cluster node, and each
1084 # value is a list of instances which have the key as primary and the
1085 # current node as secondary. This is handy for calculating N+1 memory
1086 # availability if you can only failover from a primary to its
1088 "sinst-by-pnode": {},
1090 # FIXME: devise a free space model for file based instances as well
1091 if vg_name is not None:
1092 if (constants.NV_VGLIST not in nresult or
1093 vg_name not in nresult[constants.NV_VGLIST]):
1094 feedback_fn(" - ERROR: node %s didn't return data for the"
1095 " volume group '%s' - it is either missing or broken" %
1099 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1100 except (ValueError, KeyError):
1101 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1102 " from node %s" % (node,))
1106 node_vol_should = {}
1108 for instance in instancelist:
1109 feedback_fn("* Verifying instance %s" % instance)
1110 inst_config = instanceinfo[instance]
1111 result = self._VerifyInstance(instance, inst_config, node_volume,
1112 node_instance, feedback_fn, n_offline)
1114 inst_nodes_offline = []
1116 inst_config.MapLVsByNode(node_vol_should)
1118 instance_cfg[instance] = inst_config
1120 pnode = inst_config.primary_node
1121 if pnode in node_info:
1122 node_info[pnode]['pinst'].append(instance)
1123 elif pnode not in n_offline:
1124 feedback_fn(" - ERROR: instance %s, connection to primary node"
1125 " %s failed" % (instance, pnode))
1128 if pnode in n_offline:
1129 inst_nodes_offline.append(pnode)
1131 # If the instance is non-redundant we cannot survive losing its primary
1132 # node, so we are not N+1 compliant. On the other hand we have no disk
1133 # templates with more than one secondary so that situation is not well
1135 # FIXME: does not support file-backed instances
1136 if len(inst_config.secondary_nodes) == 0:
1137 i_non_redundant.append(instance)
1138 elif len(inst_config.secondary_nodes) > 1:
1139 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1142 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1143 i_non_a_balanced.append(instance)
1145 for snode in inst_config.secondary_nodes:
1146 if snode in node_info:
1147 node_info[snode]['sinst'].append(instance)
1148 if pnode not in node_info[snode]['sinst-by-pnode']:
1149 node_info[snode]['sinst-by-pnode'][pnode] = []
1150 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1151 elif snode not in n_offline:
1152 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1153 " %s failed" % (instance, snode))
1155 if snode in n_offline:
1156 inst_nodes_offline.append(snode)
1158 if inst_nodes_offline:
1159 # warn that the instance lives on offline nodes, and set bad=True
1160 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1161 ", ".join(inst_nodes_offline))
1164 feedback_fn("* Verifying orphan volumes")
1165 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1169 feedback_fn("* Verifying remaining instances")
1170 result = self._VerifyOrphanInstances(instancelist, node_instance,
1174 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1175 feedback_fn("* Verifying N+1 Memory redundancy")
1176 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1179 feedback_fn("* Other Notes")
1181 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1182 % len(i_non_redundant))
1184 if i_non_a_balanced:
1185 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1186 % len(i_non_a_balanced))
1189 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1192 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1196 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1197 """Analize the post-hooks' result
1199 This method analyses the hook result, handles it, and sends some
1200 nicely-formatted feedback back to the user.
1202 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1203 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1204 @param hooks_results: the results of the multi-node hooks rpc call
1205 @param feedback_fn: function used to send feedback back to the caller
1206 @param lu_result: previous Exec result
1207 @return: the new Exec result, based on the previous result
1211 # We only really run POST phase hooks, and are only interested in
1213 if phase == constants.HOOKS_PHASE_POST:
1214 # Used to change hooks' output to proper indentation
1215 indent_re = re.compile('^', re.M)
1216 feedback_fn("* Hooks Results")
1217 if not hooks_results:
1218 feedback_fn(" - ERROR: general communication failure")
1221 for node_name in hooks_results:
1222 show_node_header = True
1223 res = hooks_results[node_name]
1224 if res.failed or res.data is False or not isinstance(res.data, list):
1226 # no need to warn or set fail return value
1228 feedback_fn(" Communication failure in hooks execution")
1231 for script, hkr, output in res.data:
1232 if hkr == constants.HKR_FAIL:
1233 # The node header is only shown once, if there are
1234 # failing hooks on that node
1235 if show_node_header:
1236 feedback_fn(" Node %s:" % node_name)
1237 show_node_header = False
1238 feedback_fn(" ERROR: Script %s failed, output:" % script)
1239 output = indent_re.sub(' ', output)
1240 feedback_fn("%s" % output)
1246 class LUVerifyDisks(NoHooksLU):
1247 """Verifies the cluster disks status.
1253 def ExpandNames(self):
1254 self.needed_locks = {
1255 locking.LEVEL_NODE: locking.ALL_SET,
1256 locking.LEVEL_INSTANCE: locking.ALL_SET,
1258 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1260 def CheckPrereq(self):
1261 """Check prerequisites.
1263 This has no prerequisites.
1268 def Exec(self, feedback_fn):
1269 """Verify integrity of cluster disks.
1272 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1274 vg_name = self.cfg.GetVGName()
1275 nodes = utils.NiceSort(self.cfg.GetNodeList())
1276 instances = [self.cfg.GetInstanceInfo(name)
1277 for name in self.cfg.GetInstanceList()]
1280 for inst in instances:
1282 if (not inst.admin_up or
1283 inst.disk_template not in constants.DTS_NET_MIRROR):
1285 inst.MapLVsByNode(inst_lvs)
1286 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1287 for node, vol_list in inst_lvs.iteritems():
1288 for vol in vol_list:
1289 nv_dict[(node, vol)] = inst
1294 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1299 lvs = node_lvs[node]
1302 self.LogWarning("Connection to node %s failed: %s" %
1306 if isinstance(lvs, basestring):
1307 logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1308 res_nlvm[node] = lvs
1310 elif not isinstance(lvs, dict):
1311 logging.warning("Connection to node %s failed or invalid data"
1313 res_nodes.append(node)
1316 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1317 inst = nv_dict.pop((node, lv_name), None)
1318 if (not lv_online and inst is not None
1319 and inst.name not in res_instances):
1320 res_instances.append(inst.name)
1322 # any leftover items in nv_dict are missing LVs, let's arrange the
1324 for key, inst in nv_dict.iteritems():
1325 if inst.name not in res_missing:
1326 res_missing[inst.name] = []
1327 res_missing[inst.name].append(key)
1332 class LURenameCluster(LogicalUnit):
1333 """Rename the cluster.
1336 HPATH = "cluster-rename"
1337 HTYPE = constants.HTYPE_CLUSTER
1340 def BuildHooksEnv(self):
1345 "OP_TARGET": self.cfg.GetClusterName(),
1346 "NEW_NAME": self.op.name,
1348 mn = self.cfg.GetMasterNode()
1349 return env, [mn], [mn]
1351 def CheckPrereq(self):
1352 """Verify that the passed name is a valid one.
1355 hostname = utils.HostInfo(self.op.name)
1357 new_name = hostname.name
1358 self.ip = new_ip = hostname.ip
1359 old_name = self.cfg.GetClusterName()
1360 old_ip = self.cfg.GetMasterIP()
1361 if new_name == old_name and new_ip == old_ip:
1362 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1363 " cluster has changed")
1364 if new_ip != old_ip:
1365 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1366 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1367 " reachable on the network. Aborting." %
1370 self.op.name = new_name
1372 def Exec(self, feedback_fn):
1373 """Rename the cluster.
1376 clustername = self.op.name
1379 # shutdown the master IP
1380 master = self.cfg.GetMasterNode()
1381 result = self.rpc.call_node_stop_master(master, False)
1382 if result.failed or not result.data:
1383 raise errors.OpExecError("Could not disable the master role")
1386 cluster = self.cfg.GetClusterInfo()
1387 cluster.cluster_name = clustername
1388 cluster.master_ip = ip
1389 self.cfg.Update(cluster)
1391 # update the known hosts file
1392 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1393 node_list = self.cfg.GetNodeList()
1395 node_list.remove(master)
1398 result = self.rpc.call_upload_file(node_list,
1399 constants.SSH_KNOWN_HOSTS_FILE)
1400 for to_node, to_result in result.iteritems():
1401 if to_result.failed or not to_result.data:
1402 logging.error("Copy of file %s to node %s failed",
1403 constants.SSH_KNOWN_HOSTS_FILE, to_node)
1406 result = self.rpc.call_node_start_master(master, False)
1407 if result.failed or not result.data:
1408 self.LogWarning("Could not re-enable the master role on"
1409 " the master, please restart manually.")
1412 def _RecursiveCheckIfLVMBased(disk):
1413 """Check if the given disk or its children are lvm-based.
1415 @type disk: L{objects.Disk}
1416 @param disk: the disk to check
1418 @return: boolean indicating whether an LD_LV dev_type was found or not
1422 for chdisk in disk.children:
1423 if _RecursiveCheckIfLVMBased(chdisk):
1425 return disk.dev_type == constants.LD_LV
1428 class LUSetClusterParams(LogicalUnit):
1429 """Change the parameters of the cluster.
1432 HPATH = "cluster-modify"
1433 HTYPE = constants.HTYPE_CLUSTER
1437 def CheckArguments(self):
1441 if not hasattr(self.op, "candidate_pool_size"):
1442 self.op.candidate_pool_size = None
1443 if self.op.candidate_pool_size is not None:
1445 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1446 except (ValueError, TypeError), err:
1447 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1449 if self.op.candidate_pool_size < 1:
1450 raise errors.OpPrereqError("At least one master candidate needed")
1452 def ExpandNames(self):
1453 # FIXME: in the future maybe other cluster params won't require checking on
1454 # all nodes to be modified.
1455 self.needed_locks = {
1456 locking.LEVEL_NODE: locking.ALL_SET,
1458 self.share_locks[locking.LEVEL_NODE] = 1
1460 def BuildHooksEnv(self):
1465 "OP_TARGET": self.cfg.GetClusterName(),
1466 "NEW_VG_NAME": self.op.vg_name,
1468 mn = self.cfg.GetMasterNode()
1469 return env, [mn], [mn]
1471 def CheckPrereq(self):
1472 """Check prerequisites.
1474 This checks whether the given params don't conflict and
1475 if the given volume group is valid.
1478 if self.op.vg_name is not None and not self.op.vg_name:
1479 instances = self.cfg.GetAllInstancesInfo().values()
1480 for inst in instances:
1481 for disk in inst.disks:
1482 if _RecursiveCheckIfLVMBased(disk):
1483 raise errors.OpPrereqError("Cannot disable lvm storage while"
1484 " lvm-based instances exist")
1486 node_list = self.acquired_locks[locking.LEVEL_NODE]
1488 # if vg_name not None, checks given volume group on all nodes
1490 vglist = self.rpc.call_vg_list(node_list)
1491 for node in node_list:
1492 if vglist[node].failed:
1493 # ignoring down node
1494 self.LogWarning("Node %s unreachable/error, ignoring" % node)
1496 vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1498 constants.MIN_VG_SIZE)
1500 raise errors.OpPrereqError("Error on node '%s': %s" %
1503 self.cluster = cluster = self.cfg.GetClusterInfo()
1504 # validate beparams changes
1505 if self.op.beparams:
1506 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1507 self.new_beparams = cluster.FillDict(
1508 cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1510 # hypervisor list/parameters
1511 self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1512 if self.op.hvparams:
1513 if not isinstance(self.op.hvparams, dict):
1514 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1515 for hv_name, hv_dict in self.op.hvparams.items():
1516 if hv_name not in self.new_hvparams:
1517 self.new_hvparams[hv_name] = hv_dict
1519 self.new_hvparams[hv_name].update(hv_dict)
1521 if self.op.enabled_hypervisors is not None:
1522 self.hv_list = self.op.enabled_hypervisors
1524 self.hv_list = cluster.enabled_hypervisors
1526 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1527 # either the enabled list has changed, or the parameters have, validate
1528 for hv_name, hv_params in self.new_hvparams.items():
1529 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1530 (self.op.enabled_hypervisors and
1531 hv_name in self.op.enabled_hypervisors)):
1532 # either this is a new hypervisor, or its parameters have changed
1533 hv_class = hypervisor.GetHypervisor(hv_name)
1534 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1535 hv_class.CheckParameterSyntax(hv_params)
1536 _CheckHVParams(self, node_list, hv_name, hv_params)
1538 def Exec(self, feedback_fn):
1539 """Change the parameters of the cluster.
1542 if self.op.vg_name is not None:
1543 new_volume = self.op.vg_name
1546 if new_volume != self.cfg.GetVGName():
1547 self.cfg.SetVGName(new_volume)
1549 feedback_fn("Cluster LVM configuration already in desired"
1550 " state, not changing")
1551 if self.op.hvparams:
1552 self.cluster.hvparams = self.new_hvparams
1553 if self.op.enabled_hypervisors is not None:
1554 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1555 if self.op.beparams:
1556 self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1557 if self.op.candidate_pool_size is not None:
1558 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1559 # we need to update the pool size here, otherwise the save will fail
1560 _AdjustCandidatePool(self)
1562 self.cfg.Update(self.cluster)
1565 class LURedistributeConfig(NoHooksLU):
1566 """Force the redistribution of cluster configuration.
1568 This is a very simple LU.
1574 def ExpandNames(self):
1575 self.needed_locks = {
1576 locking.LEVEL_NODE: locking.ALL_SET,
1578 self.share_locks[locking.LEVEL_NODE] = 1
1580 def CheckPrereq(self):
1581 """Check prerequisites.
1585 def Exec(self, feedback_fn):
1586 """Redistribute the configuration.
1589 self.cfg.Update(self.cfg.GetClusterInfo())
1592 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1593 """Sleep and poll for an instance's disk to sync.
1596 if not instance.disks:
1600 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1602 node = instance.primary_node
1604 for dev in instance.disks:
1605 lu.cfg.SetDiskID(dev, node)
1608 degr_retries = 10 # in seconds, as we sleep 1 second each time
1612 cumul_degraded = False
1613 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1614 if rstats.failed or not rstats.data:
1615 lu.LogWarning("Can't get any data from node %s", node)
1618 raise errors.RemoteError("Can't contact node %s for mirror data,"
1619 " aborting." % node)
1622 rstats = rstats.data
1624 for i, mstat in enumerate(rstats):
1626 lu.LogWarning("Can't compute data for node %s/%s",
1627 node, instance.disks[i].iv_name)
1629 # we ignore the ldisk parameter
1630 perc_done, est_time, is_degraded, _ = mstat
1631 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1632 if perc_done is not None:
1634 if est_time is not None:
1635 rem_time = "%d estimated seconds remaining" % est_time
1638 rem_time = "no time estimate"
1639 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1640 (instance.disks[i].iv_name, perc_done, rem_time))
1642 # if we're done but degraded, let's do a few small retries, to
1643 # make sure we see a stable and not transient situation; therefore
1644 # we force restart of the loop
1645 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1646 logging.info("Degraded disks found, %d retries left", degr_retries)
1654 time.sleep(min(60, max_time))
1657 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1658 return not cumul_degraded
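# Illustrative usage (sketch, variable name is hypothetical): after creating
# or activating an instance's disks, callers typically wait for the mirrors
# to settle with
#   disks_ok = _WaitForSync(self, instance)
# and treat a False return value as a degraded-disk condition.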
1661 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1662 """Check that mirrors are not degraded.
1664 The ldisk parameter, if True, will change the test from the
1665 is_degraded attribute (which represents overall non-ok status for
1666 the device(s)) to the ldisk (representing the local storage status).
1669 lu.cfg.SetDiskID(dev, node)
1676 if on_primary or dev.AssembleOnSecondary():
1677 rstats = lu.rpc.call_blockdev_find(node, dev)
1678 msg = rstats.RemoteFailMsg()
1680 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1682 elif not rstats.payload:
1683 lu.LogWarning("Can't find disk on node %s", node)
1686 result = result and (not rstats.payload[idx])
1688 for child in dev.children:
1689 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1694 class LUDiagnoseOS(NoHooksLU):
1695 """Logical unit for OS diagnose/query.
1698 _OP_REQP = ["output_fields", "names"]
1700 _FIELDS_STATIC = utils.FieldSet()
1701 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1703 def ExpandNames(self):
1705 raise errors.OpPrereqError("Selective OS query not supported")
1707 _CheckOutputFields(static=self._FIELDS_STATIC,
1708 dynamic=self._FIELDS_DYNAMIC,
1709 selected=self.op.output_fields)
1711 # Lock all nodes, in shared mode
1712 # Temporary removal of locks, should be reverted later
1713 # TODO: reintroduce locks when they are lighter-weight
1714 self.needed_locks = {}
1715 #self.share_locks[locking.LEVEL_NODE] = 1
1716 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1718 def CheckPrereq(self):
1719 """Check prerequisites.
1724 def _DiagnoseByOS(node_list, rlist):
1725 """Remaps a per-node return list into an a per-os per-node dictionary
1727 @param node_list: a list with the names of all nodes
1728 @param rlist: a map with node names as keys and OS objects as values
1731 @return: a dictionary with osnames as keys and as value another map, with
1732 nodes as keys and list of OS objects as values, eg::
1734 {"debian-etch": {"node1": [<object>,...],
1735 "node2": [<object>,]}
1740 # we build here the list of nodes that didn't fail the RPC (at RPC
1741 # level), so that nodes with a non-responding node daemon don't
1742 # make all OSes invalid
1743 good_nodes = [node_name for node_name in rlist
1744 if not rlist[node_name].failed]
1745 for node_name, nr in rlist.iteritems():
1746 if nr.failed or not nr.data:
1748 for os_obj in nr.data:
1749 if os_obj.name not in all_os:
1750 # build a list of nodes for this os containing empty lists
1751 # for each node in node_list
1752 all_os[os_obj.name] = {}
1753 for nname in good_nodes:
1754 all_os[os_obj.name][nname] = []
1755 all_os[os_obj.name][node_name].append(os_obj)
1758 def Exec(self, feedback_fn):
1759 """Compute the list of OSes.
1762 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1763 node_data = self.rpc.call_os_diagnose(valid_nodes)
1764 if node_data == False:
1765 raise errors.OpExecError("Can't gather the list of OSes")
1766 pol = self._DiagnoseByOS(valid_nodes, node_data)
1768 for os_name, os_data in pol.iteritems():
1770 for field in self.op.output_fields:
1773 elif field == "valid":
1774 val = utils.all([osl and osl[0] for osl in os_data.values()])
1775 elif field == "node_status":
1777 for node_name, nos_list in os_data.iteritems():
1778 val[node_name] = [(v.status, v.path) for v in nos_list]
1780 raise errors.ParameterError(field)
1787 class LURemoveNode(LogicalUnit):
1788 """Logical unit for removing a node.
1791 HPATH = "node-remove"
1792 HTYPE = constants.HTYPE_NODE
1793 _OP_REQP = ["node_name"]
1795 def BuildHooksEnv(self):
1798 This doesn't run on the target node in the pre phase as a failed
1799 node would then be impossible to remove.
1803 "OP_TARGET": self.op.node_name,
1804 "NODE_NAME": self.op.node_name,
1806 all_nodes = self.cfg.GetNodeList()
1807 all_nodes.remove(self.op.node_name)
1808 return env, all_nodes, all_nodes
1810 def CheckPrereq(self):
1811 """Check prerequisites.
1814 - the node exists in the configuration
1815 - it does not have primary or secondary instances
1816 - it's not the master
1818 Any errors are signalled by raising errors.OpPrereqError.
1821 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1823 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1825 instance_list = self.cfg.GetInstanceList()
1827 masternode = self.cfg.GetMasterNode()
1828 if node.name == masternode:
1829 raise errors.OpPrereqError("Node is the master node,"
1830 " you need to failover first.")
1832 for instance_name in instance_list:
1833 instance = self.cfg.GetInstanceInfo(instance_name)
1834 if node.name in instance.all_nodes:
1835 raise errors.OpPrereqError("Instance %s is still running on the node,"
1836 " please remove first." % instance_name)
1837 self.op.node_name = node.name
1840 def Exec(self, feedback_fn):
1841 """Removes the node from the cluster.
1845 logging.info("Stopping the node daemon and removing configs from node %s",
1848 self.context.RemoveNode(node.name)
1850 self.rpc.call_node_leave_cluster(node.name)
1852 # Promote nodes to master candidate as needed
1853 _AdjustCandidatePool(self)
1856 class LUQueryNodes(NoHooksLU):
1857 """Logical unit for querying nodes.
1860 _OP_REQP = ["output_fields", "names", "use_locking"]
1862 _FIELDS_DYNAMIC = utils.FieldSet(
1864 "mtotal", "mnode", "mfree",
1866 "ctotal", "cnodes", "csockets",
1869 _FIELDS_STATIC = utils.FieldSet(
1870 "name", "pinst_cnt", "sinst_cnt",
1871 "pinst_list", "sinst_list",
1872 "pip", "sip", "tags",
1881 def ExpandNames(self):
1882 _CheckOutputFields(static=self._FIELDS_STATIC,
1883 dynamic=self._FIELDS_DYNAMIC,
1884 selected=self.op.output_fields)
1886 self.needed_locks = {}
1887 self.share_locks[locking.LEVEL_NODE] = 1
1890 self.wanted = _GetWantedNodes(self, self.op.names)
1892 self.wanted = locking.ALL_SET
1894 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1895 self.do_locking = self.do_node_query and self.op.use_locking
1897 # if we don't request only static fields, we need to lock the nodes
1898 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1901 def CheckPrereq(self):
1902 """Check prerequisites.
1905 # The validation of the node list is done in _GetWantedNodes,
1906 # if non-empty, and if empty, there's no validation to do
1909 def Exec(self, feedback_fn):
1910 """Computes the list of nodes and their attributes.
1913 all_info = self.cfg.GetAllNodesInfo()
1915 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1916 elif self.wanted != locking.ALL_SET:
1917 nodenames = self.wanted
1918 missing = set(nodenames).difference(all_info.keys())
1920 raise errors.OpExecError(
1921 "Some nodes were removed before retrieving their data: %s" % missing)
1923 nodenames = all_info.keys()
1925 nodenames = utils.NiceSort(nodenames)
1926 nodelist = [all_info[name] for name in nodenames]
1928 # begin data gathering
1930 if self.do_node_query:
1932 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1933 self.cfg.GetHypervisorType())
1934 for name in nodenames:
1935 nodeinfo = node_data[name]
1936 if not nodeinfo.failed and nodeinfo.data:
1937 nodeinfo = nodeinfo.data
1938 fn = utils.TryConvert
1940 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1941 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1942 "mfree": fn(int, nodeinfo.get('memory_free', None)),
1943 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1944 "dfree": fn(int, nodeinfo.get('vg_free', None)),
1945 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1946 "bootid": nodeinfo.get('bootid', None),
1947 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1948 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1951 live_data[name] = {}
1953 live_data = dict.fromkeys(nodenames, {})
1955 node_to_primary = dict([(name, set()) for name in nodenames])
1956 node_to_secondary = dict([(name, set()) for name in nodenames])
1958 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1959 "sinst_cnt", "sinst_list"))
1960 if inst_fields & frozenset(self.op.output_fields):
1961 instancelist = self.cfg.GetInstanceList()
1963 for instance_name in instancelist:
1964 inst = self.cfg.GetInstanceInfo(instance_name)
1965 if inst.primary_node in node_to_primary:
1966 node_to_primary[inst.primary_node].add(inst.name)
1967 for secnode in inst.secondary_nodes:
1968 if secnode in node_to_secondary:
1969 node_to_secondary[secnode].add(inst.name)
1971 master_node = self.cfg.GetMasterNode()
1973 # end data gathering
1976 for node in nodelist:
1978 for field in self.op.output_fields:
1981 elif field == "pinst_list":
1982 val = list(node_to_primary[node.name])
1983 elif field == "sinst_list":
1984 val = list(node_to_secondary[node.name])
1985 elif field == "pinst_cnt":
1986 val = len(node_to_primary[node.name])
1987 elif field == "sinst_cnt":
1988 val = len(node_to_secondary[node.name])
1989 elif field == "pip":
1990 val = node.primary_ip
1991 elif field == "sip":
1992 val = node.secondary_ip
1993 elif field == "tags":
1994 val = list(node.GetTags())
1995 elif field == "serial_no":
1996 val = node.serial_no
1997 elif field == "master_candidate":
1998 val = node.master_candidate
1999 elif field == "master":
2000 val = node.name == master_node
2001 elif field == "offline":
2003 elif field == "drained":
2005 elif self._FIELDS_DYNAMIC.Matches(field):
2006 val = live_data[node.name].get(field, None)
2007 elif field == "role":
2008 if node.name == master_node:
2010 elif node.master_candidate:
2019 raise errors.ParameterError(field)
2020 node_output.append(val)
2021 output.append(node_output)
2026 class LUQueryNodeVolumes(NoHooksLU):
2027 """Logical unit for getting volumes on node(s).
2030 _OP_REQP = ["nodes", "output_fields"]
2032 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2033 _FIELDS_STATIC = utils.FieldSet("node")
2035 def ExpandNames(self):
2036 _CheckOutputFields(static=self._FIELDS_STATIC,
2037 dynamic=self._FIELDS_DYNAMIC,
2038 selected=self.op.output_fields)
2040 self.needed_locks = {}
2041 self.share_locks[locking.LEVEL_NODE] = 1
2042 if not self.op.nodes:
2043 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2045 self.needed_locks[locking.LEVEL_NODE] = \
2046 _GetWantedNodes(self, self.op.nodes)
2048 def CheckPrereq(self):
2049 """Check prerequisites.
2051 This checks that the fields required are valid output fields.
2054 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2056 def Exec(self, feedback_fn):
2057 """Computes the list of nodes and their attributes.
2060 nodenames = self.nodes
2061 volumes = self.rpc.call_node_volumes(nodenames)
2063 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2064 in self.cfg.GetInstanceList()]
2066 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
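# lv_by_node maps each instance to the per-node LV layout returned by
# MapLVsByNode() (assumed here to be a dict of node name -> list of LV
# names); the "instance" output field lookup below relies on that shape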
2069 for node in nodenames:
2070 if node not in volumes or volumes[node].failed or not volumes[node].data:
2073 node_vols = volumes[node].data[:]
2074 node_vols.sort(key=lambda vol: vol['dev'])
2076 for vol in node_vols:
2078 for field in self.op.output_fields:
2081 elif field == "phys":
2085 elif field == "name":
2087 elif field == "size":
2088 val = int(float(vol['size']))
2089 elif field == "instance":
2091 if node not in lv_by_node[inst]:
2093 if vol['name'] in lv_by_node[inst][node]:
2099 raise errors.ParameterError(field)
2100 node_output.append(str(val))
2102 output.append(node_output)
2107 class LUAddNode(LogicalUnit):
2108 """Logical unit for adding node to the cluster.
2112 HTYPE = constants.HTYPE_NODE
2113 _OP_REQP = ["node_name"]
2115 def BuildHooksEnv(self):
2118 This will run on all nodes before, and on all nodes + the new node after.
2122 "OP_TARGET": self.op.node_name,
2123 "NODE_NAME": self.op.node_name,
2124 "NODE_PIP": self.op.primary_ip,
2125 "NODE_SIP": self.op.secondary_ip,
2127 nodes_0 = self.cfg.GetNodeList()
2128 nodes_1 = nodes_0 + [self.op.node_name, ]
2129 return env, nodes_0, nodes_1
2131 def CheckPrereq(self):
2132 """Check prerequisites.
2135 - the new node is not already in the config
2137 - its parameters (single/dual homed) match the cluster
2139 Any errors are signalled by raising errors.OpPrereqError.
2142 node_name = self.op.node_name
2145 dns_data = utils.HostInfo(node_name)
2147 node = dns_data.name
2148 primary_ip = self.op.primary_ip = dns_data.ip
2149 secondary_ip = getattr(self.op, "secondary_ip", None)
2150 if secondary_ip is None:
2151 secondary_ip = primary_ip
2152 if not utils.IsValidIP(secondary_ip):
2153 raise errors.OpPrereqError("Invalid secondary IP given")
2154 self.op.secondary_ip = secondary_ip
2156 node_list = cfg.GetNodeList()
2157 if not self.op.readd and node in node_list:
2158 raise errors.OpPrereqError("Node %s is already in the configuration" %
2160 elif self.op.readd and node not in node_list:
2161 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2163 for existing_node_name in node_list:
2164 existing_node = cfg.GetNodeInfo(existing_node_name)
2166 if self.op.readd and node == existing_node_name:
2167 if (existing_node.primary_ip != primary_ip or
2168 existing_node.secondary_ip != secondary_ip):
2169 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2170 " address configuration as before")
2173 if (existing_node.primary_ip == primary_ip or
2174 existing_node.secondary_ip == primary_ip or
2175 existing_node.primary_ip == secondary_ip or
2176 existing_node.secondary_ip == secondary_ip):
2177 raise errors.OpPrereqError("New node ip address(es) conflict with"
2178 " existing node %s" % existing_node.name)
2180 # check that the type of the node (single versus dual homed) is the
2181 # same as for the master
2182 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2183 master_singlehomed = myself.secondary_ip == myself.primary_ip
2184 newbie_singlehomed = secondary_ip == primary_ip
2185 if master_singlehomed != newbie_singlehomed:
2186 if master_singlehomed:
2187 raise errors.OpPrereqError("The master has no private ip but the"
2188 " new node has one")
2190 raise errors.OpPrereqError("The master has a private ip but the"
2191 " new node doesn't have one")
2193 # checks reachability
2194 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2195 raise errors.OpPrereqError("Node not reachable by ping")
2197 if not newbie_singlehomed:
2198 # check reachability from my secondary ip to newbie's secondary ip
2199 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2200 source=myself.secondary_ip):
2201 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2202 " based ping to noded port")
2204 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2209 mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2210 # the new node will increase mc_max by one, so:
2211 mc_max = min(mc_max + 1, cp_size)
2212 self.master_candidate = mc_now < mc_max
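# worked example with hypothetical numbers, assuming GetMasterCandidateStats()
# returns (current candidates, current candidate ceiling): on a 3-node
# cluster with candidate_pool_size=10 and all 3 nodes already candidates,
# mc_now=3 and mc_max=3; adding the node raises the ceiling to
# min(3+1, 10)=4, so mc_now < mc_max and the new node will be promoted
# to master candidate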
2215 self.new_node = self.cfg.GetNodeInfo(node)
2216 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2218 self.new_node = objects.Node(name=node,
2219 primary_ip=primary_ip,
2220 secondary_ip=secondary_ip,
2221 master_candidate=self.master_candidate,
2222 offline=False, drained=False)
2224 def Exec(self, feedback_fn):
2225 """Adds the new node to the cluster.
2228 new_node = self.new_node
2229 node = new_node.name
2231 # for re-adds, reset the offline/drained/master-candidate flags;
2232 # we need to reset here, otherwise offline would prevent RPC calls
2233 # later in the procedure; this also means that if the re-add
2234 # fails, we are left with a non-offlined, broken node
2236 new_node.drained = new_node.offline = False
2237 self.LogInfo("Readding a node, the offline/drained flags were reset")
2238 # if we demote the node, we do cleanup later in the procedure
2239 new_node.master_candidate = self.master_candidate
2241 # notify the user about any possible mc promotion
2242 if new_node.master_candidate:
2243 self.LogInfo("Node will be a master candidate")
2245 # check connectivity
2246 result = self.rpc.call_version([node])[node]
2249 if constants.PROTOCOL_VERSION == result.data:
2250 logging.info("Communication to node %s fine, sw version %s match",
2253 raise errors.OpExecError("Version mismatch master version %s,"
2254 " node version %s" %
2255 (constants.PROTOCOL_VERSION, result.data))
2257 raise errors.OpExecError("Cannot get version from the new node")
2260 logging.info("Copy ssh key to node %s", node)
2261 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2263 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2264 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2270 keyarray.append(f.read())
2274 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2276 keyarray[3], keyarray[4], keyarray[5])
2278 msg = result.RemoteFailMsg()
2280 raise errors.OpExecError("Cannot transfer ssh keys to the"
2281 " new node: %s" % msg)
2283 # Add node to our /etc/hosts, and add key to known_hosts
2284 utils.AddHostToEtcHosts(new_node.name)
2286 if new_node.secondary_ip != new_node.primary_ip:
2287 result = self.rpc.call_node_has_ip_address(new_node.name,
2288 new_node.secondary_ip)
2289 if result.failed or not result.data:
2290 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2291 " you gave (%s). Please fix and re-run this"
2292 " command." % new_node.secondary_ip)
2294 node_verify_list = [self.cfg.GetMasterNode()]
2295 node_verify_param = {
2297 # TODO: do a node-net-test as well?
2300 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2301 self.cfg.GetClusterName())
2302 for verifier in node_verify_list:
2303 if result[verifier].failed or not result[verifier].data:
2304 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2305 " for remote verification" % verifier)
2306 if result[verifier].data['nodelist']:
2307 for failed in result[verifier].data['nodelist']:
2308 feedback_fn("ssh/hostname verification failed %s -> %s" %
2309 (verifier, result[verifier].data['nodelist'][failed]))
2310 raise errors.OpExecError("ssh/hostname verification failed.")
2312 # Distribute updated /etc/hosts and known_hosts to all nodes,
2313 # including the node just added
2314 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2315 dist_nodes = self.cfg.GetNodeList()
2316 if not self.op.readd:
2317 dist_nodes.append(node)
2318 if myself.name in dist_nodes:
2319 dist_nodes.remove(myself.name)
2321 logging.debug("Copying hosts and known_hosts to all nodes")
2322 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2323 result = self.rpc.call_upload_file(dist_nodes, fname)
2324 for to_node, to_result in result.iteritems():
2325 if to_result.failed or not to_result.data:
2326 logging.error("Copy of file %s to node %s failed", fname, to_node)
2329 enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2330 if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2331 to_copy.append(constants.VNC_PASSWORD_FILE)
2333 for fname in to_copy:
2334 result = self.rpc.call_upload_file([node], fname)
2335 if result[node].failed or not result[node]:
2336 logging.error("Could not copy file %s to node %s", fname, node)
2339 self.context.ReaddNode(new_node)
2340 # make sure we redistribute the config
2341 self.cfg.Update(new_node)
2342 # and make sure the new node will not have old files around
2343 if not new_node.master_candidate:
2344 result = self.rpc.call_node_demote_from_mc(new_node.name)
2345 msg = result.RemoteFailMsg()
2347 self.LogWarning("Node failed to demote itself from master"
2348 " candidate status: %s" % msg)
2350 self.context.AddNode(new_node)
2353 class LUSetNodeParams(LogicalUnit):
2354 """Modifies the parameters of a node.
2357 HPATH = "node-modify"
2358 HTYPE = constants.HTYPE_NODE
2359 _OP_REQP = ["node_name"]
2362 def CheckArguments(self):
2363 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2364 if node_name is None:
2365 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2366 self.op.node_name = node_name
2367 _CheckBooleanOpField(self.op, 'master_candidate')
2368 _CheckBooleanOpField(self.op, 'offline')
2369 _CheckBooleanOpField(self.op, 'drained')
2370 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2371 if all_mods.count(None) == 3:
2372 raise errors.OpPrereqError("Please pass at least one modification")
2373 if all_mods.count(True) > 1:
2374 raise errors.OpPrereqError("Can't set the node into more than one"
2375 " state at the same time")
2377 def ExpandNames(self):
2378 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2380 def BuildHooksEnv(self):
2383 This runs on the master node.
2387 "OP_TARGET": self.op.node_name,
2388 "MASTER_CANDIDATE": str(self.op.master_candidate),
2389 "OFFLINE": str(self.op.offline),
2390 "DRAINED": str(self.op.drained),
2392 nl = [self.cfg.GetMasterNode(),
2396 def CheckPrereq(self):
2397 """Check prerequisites.
2399 This checks that the requested node state changes are compatible with
the cluster (master node role, candidate pool size, offline/drained status).
2402 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2404 if ((self.op.master_candidate == False or self.op.offline == True or
2405 self.op.drained == True) and node.master_candidate):
2406 # we will demote the node from master_candidate
2407 if self.op.node_name == self.cfg.GetMasterNode():
2408 raise errors.OpPrereqError("The master node has to be a"
2409 " master candidate, online and not drained")
2410 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2411 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2412 if num_candidates <= cp_size:
2413 msg = ("Not enough master candidates (desired"
2414 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2416 self.LogWarning(msg)
2418 raise errors.OpPrereqError(msg)
2420 if (self.op.master_candidate == True and
2421 ((node.offline and not self.op.offline == False) or
2422 (node.drained and not self.op.drained == False))):
2423 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2424 " to master_candidate" % node.name)
2428 def Exec(self, feedback_fn):
2437 if self.op.offline is not None:
2438 node.offline = self.op.offline
2439 result.append(("offline", str(self.op.offline)))
2440 if self.op.offline == True:
2441 if node.master_candidate:
2442 node.master_candidate = False
2444 result.append(("master_candidate", "auto-demotion due to offline"))
2446 node.drained = False
2447 result.append(("drained", "clear drained status due to offline"))
2449 if self.op.master_candidate is not None:
2450 node.master_candidate = self.op.master_candidate
2452 result.append(("master_candidate", str(self.op.master_candidate)))
2453 if self.op.master_candidate == False:
2454 rrc = self.rpc.call_node_demote_from_mc(node.name)
2455 msg = rrc.RemoteFailMsg()
2457 self.LogWarning("Node failed to demote itself: %s" % msg)
2459 if self.op.drained is not None:
2460 node.drained = self.op.drained
2461 result.append(("drained", str(self.op.drained)))
2462 if self.op.drained == True:
2463 if node.master_candidate:
2464 node.master_candidate = False
2466 result.append(("master_candidate", "auto-demotion due to drain"))
2467 rrc = self.rpc.call_node_demote_from_mc(node.name)
2468 msg = rrc.RemoteFailMsg()
2470 self.LogWarning("Node failed to demote itself: %s" % msg)
2472 node.offline = False
2473 result.append(("offline", "clear offline status due to drain"))
2475 # this will trigger configuration file update, if needed
2476 self.cfg.Update(node)
2477 # this will trigger job queue propagation or cleanup
2479 self.context.ReaddNode(node)
2484 class LUQueryClusterInfo(NoHooksLU):
2485 """Query cluster configuration.
2491 def ExpandNames(self):
2492 self.needed_locks = {}
2494 def CheckPrereq(self):
2495 """No prerequsites needed for this LU.
2500 def Exec(self, feedback_fn):
2501 """Return cluster config.
2504 cluster = self.cfg.GetClusterInfo()
2506 "software_version": constants.RELEASE_VERSION,
2507 "protocol_version": constants.PROTOCOL_VERSION,
2508 "config_version": constants.CONFIG_VERSION,
2509 "os_api_version": constants.OS_API_VERSION,
2510 "export_version": constants.EXPORT_VERSION,
2511 "architecture": (platform.architecture()[0], platform.machine()),
2512 "name": cluster.cluster_name,
2513 "master": cluster.master_node,
2514 "default_hypervisor": cluster.default_hypervisor,
2515 "enabled_hypervisors": cluster.enabled_hypervisors,
2516 "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2517 for hypervisor in cluster.enabled_hypervisors]),
2518 "beparams": cluster.beparams,
2519 "candidate_pool_size": cluster.candidate_pool_size,
2520 "default_bridge": cluster.default_bridge,
2521 "master_netdev": cluster.master_netdev,
2522 "volume_group_name": cluster.volume_group_name,
2523 "file_storage_dir": cluster.file_storage_dir,
2529 class LUQueryConfigValues(NoHooksLU):
2530 """Return configuration values.
2535 _FIELDS_DYNAMIC = utils.FieldSet()
2536 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2538 def ExpandNames(self):
2539 self.needed_locks = {}
2541 _CheckOutputFields(static=self._FIELDS_STATIC,
2542 dynamic=self._FIELDS_DYNAMIC,
2543 selected=self.op.output_fields)
2545 def CheckPrereq(self):
2546 """No prerequisites.
2551 def Exec(self, feedback_fn):
2552 """Dump a representation of the cluster config to the standard output.
2556 for field in self.op.output_fields:
2557 if field == "cluster_name":
2558 entry = self.cfg.GetClusterName()
2559 elif field == "master_node":
2560 entry = self.cfg.GetMasterNode()
2561 elif field == "drain_flag":
2562 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2564 raise errors.ParameterError(field)
2565 values.append(entry)
2569 class LUActivateInstanceDisks(NoHooksLU):
2570 """Bring up an instance's disks.
2573 _OP_REQP = ["instance_name"]
2576 def ExpandNames(self):
2577 self._ExpandAndLockInstance()
2578 self.needed_locks[locking.LEVEL_NODE] = []
2579 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2581 def DeclareLocks(self, level):
2582 if level == locking.LEVEL_NODE:
2583 self._LockInstancesNodes()
2585 def CheckPrereq(self):
2586 """Check prerequisites.
2588 This checks that the instance is in the cluster.
2591 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2592 assert self.instance is not None, \
2593 "Cannot retrieve locked instance %s" % self.op.instance_name
2594 _CheckNodeOnline(self, self.instance.primary_node)
2596 def Exec(self, feedback_fn):
2597 """Activate the disks.
2600 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2602 raise errors.OpExecError("Cannot activate block devices")
2607 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2608 """Prepare the block devices for an instance.
2610 This sets up the block devices on all nodes.
2612 @type lu: L{LogicalUnit}
2613 @param lu: the logical unit on whose behalf we execute
2614 @type instance: L{objects.Instance}
2615 @param instance: the instance for whose disks we assemble
2616 @type ignore_secondaries: boolean
2617 @param ignore_secondaries: if true, errors on secondary nodes
2618 won't result in an error return from the function
2619 @return: a tuple of (disks_ok, device_info), where device_info is a
2620 list of (host, instance_visible_name, node_visible_name)
2621 with the mapping from node devices to instance devices
2626 iname = instance.name
2627 # With the two passes mechanism we try to reduce the window of
2628 # opportunity for the race condition of switching DRBD to primary
2629 # before handshaking occurred, but we do not eliminate it
2631 # The proper fix would be to wait (with some limits) until the
2632 # connection has been made and drbd transitions from WFConnection
2633 # into any other network-connected state (Connected, SyncTarget,
2636 # 1st pass, assemble on all nodes in secondary mode
2637 for inst_disk in instance.disks:
2638 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2639 lu.cfg.SetDiskID(node_disk, node)
2640 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2641 msg = result.RemoteFailMsg()
2643 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2644 " (is_primary=False, pass=1): %s",
2645 inst_disk.iv_name, node, msg)
2646 if not ignore_secondaries:
2649 # FIXME: race condition on drbd migration to primary
2651 # 2nd pass, do only the primary node
2652 for inst_disk in instance.disks:
2653 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2654 if node != instance.primary_node:
2656 lu.cfg.SetDiskID(node_disk, node)
2657 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2658 msg = result.RemoteFailMsg()
2660 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2661 " (is_primary=True, pass=2): %s",
2662 inst_disk.iv_name, node, msg)
2664 device_info.append((instance.primary_node, inst_disk.iv_name,
2667 # leave the disks configured for the primary node
2668 # this is a workaround that would be fixed better by
2669 # improving the logical/physical id handling
2670 for disk in instance.disks:
2671 lu.cfg.SetDiskID(disk, instance.primary_node)
2673 return disks_ok, device_info
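# callers are expected to check disks_ok and to shut the devices down again
# on failure; _StartInstanceDisks below is the usual wrapper doing exactly
# that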
2676 def _StartInstanceDisks(lu, instance, force):
2677 """Start the disks of an instance.
2680 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2681 ignore_secondaries=force)
2683 _ShutdownInstanceDisks(lu, instance)
2684 if force is not None and not force:
2685 lu.proc.LogWarning("", hint="If the message above refers to a"
2687 " you can retry the operation using '--force'.")
2688 raise errors.OpExecError("Disk consistency error")
2691 class LUDeactivateInstanceDisks(NoHooksLU):
2692 """Shutdown an instance's disks.
2695 _OP_REQP = ["instance_name"]
2698 def ExpandNames(self):
2699 self._ExpandAndLockInstance()
2700 self.needed_locks[locking.LEVEL_NODE] = []
2701 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2703 def DeclareLocks(self, level):
2704 if level == locking.LEVEL_NODE:
2705 self._LockInstancesNodes()
2707 def CheckPrereq(self):
2708 """Check prerequisites.
2710 This checks that the instance is in the cluster.
2713 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2714 assert self.instance is not None, \
2715 "Cannot retrieve locked instance %s" % self.op.instance_name
2717 def Exec(self, feedback_fn):
2718 """Deactivate the disks
2721 instance = self.instance
2722 _SafeShutdownInstanceDisks(self, instance)
2725 def _SafeShutdownInstanceDisks(lu, instance):
2726 """Shutdown block devices of an instance.
2728 This function checks if an instance is running, before calling
2729 _ShutdownInstanceDisks.
2732 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2733 [instance.hypervisor])
2734 ins_l = ins_l[instance.primary_node]
2735 if ins_l.failed or not isinstance(ins_l.data, list):
2736 raise errors.OpExecError("Can't contact node '%s'" %
2737 instance.primary_node)
2739 if instance.name in ins_l.data:
2740 raise errors.OpExecError("Instance is running, can't shutdown"
2743 _ShutdownInstanceDisks(lu, instance)
2746 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2747 """Shutdown block devices of an instance.
2749 This does the shutdown on all nodes of the instance.
2751 If ignore_primary is false, errors on the primary node are
2756 for disk in instance.disks:
2757 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2758 lu.cfg.SetDiskID(top_disk, node)
2759 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2760 msg = result.RemoteFailMsg()
2762 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2763 disk.iv_name, node, msg)
2764 if not ignore_primary or node != instance.primary_node:
2769 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2770 """Checks if a node has enough free memory.
2772 This function checks if a given node has the needed amount of free
2773 memory. In case the node has less memory or we cannot get the
2774 information from the node, this function raises an OpPrereqError
2777 @type lu: C{LogicalUnit}
2778 @param lu: a logical unit from which we get configuration data
2780 @param node: the node to check
2781 @type reason: C{str}
2782 @param reason: string to use in the error message
2783 @type requested: C{int}
2784 @param requested: the amount of memory in MiB to check for
2785 @type hypervisor_name: C{str}
2786 @param hypervisor_name: the hypervisor to ask for memory stats
2787 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2788 we cannot check the node
2791 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2792 nodeinfo[node].Raise()
2793 free_mem = nodeinfo[node].data.get('memory_free')
2794 if not isinstance(free_mem, int):
2795 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2796 " was '%s'" % (node, free_mem))
2797 if requested > free_mem:
2798 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2799 " needed %s MiB, available %s MiB" %
2800 (node, reason, requested, free_mem))
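# example call (hypothetical values): reserving 512 MiB before a start
# would look like
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        512, instance.hypervisor)
# mirroring the real invocation in LUStartupInstance.CheckPrereq below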
2803 class LUStartupInstance(LogicalUnit):
2804 """Starts an instance.
2807 HPATH = "instance-start"
2808 HTYPE = constants.HTYPE_INSTANCE
2809 _OP_REQP = ["instance_name", "force"]
2812 def ExpandNames(self):
2813 self._ExpandAndLockInstance()
2815 def BuildHooksEnv(self):
2818 This runs on master, primary and secondary nodes of the instance.
2822 "FORCE": self.op.force,
2824 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2825 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2828 def CheckPrereq(self):
2829 """Check prerequisites.
2831 This checks that the instance is in the cluster.
2834 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2835 assert self.instance is not None, \
2836 "Cannot retrieve locked instance %s" % self.op.instance_name
2839 self.beparams = getattr(self.op, "beparams", {})
2841 if not isinstance(self.beparams, dict):
2842 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2843 " dict" % (type(self.beparams), ))
2844 # fill the beparams dict
2845 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2846 self.op.beparams = self.beparams
2849 self.hvparams = getattr(self.op, "hvparams", {})
2851 if not isinstance(self.hvparams, dict):
2852 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2853 " dict" % (type(self.hvparams), ))
2855 # check hypervisor parameter syntax (locally)
2856 cluster = self.cfg.GetClusterInfo()
2857 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2858 filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2860 filled_hvp.update(self.hvparams)
2861 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2862 hv_type.CheckParameterSyntax(filled_hvp)
2863 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2864 self.op.hvparams = self.hvparams
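# at this point the be/hv overrides passed on the opcode have been
# type-checked locally and (for hvparams) validated against the cluster
# defaults and the target nodes, so Exec() can hand them straight to
# call_instance_start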
2866 _CheckNodeOnline(self, instance.primary_node)
2868 bep = self.cfg.GetClusterInfo().FillBE(instance)
2869 # check bridge existence
2870 _CheckInstanceBridgesExist(self, instance)
2872 remote_info = self.rpc.call_instance_info(instance.primary_node,
2874 instance.hypervisor)
2876 if not remote_info.data:
2877 _CheckNodeFreeMemory(self, instance.primary_node,
2878 "starting instance %s" % instance.name,
2879 bep[constants.BE_MEMORY], instance.hypervisor)
2881 def Exec(self, feedback_fn):
2882 """Start the instance.
2885 instance = self.instance
2886 force = self.op.force
2888 self.cfg.MarkInstanceUp(instance.name)
2890 node_current = instance.primary_node
2892 _StartInstanceDisks(self, instance, force)
2894 result = self.rpc.call_instance_start(node_current, instance,
2895 self.hvparams, self.beparams)
2896 msg = result.RemoteFailMsg()
2898 _ShutdownInstanceDisks(self, instance)
2899 raise errors.OpExecError("Could not start instance: %s" % msg)
2902 class LURebootInstance(LogicalUnit):
2903 """Reboot an instance.
2906 HPATH = "instance-reboot"
2907 HTYPE = constants.HTYPE_INSTANCE
2908 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2911 def ExpandNames(self):
2912 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2913 constants.INSTANCE_REBOOT_HARD,
2914 constants.INSTANCE_REBOOT_FULL]:
2915 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2916 (constants.INSTANCE_REBOOT_SOFT,
2917 constants.INSTANCE_REBOOT_HARD,
2918 constants.INSTANCE_REBOOT_FULL))
2919 self._ExpandAndLockInstance()
2921 def BuildHooksEnv(self):
2924 This runs on master, primary and secondary nodes of the instance.
2928 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2929 "REBOOT_TYPE": self.op.reboot_type,
2931 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2932 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2935 def CheckPrereq(self):
2936 """Check prerequisites.
2938 This checks that the instance is in the cluster.
2941 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2942 assert self.instance is not None, \
2943 "Cannot retrieve locked instance %s" % self.op.instance_name
2945 _CheckNodeOnline(self, instance.primary_node)
2947 # check bridge existence
2948 _CheckInstanceBridgesExist(self, instance)
2950 def Exec(self, feedback_fn):
2951 """Reboot the instance.
2954 instance = self.instance
2955 ignore_secondaries = self.op.ignore_secondaries
2956 reboot_type = self.op.reboot_type
2958 node_current = instance.primary_node
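# soft/hard reboots are delegated to the hypervisor via call_instance_reboot;
# a full reboot instead shuts the instance down, recycles its disks and
# starts it again, as implemented below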
2960 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2961 constants.INSTANCE_REBOOT_HARD]:
2962 for disk in instance.disks:
2963 self.cfg.SetDiskID(disk, node_current)
2964 result = self.rpc.call_instance_reboot(node_current, instance,
2966 msg = result.RemoteFailMsg()
2968 raise errors.OpExecError("Could not reboot instance: %s" % msg)
2970 result = self.rpc.call_instance_shutdown(node_current, instance)
2971 msg = result.RemoteFailMsg()
2973 raise errors.OpExecError("Could not shutdown instance for"
2974 " full reboot: %s" % msg)
2975 _ShutdownInstanceDisks(self, instance)
2976 _StartInstanceDisks(self, instance, ignore_secondaries)
2977 result = self.rpc.call_instance_start(node_current, instance, None, None)
2978 msg = result.RemoteFailMsg()
2980 _ShutdownInstanceDisks(self, instance)
2981 raise errors.OpExecError("Could not start instance for"
2982 " full reboot: %s" % msg)
2984 self.cfg.MarkInstanceUp(instance.name)
2987 class LUShutdownInstance(LogicalUnit):
2988 """Shutdown an instance.
2991 HPATH = "instance-stop"
2992 HTYPE = constants.HTYPE_INSTANCE
2993 _OP_REQP = ["instance_name"]
2996 def ExpandNames(self):
2997 self._ExpandAndLockInstance()
2999 def BuildHooksEnv(self):
3002 This runs on master, primary and secondary nodes of the instance.
3005 env = _BuildInstanceHookEnvByObject(self, self.instance)
3006 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3009 def CheckPrereq(self):
3010 """Check prerequisites.
3012 This checks that the instance is in the cluster.
3015 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3016 assert self.instance is not None, \
3017 "Cannot retrieve locked instance %s" % self.op.instance_name
3018 _CheckNodeOnline(self, self.instance.primary_node)
3020 def Exec(self, feedback_fn):
3021 """Shutdown the instance.
3024 instance = self.instance
3025 node_current = instance.primary_node
3026 self.cfg.MarkInstanceDown(instance.name)
3027 result = self.rpc.call_instance_shutdown(node_current, instance)
3028 msg = result.RemoteFailMsg()
3030 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3032 _ShutdownInstanceDisks(self, instance)
3035 class LUReinstallInstance(LogicalUnit):
3036 """Reinstall an instance.
3039 HPATH = "instance-reinstall"
3040 HTYPE = constants.HTYPE_INSTANCE
3041 _OP_REQP = ["instance_name"]
3044 def ExpandNames(self):
3045 self._ExpandAndLockInstance()
3047 def BuildHooksEnv(self):
3050 This runs on master, primary and secondary nodes of the instance.
3053 env = _BuildInstanceHookEnvByObject(self, self.instance)
3054 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3057 def CheckPrereq(self):
3058 """Check prerequisites.
3060 This checks that the instance is in the cluster and is not running.
3063 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3064 assert instance is not None, \
3065 "Cannot retrieve locked instance %s" % self.op.instance_name
3066 _CheckNodeOnline(self, instance.primary_node)
3068 if instance.disk_template == constants.DT_DISKLESS:
3069 raise errors.OpPrereqError("Instance '%s' has no disks" %
3070 self.op.instance_name)
3071 if instance.admin_up:
3072 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3073 self.op.instance_name)
3074 remote_info = self.rpc.call_instance_info(instance.primary_node,
3076 instance.hypervisor)
3078 if remote_info.data:
3079 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3080 (self.op.instance_name,
3081 instance.primary_node))
3083 self.op.os_type = getattr(self.op, "os_type", None)
3084 if self.op.os_type is not None:
3086 pnode = self.cfg.GetNodeInfo(
3087 self.cfg.ExpandNodeName(instance.primary_node))
3089 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3091 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3093 if not isinstance(result.data, objects.OS):
3094 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3095 " primary node" % self.op.os_type)
3097 self.instance = instance
3099 def Exec(self, feedback_fn):
3100 """Reinstall the instance.
3103 inst = self.instance
3105 if self.op.os_type is not None:
3106 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3107 inst.os = self.op.os_type
3108 self.cfg.Update(inst)
3110 _StartInstanceDisks(self, inst, None)
3112 feedback_fn("Running the instance OS create scripts...")
3113 result = self.rpc.call_instance_os_add(inst.primary_node, inst)
3114 msg = result.RemoteFailMsg()
3116 raise errors.OpExecError("Could not install OS for instance %s"
3118 (inst.name, inst.primary_node, msg))
3120 _ShutdownInstanceDisks(self, inst)
3123 class LURenameInstance(LogicalUnit):
3124 """Rename an instance.
3127 HPATH = "instance-rename"
3128 HTYPE = constants.HTYPE_INSTANCE
3129 _OP_REQP = ["instance_name", "new_name"]
3131 def BuildHooksEnv(self):
3134 This runs on master, primary and secondary nodes of the instance.
3137 env = _BuildInstanceHookEnvByObject(self, self.instance)
3138 env["INSTANCE_NEW_NAME"] = self.op.new_name
3139 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3142 def CheckPrereq(self):
3143 """Check prerequisites.
3145 This checks that the instance is in the cluster and is not running.
3148 instance = self.cfg.GetInstanceInfo(
3149 self.cfg.ExpandInstanceName(self.op.instance_name))
3150 if instance is None:
3151 raise errors.OpPrereqError("Instance '%s' not known" %
3152 self.op.instance_name)
3153 _CheckNodeOnline(self, instance.primary_node)
3155 if instance.admin_up:
3156 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3157 self.op.instance_name)
3158 remote_info = self.rpc.call_instance_info(instance.primary_node,
3160 instance.hypervisor)
3162 if remote_info.data:
3163 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3164 (self.op.instance_name,
3165 instance.primary_node))
3166 self.instance = instance
3168 # new name verification
3169 name_info = utils.HostInfo(self.op.new_name)
3171 self.op.new_name = new_name = name_info.name
3172 instance_list = self.cfg.GetInstanceList()
3173 if new_name in instance_list:
3174 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3177 if not getattr(self.op, "ignore_ip", False):
3178 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3179 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3180 (name_info.ip, new_name))
3183 def Exec(self, feedback_fn):
3184 """Reinstall the instance.
3187 inst = self.instance
3188 old_name = inst.name
3190 if inst.disk_template == constants.DT_FILE:
3191 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3193 self.cfg.RenameInstance(inst.name, self.op.new_name)
3194 # Change the instance lock. This is definitely safe while we hold the BGL
3195 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3196 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3198 # re-read the instance from the configuration after rename
3199 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3201 if inst.disk_template == constants.DT_FILE:
3202 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3203 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3204 old_file_storage_dir,
3205 new_file_storage_dir)
3208 raise errors.OpExecError("Could not connect to node '%s' to rename"
3209 " directory '%s' to '%s' (but the instance"
3210 " has been renamed in Ganeti)" % (
3211 inst.primary_node, old_file_storage_dir,
3212 new_file_storage_dir))
3214 if not result.data[0]:
3215 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3216 " (but the instance has been renamed in"
3217 " Ganeti)" % (old_file_storage_dir,
3218 new_file_storage_dir))
3220 _StartInstanceDisks(self, inst, None)
3222 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3224 msg = result.RemoteFailMsg()
3226 msg = ("Could not run OS rename script for instance %s on node %s"
3227 " (but the instance has been renamed in Ganeti): %s" %
3228 (inst.name, inst.primary_node, msg))
3229 self.proc.LogWarning(msg)
3231 _ShutdownInstanceDisks(self, inst)
3234 class LURemoveInstance(LogicalUnit):
3235 """Remove an instance.
3238 HPATH = "instance-remove"
3239 HTYPE = constants.HTYPE_INSTANCE
3240 _OP_REQP = ["instance_name", "ignore_failures"]
3243 def ExpandNames(self):
3244 self._ExpandAndLockInstance()
3245 self.needed_locks[locking.LEVEL_NODE] = []
3246 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3248 def DeclareLocks(self, level):
3249 if level == locking.LEVEL_NODE:
3250 self._LockInstancesNodes()
3252 def BuildHooksEnv(self):
3255 This runs on master, primary and secondary nodes of the instance.
3258 env = _BuildInstanceHookEnvByObject(self, self.instance)
3259 nl = [self.cfg.GetMasterNode()]
3262 def CheckPrereq(self):
3263 """Check prerequisites.
3265 This checks that the instance is in the cluster.
3268 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3269 assert self.instance is not None, \
3270 "Cannot retrieve locked instance %s" % self.op.instance_name
3272 def Exec(self, feedback_fn):
3273 """Remove the instance.
3276 instance = self.instance
3277 logging.info("Shutting down instance %s on node %s",
3278 instance.name, instance.primary_node)
3280 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3281 msg = result.RemoteFailMsg()
3283 if self.op.ignore_failures:
3284 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3286 raise errors.OpExecError("Could not shutdown instance %s on"
3288 (instance.name, instance.primary_node, msg))
3290 logging.info("Removing block devices for instance %s", instance.name)
3292 if not _RemoveDisks(self, instance):
3293 if self.op.ignore_failures:
3294 feedback_fn("Warning: can't remove instance's disks")
3296 raise errors.OpExecError("Can't remove instance's disks")
3298 logging.info("Removing instance %s out of cluster config", instance.name)
3300 self.cfg.RemoveInstance(instance.name)
3301 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3304 class LUQueryInstances(NoHooksLU):
3305 """Logical unit for querying instances.
3308 _OP_REQP = ["output_fields", "names", "use_locking"]
3310 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3312 "disk_template", "ip", "mac", "bridge",
3313 "sda_size", "sdb_size", "vcpus", "tags",
3314 "network_port", "beparams",
3315 r"(disk)\.(size)/([0-9]+)",
3316 r"(disk)\.(sizes)", "disk_usage",
3317 r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3318 r"(nic)\.(macs|ips|bridges)",
3319 r"(disk|nic)\.(count)",
3320 "serial_no", "hypervisor", "hvparams",] +
3322 for name in constants.HVS_PARAMETERS] +
3324 for name in constants.BES_PARAMETERS])
3325 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
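# besides plain names, the static field set above accepts parameterized
# fields matched by regular expressions, e.g. "disk.size/0" (size of the
# first disk), "nic.mac/1" (MAC of the second NIC) or aggregates such as
# "disk.sizes" and "nic.count"; the matched groups are decoded in Exec()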
3328 def ExpandNames(self):
3329 _CheckOutputFields(static=self._FIELDS_STATIC,
3330 dynamic=self._FIELDS_DYNAMIC,
3331 selected=self.op.output_fields)
3333 self.needed_locks = {}
3334 self.share_locks[locking.LEVEL_INSTANCE] = 1
3335 self.share_locks[locking.LEVEL_NODE] = 1
3338 self.wanted = _GetWantedInstances(self, self.op.names)
3340 self.wanted = locking.ALL_SET
3342 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3343 self.do_locking = self.do_node_query and self.op.use_locking
3345 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3346 self.needed_locks[locking.LEVEL_NODE] = []
3347 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3349 def DeclareLocks(self, level):
3350 if level == locking.LEVEL_NODE and self.do_locking:
3351 self._LockInstancesNodes()
3353 def CheckPrereq(self):
3354 """Check prerequisites.
3359 def Exec(self, feedback_fn):
3360 """Computes the list of nodes and their attributes.
3363 all_info = self.cfg.GetAllInstancesInfo()
3364 if self.wanted == locking.ALL_SET:
3365 # caller didn't specify instance names, so ordering is not important
3367 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3369 instance_names = all_info.keys()
3370 instance_names = utils.NiceSort(instance_names)
3372 # caller did specify names, so we must keep the ordering
3374 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3376 tgt_set = all_info.keys()
3377 missing = set(self.wanted).difference(tgt_set)
3379 raise errors.OpExecError("Some instances were removed before"
3380 " retrieving their data: %s" % missing)
3381 instance_names = self.wanted
3383 instance_list = [all_info[iname] for iname in instance_names]
3385 # begin data gathering
3387 nodes = frozenset([inst.primary_node for inst in instance_list])
3388 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3392 if self.do_node_query:
3394 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3396 result = node_data[name]
3398 # offline nodes will be in both lists
3399 off_nodes.append(name)
3401 bad_nodes.append(name)
3404 live_data.update(result.data)
3405 # else no instance is alive
3407 live_data = dict([(name, {}) for name in instance_names])
3409 # end data gathering
3414 for instance in instance_list:
3416 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3417 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3418 for field in self.op.output_fields:
3419 st_match = self._FIELDS_STATIC.Matches(field)
3424 elif field == "pnode":
3425 val = instance.primary_node
3426 elif field == "snodes":
3427 val = list(instance.secondary_nodes)
3428 elif field == "admin_state":
3429 val = instance.admin_up
3430 elif field == "oper_state":
3431 if instance.primary_node in bad_nodes:
3434 val = bool(live_data.get(instance.name))
3435 elif field == "status":
3436 if instance.primary_node in off_nodes:
3437 val = "ERROR_nodeoffline"
3438 elif instance.primary_node in bad_nodes:
3439 val = "ERROR_nodedown"
3441 running = bool(live_data.get(instance.name))
3443 if instance.admin_up:
3448 if instance.admin_up:
3452 elif field == "oper_ram":
3453 if instance.primary_node in bad_nodes:
3455 elif instance.name in live_data:
3456 val = live_data[instance.name].get("memory", "?")
3459 elif field == "vcpus":
3460 val = i_be[constants.BE_VCPUS]
3461 elif field == "disk_template":
3462 val = instance.disk_template
3465 val = instance.nics[0].ip
3468 elif field == "bridge":
3470 val = instance.nics[0].bridge
3473 elif field == "mac":
3475 val = instance.nics[0].mac
3478 elif field == "sda_size" or field == "sdb_size":
3479 idx = ord(field[2]) - ord('a')
3481 val = instance.FindDisk(idx).size
3482 except errors.OpPrereqError:
3484 elif field == "disk_usage": # total disk usage per node
3485 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3486 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3487 elif field == "tags":
3488 val = list(instance.GetTags())
3489 elif field == "serial_no":
3490 val = instance.serial_no
3491 elif field == "network_port":
3492 val = instance.network_port
3493 elif field == "hypervisor":
3494 val = instance.hypervisor
3495 elif field == "hvparams":
3497 elif (field.startswith(HVPREFIX) and
3498 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3499 val = i_hv.get(field[len(HVPREFIX):], None)
3500 elif field == "beparams":
3502 elif (field.startswith(BEPREFIX) and
3503 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3504 val = i_be.get(field[len(BEPREFIX):], None)
3505 elif st_match and st_match.groups():
3506 # matches a variable list
3507 st_groups = st_match.groups()
3508 if st_groups and st_groups[0] == "disk":
3509 if st_groups[1] == "count":
3510 val = len(instance.disks)
3511 elif st_groups[1] == "sizes":
3512 val = [disk.size for disk in instance.disks]
3513 elif st_groups[1] == "size":
3515 val = instance.FindDisk(st_groups[2]).size
3516 except errors.OpPrereqError:
3519 assert False, "Unhandled disk parameter"
3520 elif st_groups[0] == "nic":
3521 if st_groups[1] == "count":
3522 val = len(instance.nics)
3523 elif st_groups[1] == "macs":
3524 val = [nic.mac for nic in instance.nics]
3525 elif st_groups[1] == "ips":
3526 val = [nic.ip for nic in instance.nics]
3527 elif st_groups[1] == "bridges":
3528 val = [nic.bridge for nic in instance.nics]
3531 nic_idx = int(st_groups[2])
3532 if nic_idx >= len(instance.nics):
3535 if st_groups[1] == "mac":
3536 val = instance.nics[nic_idx].mac
3537 elif st_groups[1] == "ip":
3538 val = instance.nics[nic_idx].ip
3539 elif st_groups[1] == "bridge":
3540 val = instance.nics[nic_idx].bridge
3542 assert False, "Unhandled NIC parameter"
3544 assert False, ("Declared but unhandled variable parameter '%s'" %
3547 assert False, "Declared but unhandled parameter '%s'" % field
3554 class LUFailoverInstance(LogicalUnit):
3555 """Failover an instance.
3558 HPATH = "instance-failover"
3559 HTYPE = constants.HTYPE_INSTANCE
3560 _OP_REQP = ["instance_name", "ignore_consistency"]
3563 def ExpandNames(self):
3564 self._ExpandAndLockInstance()
3565 self.needed_locks[locking.LEVEL_NODE] = []
3566 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3568 def DeclareLocks(self, level):
3569 if level == locking.LEVEL_NODE:
3570 self._LockInstancesNodes()
3572 def BuildHooksEnv(self):
3575 This runs on master, primary and secondary nodes of the instance.
3579 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3581 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3582 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3585 def CheckPrereq(self):
3586 """Check prerequisites.
3588 This checks that the instance is in the cluster.
3591 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3592 assert self.instance is not None, \
3593 "Cannot retrieve locked instance %s" % self.op.instance_name
3595 bep = self.cfg.GetClusterInfo().FillBE(instance)
3596 if instance.disk_template not in constants.DTS_NET_MIRROR:
3597 raise errors.OpPrereqError("Instance's disk layout is not"
3598 " network mirrored, cannot failover.")
3600 secondary_nodes = instance.secondary_nodes
3601 if not secondary_nodes:
3602 raise errors.ProgrammerError("no secondary node but using "
3603 "a mirrored disk template")
3605 target_node = secondary_nodes[0]
3606 _CheckNodeOnline(self, target_node)
3607 _CheckNodeNotDrained(self, target_node)
3609 if instance.admin_up:
3610 # check memory requirements on the secondary node
3611 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3612 instance.name, bep[constants.BE_MEMORY],
3613 instance.hypervisor)
3615 self.LogInfo("Not checking memory on the secondary node as"
3616 " instance will not be started")
3618 # check bridge existence
3619 brlist = [nic.bridge for nic in instance.nics]
3620 result = self.rpc.call_bridges_exist(target_node, brlist)
3623 raise errors.OpPrereqError("One or more target bridges %s does not"
3624 " exist on destination node '%s'" %
3625 (brlist, target_node))
3627 def Exec(self, feedback_fn):
3628 """Failover an instance.
3630 The failover is done by shutting it down on its present node and
3631 starting it on the secondary.
3634 instance = self.instance
3636 source_node = instance.primary_node
3637 target_node = instance.secondary_nodes[0]
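# failover sequence: verify disk consistency on the target, shut the
# instance down on the source (failures tolerated only with
# ignore_consistency), deactivate its disks, switch primary_node in the
# config and, if the instance was marked up, reassemble the disks and
# start it on the target node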
3639 feedback_fn("* checking disk consistency between source and target")
3640 for dev in instance.disks:
3641 # for drbd, these are drbd over lvm
3642 if not _CheckDiskConsistency(self, dev, target_node, False):
3643 if instance.admin_up and not self.op.ignore_consistency:
3644 raise errors.OpExecError("Disk %s is degraded on target node,"
3645 " aborting failover." % dev.iv_name)
3647 feedback_fn("* shutting down instance on source node")
3648 logging.info("Shutting down instance %s on node %s",
3649 instance.name, source_node)
3651 result = self.rpc.call_instance_shutdown(source_node, instance)
3652 msg = result.RemoteFailMsg()
3654 if self.op.ignore_consistency:
3655 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3656 " Proceeding anyway. Please make sure node"
3657 " %s is down. Error details: %s",
3658 instance.name, source_node, source_node, msg)
3660 raise errors.OpExecError("Could not shutdown instance %s on"
3662 (instance.name, source_node, msg))
3664 feedback_fn("* deactivating the instance's disks on source node")
3665 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3666 raise errors.OpExecError("Can't shut down the instance's disks.")
3668 instance.primary_node = target_node
3669 # distribute new instance config to the other nodes
3670 self.cfg.Update(instance)
3672 # Only start the instance if it's marked as up
3673 if instance.admin_up:
3674 feedback_fn("* activating the instance's disks on target node")
3675 logging.info("Starting instance %s on node %s",
3676 instance.name, target_node)
3678 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3679 ignore_secondaries=True)
3681 _ShutdownInstanceDisks(self, instance)
3682 raise errors.OpExecError("Can't activate the instance's disks")
3684 feedback_fn("* starting the instance on the target node")
3685 result = self.rpc.call_instance_start(target_node, instance, None, None)
3686 msg = result.RemoteFailMsg()
3688 _ShutdownInstanceDisks(self, instance)
3689 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3690 (instance.name, target_node, msg))
3693 class LUMigrateInstance(LogicalUnit):
3694 """Migrate an instance.
3696 This is migration without shutting down, compared to the failover,
3697 which is done with shutdown.
3700 HPATH = "instance-migrate"
3701 HTYPE = constants.HTYPE_INSTANCE
3702 _OP_REQP = ["instance_name", "live", "cleanup"]
3706 def ExpandNames(self):
3707 self._ExpandAndLockInstance()
3708 self.needed_locks[locking.LEVEL_NODE] = []
3709 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3711 def DeclareLocks(self, level):
3712 if level == locking.LEVEL_NODE:
3713 self._LockInstancesNodes()
3715 def BuildHooksEnv(self):
3718 This runs on master, primary and secondary nodes of the instance.
3721 env = _BuildInstanceHookEnvByObject(self, self.instance)
3722 env["MIGRATE_LIVE"] = self.op.live
3723 env["MIGRATE_CLEANUP"] = self.op.cleanup
3724 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3727 def CheckPrereq(self):
3728 """Check prerequisites.
3730 This checks that the instance is in the cluster.
3733 instance = self.cfg.GetInstanceInfo(
3734 self.cfg.ExpandInstanceName(self.op.instance_name))
3735 if instance is None:
3736 raise errors.OpPrereqError("Instance '%s' not known" %
3737 self.op.instance_name)
3739 if instance.disk_template != constants.DT_DRBD8:
3740 raise errors.OpPrereqError("Instance's disk layout is not"
3741 " drbd8, cannot migrate.")
3743 secondary_nodes = instance.secondary_nodes
3744 if not secondary_nodes:
3745 raise errors.ConfigurationError("No secondary node but using"
3746 " drbd8 disk template")
3748 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3750 target_node = secondary_nodes[0]
3751 # check memory requirements on the secondary node
3752 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3753 instance.name, i_be[constants.BE_MEMORY],
3754 instance.hypervisor)
3756 # check bridge existence
3757 brlist = [nic.bridge for nic in instance.nics]
3758 result = self.rpc.call_bridges_exist(target_node, brlist)
3759 if result.failed or not result.data:
3760 raise errors.OpPrereqError("One or more target bridges %s do not"
3761 " exist on destination node '%s'" %
3762 (brlist, target_node))
3764 if not self.op.cleanup:
3765 _CheckNodeNotDrained(self, target_node)
3766 result = self.rpc.call_instance_migratable(instance.primary_node,
3768 msg = result.RemoteFailMsg()
3770 raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3773 self.instance = instance
3775 def _WaitUntilSync(self):
3776 """Poll with custom rpc for disk sync.
3778 This uses our own step-based rpc call.
3781 self.feedback_fn("* wait until resync is done")
3785 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3787 self.instance.disks)
3789 for node, nres in result.items():
3790 msg = nres.RemoteFailMsg()
3792 raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3794 node_done, node_percent = nres.payload
3795 all_done = all_done and node_done
3796 if node_percent is not None:
3797 min_percent = min(min_percent, node_percent)
3799 if min_percent < 100:
3800 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3803 def _EnsureSecondary(self, node):
3804 """Demote a node to secondary.
3807 self.feedback_fn("* switching node %s to secondary mode" % node)
3809 for dev in self.instance.disks:
3810 self.cfg.SetDiskID(dev, node)
3812 result = self.rpc.call_blockdev_close(node, self.instance.name,
3813 self.instance.disks)
3814 msg = result.RemoteFailMsg()
3816 raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3817 " error %s" % (node, msg))
3819 def _GoStandalone(self):
3820 """Disconnect from the network.
3823 self.feedback_fn("* changing into standalone mode")
3824 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3825 self.instance.disks)
3826 for node, nres in result.items():
3827 msg = nres.RemoteFailMsg()
3829 raise errors.OpExecError("Cannot disconnect disks node %s,"
3830 " error %s" % (node, msg))
3832 def _GoReconnect(self, multimaster):
3833 """Reconnect to the network.
3839 msg = "single-master"
3840 self.feedback_fn("* changing disks into %s mode" % msg)
3841 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3842 self.instance.disks,
3843 self.instance.name, multimaster)
3844 for node, nres in result.items():
3845 msg = nres.RemoteFailMsg()
3847 raise errors.OpExecError("Cannot change disks config on node %s,"
3848 " error: %s" % (node, msg))
3850 def _ExecCleanup(self):
3851 """Try to cleanup after a failed migration.
3853 The cleanup is done by:
3854 - check that the instance is running only on one node
3855 (and update the config if needed)
3856 - change disks on its secondary node to secondary
3857 - wait until disks are fully synchronized
3858 - disconnect from the network
3859 - change disks into single-master mode
3860 - wait again until disks are fully synchronized
3863 instance = self.instance
3864 target_node = self.target_node
3865 source_node = self.source_node
3867 # check running on only one node
3868 self.feedback_fn("* checking where the instance actually runs"
3869 " (if this hangs, the hypervisor might be in"
3871 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3872 for node, result in ins_l.items():
3874 if not isinstance(result.data, list):
3875 raise errors.OpExecError("Can't contact node '%s'" % node)
3877 runningon_source = instance.name in ins_l[source_node].data
3878 runningon_target = instance.name in ins_l[target_node].data
3880 if runningon_source and runningon_target:
3881 raise errors.OpExecError("Instance seems to be running on two nodes,"
3882 " or the hypervisor is confused. You will have"
3883 " to ensure manually that it runs only on one"
3884 " and restart this operation.")
3886 if not (runningon_source or runningon_target):
3887 raise errors.OpExecError("Instance does not seem to be running at all."
3888 " In this case, it's safer to repair by"
3889 " running 'gnt-instance stop' to ensure disk"
3890 " shutdown, and then restarting it.")
3892 if runningon_target:
3893 # the migration has actually succeeded, we need to update the config
3894 self.feedback_fn("* instance running on secondary node (%s),"
3895 " updating config" % target_node)
3896 instance.primary_node = target_node
3897 self.cfg.Update(instance)
3898 demoted_node = source_node
3900 self.feedback_fn("* instance confirmed to be running on its"
3901 " primary node (%s)" % source_node)
3902 demoted_node = target_node
3904 self._EnsureSecondary(demoted_node)
3906 self._WaitUntilSync()
3907 except errors.OpExecError:
3908 # we ignore here errors, since if the device is standalone, it
3909 # won't be able to sync
3911 self._GoStandalone()
3912 self._GoReconnect(False)
3913 self._WaitUntilSync()
3915 self.feedback_fn("* done")
3917 def _RevertDiskStatus(self):
3918 """Try to revert the disk status after a failed migration.
3921 target_node = self.target_node
3923 self._EnsureSecondary(target_node)
3924 self._GoStandalone()
3925 self._GoReconnect(False)
3926 self._WaitUntilSync()
3927 except errors.OpExecError, err:
3928 self.LogWarning("Migration failed and I can't reconnect the"
3929 " drives: error '%s'\n"
3930 "Please look and recover the instance status" %
3933 def _AbortMigration(self):
3934 """Call the hypervisor code to abort a started migration.
3937 instance = self.instance
3938 target_node = self.target_node
3939 migration_info = self.migration_info
3941 abort_result = self.rpc.call_finalize_migration(target_node,
3942 instance,
3943 migration_info,
3944 False)
3945 abort_msg = abort_result.RemoteFailMsg()
3946 if abort_msg:
3947 logging.error("Aborting migration failed on target node %s: %s" %
3948 (target_node, abort_msg))
3949 # Don't raise an exception here, as we still have to try to revert the
3950 # disk status, even if this step failed.
3952 def _ExecMigration(self):
3953 """Migrate an instance.
3955 The migrate is done by:
3956 - change the disks into dual-master mode
3957 - wait until disks are fully synchronized again
3958 - migrate the instance
3959 - change disks on the new secondary node (the old primary) to secondary
3960 - wait until disks are fully synchronized
3961 - change disks into single-master mode
3964 instance = self.instance
3965 target_node = self.target_node
3966 source_node = self.source_node
3968 self.feedback_fn("* checking disk consistency between source and target")
3969 for dev in instance.disks:
3970 if not _CheckDiskConsistency(self, dev, target_node, False):
3971 raise errors.OpExecError("Disk %s is degraded or not fully"
3972 " synchronized on target node,"
3973 " aborting migrate." % dev.iv_name)
3975 # First get the migration information from the remote node
3976 result = self.rpc.call_migration_info(source_node, instance)
3977 msg = result.RemoteFailMsg()
3978 if msg:
3979 log_err = ("Failed fetching source migration information from %s: %s" %
3980 (source_node, msg))
3981 logging.error(log_err)
3982 raise errors.OpExecError(log_err)
3984 self.migration_info = migration_info = result.payload
3986 # Then switch the disks to master/master mode
3987 self._EnsureSecondary(target_node)
3988 self._GoStandalone()
3989 self._GoReconnect(True)
3990 self._WaitUntilSync()
3992 self.feedback_fn("* preparing %s to accept the instance" % target_node)
3993 result = self.rpc.call_accept_instance(target_node,
3994 instance,
3995 migration_info,
3996 self.nodes_ip[target_node])
3998 msg = result.RemoteFailMsg()
3999 if msg:
4000 logging.error("Instance pre-migration failed, trying to revert"
4001 " disk status: %s", msg)
4002 self._AbortMigration()
4003 self._RevertDiskStatus()
4004 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4005 (instance.name, msg))
4007 self.feedback_fn("* migrating instance to %s" % target_node)
4009 result = self.rpc.call_instance_migrate(source_node, instance,
4010 self.nodes_ip[target_node],
4011 self.op.live)
4012 msg = result.RemoteFailMsg()
4013 if msg:
4014 logging.error("Instance migration failed, trying to revert"
4015 " disk status: %s", msg)
4016 self._AbortMigration()
4017 self._RevertDiskStatus()
4018 raise errors.OpExecError("Could not migrate instance %s: %s" %
4019 (instance.name, msg))
4022 instance.primary_node = target_node
4023 # distribute new instance config to the other nodes
4024 self.cfg.Update(instance)
4026 result = self.rpc.call_finalize_migration(target_node,
4027 instance,
4028 migration_info,
4029 True)
4030 msg = result.RemoteFailMsg()
4031 if msg:
4032 logging.error("Instance migration succeeded, but finalization failed:"
4033 " %s" % msg)
4034 raise errors.OpExecError("Could not finalize instance migration: %s" %
4035 msg)
4037 self._EnsureSecondary(source_node)
4038 self._WaitUntilSync()
4039 self._GoStandalone()
4040 self._GoReconnect(False)
4041 self._WaitUntilSync()
4043 self.feedback_fn("* done")
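# Design note: the disks are switched into dual-master (multimaster) mode
# before the hypervisor-level migration because during a live migration both
# the source and the target node may need to write to the same DRBD device;
# once the instance runs only on the target, the old primary is demoted again
# and the disks go back to single-master mode.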
4045 def Exec(self, feedback_fn):
4046 """Perform the migration.
4049 self.feedback_fn = feedback_fn
4051 self.source_node = self.instance.primary_node
4052 self.target_node = self.instance.secondary_nodes[0]
4053 self.all_nodes = [self.source_node, self.target_node]
4054 self.nodes_ip = {
4055 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4056 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4057 }
4058 if self.op.cleanup:
4059 return self._ExecCleanup()
4060 else:
4061 return self._ExecMigration()
4064 def _CreateBlockDev(lu, node, instance, device, force_create,
4065 info, force_open):
4066 """Create a tree of block devices on a given node.
4068 If this device type has to be created on secondaries, create it and
4069 all its children.
4071 If not, just recurse to children keeping the same 'force' value.
4073 @param lu: the lu on whose behalf we execute
4074 @param node: the node on which to create the device
4075 @type instance: L{objects.Instance}
4076 @param instance: the instance which owns the device
4077 @type device: L{objects.Disk}
4078 @param device: the device to create
4079 @type force_create: boolean
4080 @param force_create: whether to force creation of this device; this
4081 will be changed to True whenever we find a device which has the
4082 CreateOnSecondary() attribute
4083 @param info: the extra 'metadata' we should attach to the device
4084 (this will be represented as a LVM tag)
4085 @type force_open: boolean
4086 @param force_open: this parameter will be passed to the
4087 L{backend.BlockdevCreate} function where it specifies
4088 whether we run on primary or not, and it affects both
4089 the child assembly and the device's own Open() execution
4092 if device.CreateOnSecondary():
4093 force_create = True
4095 if device.children:
4096 for child in device.children:
4097 _CreateBlockDev(lu, node, instance, child, force_create,
4098 info, force_open)
4100 if not force_create:
4101 return
4103 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
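# Illustration (not executed): a DRBD8 disk is a small tree,
#   drbd8
#     +- LV data
#     +- LV meta
# The recursion above walks that tree: children are handled first, and
# force_create is promoted to True as soon as a device type that must also
# exist on secondary nodes (CreateOnSecondary()) is encountered.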
4106 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4107 """Create a single block device on a given node.
4109 This will not recurse over children of the device, so they must be
4112 @param lu: the lu on whose behalf we execute
4113 @param node: the node on which to create the device
4114 @type instance: L{objects.Instance}
4115 @param instance: the instance which owns the device
4116 @type device: L{objects.Disk}
4117 @param device: the device to create
4118 @param info: the extra 'metadata' we should attach to the device
4119 (this will be represented as a LVM tag)
4120 @type force_open: boolean
4121 @param force_open: this parameter will be passed to the
4122 L{backend.BlockdevCreate} function where it specifies
4123 whether we run on primary or not, and it affects both
4124 the child assembly and the device's own Open() execution
4127 lu.cfg.SetDiskID(device, node)
4128 result = lu.rpc.call_blockdev_create(node, device, device.size,
4129 instance.name, force_open, info)
4130 msg = result.RemoteFailMsg()
4132 raise errors.OpExecError("Can't create block device %s on"
4133 " node %s for instance %s: %s" %
4134 (device, node, instance.name, msg))
4135 if device.physical_id is None:
4136 device.physical_id = result.payload
4139 def _GenerateUniqueNames(lu, exts):
4140 """Generate a suitable LV name.
4142 This will generate a logical volume name for the given instance.
4145 results = []
4146 for val in exts:
4147 new_id = lu.cfg.GenerateUniqueID()
4148 results.append("%s%s" % (new_id, val))
4149 return results
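# Example (illustrative values only): _GenerateUniqueNames(lu,
# [".disk0_data", ".disk0_meta"]) returns two LV names of the form
# "<fresh-id>.disk0_data" and "<fresh-id>.disk0_meta", each extension
# getting its own newly generated unique ID.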
4152 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4153 p_minor, s_minor):
4154 """Generate a drbd8 device complete with its children.
4157 port = lu.cfg.AllocatePort()
4158 vgname = lu.cfg.GetVGName()
4159 shared_secret = lu.cfg.GenerateDRBDSecret()
4160 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4161 logical_id=(vgname, names[0]))
4162 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4163 logical_id=(vgname, names[1]))
4164 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4165 logical_id=(primary, secondary, port,
4166 p_minor, s_minor,
4167 shared_secret),
4168 children=[dev_data, dev_meta],
4169 iv_name=iv_name)
4171 return drbd_dev
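# The resulting object is a two-level tree (sketch, not executed):
#   Disk(LD_DRBD8, size=size,
#        logical_id=(primary, secondary, port, p_minor, s_minor, secret))
#     +- Disk(LD_LV, size=size, logical_id=(vg, names[0]))   # data volume
#     +- Disk(LD_LV, size=128,  logical_id=(vg, names[1]))   # DRBD metadata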
4173 def _GenerateDiskTemplate(lu, template_name,
4174 instance_name, primary_node,
4175 secondary_nodes, disk_info,
4176 file_storage_dir, file_driver,
4177 base_index):
4178 """Generate the entire disk layout for a given template type.
4181 #TODO: compute space requirements
4183 vgname = lu.cfg.GetVGName()
4184 disk_count = len(disk_info)
4185 disks = []
4186 if template_name == constants.DT_DISKLESS:
4187 pass
4188 elif template_name == constants.DT_PLAIN:
4189 if len(secondary_nodes) != 0:
4190 raise errors.ProgrammerError("Wrong template configuration")
4192 names = _GenerateUniqueNames(lu, [".disk%d" % i
4193 for i in range(disk_count)])
4194 for idx, disk in enumerate(disk_info):
4195 disk_index = idx + base_index
4196 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4197 logical_id=(vgname, names[idx]),
4198 iv_name="disk/%d" % disk_index,
4199 mode=disk["mode"])
4200 disks.append(disk_dev)
4201 elif template_name == constants.DT_DRBD8:
4202 if len(secondary_nodes) != 1:
4203 raise errors.ProgrammerError("Wrong template configuration")
4204 remote_node = secondary_nodes[0]
4205 minors = lu.cfg.AllocateDRBDMinor(
4206 [primary_node, remote_node] * len(disk_info), instance_name)
4208 names = []
4209 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4210 for i in range(disk_count)]):
4211 names.append(lv_prefix + "_data")
4212 names.append(lv_prefix + "_meta")
4213 for idx, disk in enumerate(disk_info):
4214 disk_index = idx + base_index
4215 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4216 disk["size"], names[idx*2:idx*2+2],
4217 "disk/%d" % disk_index,
4218 minors[idx*2], minors[idx*2+1])
4219 disk_dev.mode = disk["mode"]
4220 disks.append(disk_dev)
4221 elif template_name == constants.DT_FILE:
4222 if len(secondary_nodes) != 0:
4223 raise errors.ProgrammerError("Wrong template configuration")
4225 for idx, disk in enumerate(disk_info):
4226 disk_index = idx + base_index
4227 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4228 iv_name="disk/%d" % disk_index,
4229 logical_id=(file_driver,
4230 "%s/disk%d" % (file_storage_dir,
4233 disks.append(disk_dev)
4235 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
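# Example input (illustrative): disk_info is a list of dicts such as
#   [{"size": 1024, "mode": "rw"}, {"size": 2048, "mode": "ro"}]
# and base_index shifts the generated iv_names, so with base_index=1 the
# first entry above becomes "disk/1".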
4239 def _GetInstanceInfoText(instance):
4240 """Compute that text that should be added to the disk's metadata.
4243 return "originstname+%s" % instance.name
4246 def _CreateDisks(lu, instance):
4247 """Create all disks for an instance.
4249 This abstracts away some work from AddInstance.
4251 @type lu: L{LogicalUnit}
4252 @param lu: the logical unit on whose behalf we execute
4253 @type instance: L{objects.Instance}
4254 @param instance: the instance whose disks we should create
4256 @return: the success of the creation
4259 info = _GetInstanceInfoText(instance)
4260 pnode = instance.primary_node
4262 if instance.disk_template == constants.DT_FILE:
4263 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4264 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4266 if result.failed or not result.data:
4267 raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4269 if not result.data[0]:
4270 raise errors.OpExecError("Failed to create directory '%s'" %
4271 file_storage_dir)
4273 # Note: this needs to be kept in sync with adding of disks in
4274 # LUSetInstanceParams
4275 for device in instance.disks:
4276 logging.info("Creating volume %s for instance %s",
4277 device.iv_name, instance.name)
4279 for node in instance.all_nodes:
4280 f_create = node == pnode
4281 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4284 def _RemoveDisks(lu, instance):
4285 """Remove all disks for an instance.
4287 This abstracts away some work from `AddInstance()` and
4288 `RemoveInstance()`. Note that in case some of the devices couldn't
4289 be removed, the removal will continue with the other ones (compare
4290 with `_CreateDisks()`).
4292 @type lu: L{LogicalUnit}
4293 @param lu: the logical unit on whose behalf we execute
4294 @type instance: L{objects.Instance}
4295 @param instance: the instance whose disks we should remove
4297 @return: the success of the removal
4300 logging.info("Removing block devices for instance %s", instance.name)
4302 all_result = True
4303 for device in instance.disks:
4304 for node, disk in device.ComputeNodeTree(instance.primary_node):
4305 lu.cfg.SetDiskID(disk, node)
4306 msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4307 if msg:
4308 lu.LogWarning("Could not remove block device %s on node %s,"
4309 " continuing anyway: %s", device.iv_name, node, msg)
4310 all_result = False
4312 if instance.disk_template == constants.DT_FILE:
4313 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4314 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4315 file_storage_dir)
4316 if result.failed or not result.data:
4317 logging.error("Could not remove directory '%s'", file_storage_dir)
4318 all_result = False
4320 return all_result
4323 def _ComputeDiskSize(disk_template, disks):
4324 """Compute disk size requirements in the volume group
4327 # Required free disk space as a function of disk and swap space
4328 req_size_dict = {
4329 constants.DT_DISKLESS: None,
4330 constants.DT_PLAIN: sum(d["size"] for d in disks),
4331 # 128 MB are added for drbd metadata for each disk
4332 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4333 constants.DT_FILE: None,
4334 }
4336 if disk_template not in req_size_dict:
4337 raise errors.ProgrammerError("Disk template '%s' size requirement"
4338 " is unknown" % disk_template)
4340 return req_size_dict[disk_template]
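# Worked example: for two disks of 1024 MB and 2048 MB,
#   DT_PLAIN -> 1024 + 2048 = 3072 MB of free VG space,
#   DT_DRBD8 -> (1024 + 128) + (2048 + 128) = 3328 MB (128 MB of DRBD
#               metadata per disk),
#   DT_DISKLESS / DT_FILE -> None (no LVM space is needed).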
4343 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4344 """Hypervisor parameter validation.
4346 This function abstracts the hypervisor parameter validation to be
4347 used in both instance create and instance modify.
4349 @type lu: L{LogicalUnit}
4350 @param lu: the logical unit for which we check
4351 @type nodenames: list
4352 @param nodenames: the list of nodes on which we should check
4353 @type hvname: string
4354 @param hvname: the name of the hypervisor we should use
4355 @type hvparams: dict
4356 @param hvparams: the parameters which we need to check
4357 @raise errors.OpPrereqError: if the parameters are not valid
4360 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4361 hvname,
4362 hvparams)
4363 for node in nodenames:
4364 info = hvinfo[node]
4367 msg = info.RemoteFailMsg()
4368 if msg:
4369 raise errors.OpPrereqError("Hypervisor parameter validation"
4370 " failed on node %s: %s" % (node, msg))
4373 class LUCreateInstance(LogicalUnit):
4374 """Create an instance.
4377 HPATH = "instance-add"
4378 HTYPE = constants.HTYPE_INSTANCE
4379 _OP_REQP = ["instance_name", "disks", "disk_template",
4381 "wait_for_sync", "ip_check", "nics",
4382 "hvparams", "beparams"]
4385 def _ExpandNode(self, node):
4386 """Expands and checks one node name.
4389 node_full = self.cfg.ExpandNodeName(node)
4390 if node_full is None:
4391 raise errors.OpPrereqError("Unknown node %s" % node)
4392 return node_full
4394 def ExpandNames(self):
4395 """ExpandNames for CreateInstance.
4397 Figure out the right locks for instance creation.
4400 self.needed_locks = {}
4402 # set optional parameters to none if they don't exist
4403 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4404 if not hasattr(self.op, attr):
4405 setattr(self.op, attr, None)
4407 # cheap checks, mostly valid constants given
4409 # verify creation mode
4410 if self.op.mode not in (constants.INSTANCE_CREATE,
4411 constants.INSTANCE_IMPORT):
4412 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4413 self.op.mode)
4415 # disk template and mirror node verification
4416 if self.op.disk_template not in constants.DISK_TEMPLATES:
4417 raise errors.OpPrereqError("Invalid disk template name")
4419 if self.op.hypervisor is None:
4420 self.op.hypervisor = self.cfg.GetHypervisorType()
4422 cluster = self.cfg.GetClusterInfo()
4423 enabled_hvs = cluster.enabled_hypervisors
4424 if self.op.hypervisor not in enabled_hvs:
4425 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4426 " cluster (%s)" % (self.op.hypervisor,
4427 ",".join(enabled_hvs)))
4429 # check hypervisor parameter syntax (locally)
4430 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4431 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4432 self.op.hvparams)
4433 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4434 hv_type.CheckParameterSyntax(filled_hvp)
4435 self.hv_full = filled_hvp
4437 # fill and remember the beparams dict
4438 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4439 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4440 self.op.beparams)
4442 #### instance parameters check
4444 # instance name verification
4445 hostname1 = utils.HostInfo(self.op.instance_name)
4446 self.op.instance_name = instance_name = hostname1.name
4448 # this is just a preventive check, but someone might still add this
4449 # instance in the meantime, and creation will fail at lock-add time
4450 if instance_name in self.cfg.GetInstanceList():
4451 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4452 instance_name)
4454 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4456 # NIC buildup
4457 self.nics = []
4458 for nic in self.op.nics:
4459 # ip validity checks
4460 ip = nic.get("ip", None)
4461 if ip is None or ip.lower() == "none":
4462 nic_ip = None
4463 elif ip.lower() == constants.VALUE_AUTO:
4464 nic_ip = hostname1.ip
4465 else:
4466 if not utils.IsValidIP(ip):
4467 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4468 " like a valid IP" % ip)
4469 nic_ip = ip
4471 # MAC address verification
4472 mac = nic.get("mac", constants.VALUE_AUTO)
4473 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4474 if not utils.IsValidMac(mac.lower()):
4475 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4476 mac)
4477 # bridge verification
4478 bridge = nic.get("bridge", None)
4479 if bridge is None:
4480 bridge = self.cfg.GetDefBridge()
4481 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4483 # disk checks/pre-build
4484 self.disks = []
4485 for disk in self.op.disks:
4486 mode = disk.get("mode", constants.DISK_RDWR)
4487 if mode not in constants.DISK_ACCESS_SET:
4488 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4489 mode)
4490 size = disk.get("size", None)
4491 if size is None:
4492 raise errors.OpPrereqError("Missing disk size")
4493 try:
4494 size = int(size)
4495 except ValueError:
4496 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4497 self.disks.append({"size": size, "mode": mode})
4499 # used in CheckPrereq for ip ping check
4500 self.check_ip = hostname1.ip
4502 # file storage checks
4503 if (self.op.file_driver and
4504 not self.op.file_driver in constants.FILE_DRIVER):
4505 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4506 self.op.file_driver)
4508 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4509 raise errors.OpPrereqError("File storage directory path not absolute")
4511 ### Node/iallocator related checks
4512 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4513 raise errors.OpPrereqError("One and only one of iallocator and primary"
4514 " node must be given")
4516 if self.op.iallocator:
4517 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4518 else:
4519 self.op.pnode = self._ExpandNode(self.op.pnode)
4520 nodelist = [self.op.pnode]
4521 if self.op.snode is not None:
4522 self.op.snode = self._ExpandNode(self.op.snode)
4523 nodelist.append(self.op.snode)
4524 self.needed_locks[locking.LEVEL_NODE] = nodelist
4526 # in case of import lock the source node too
4527 if self.op.mode == constants.INSTANCE_IMPORT:
4528 src_node = getattr(self.op, "src_node", None)
4529 src_path = getattr(self.op, "src_path", None)
4531 if src_path is None:
4532 self.op.src_path = src_path = self.op.instance_name
4534 if src_node is None:
4535 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4536 self.op.src_node = None
4537 if os.path.isabs(src_path):
4538 raise errors.OpPrereqError("Importing an instance from an absolute"
4539 " path requires a source node option.")
4541 self.op.src_node = src_node = self._ExpandNode(src_node)
4542 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4543 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4544 if not os.path.isabs(src_path):
4545 self.op.src_path = src_path = \
4546 os.path.join(constants.EXPORT_DIR, src_path)
4548 else: # INSTANCE_CREATE
4549 if getattr(self.op, "os_type", None) is None:
4550 raise errors.OpPrereqError("No guest OS specified")
4552 def _RunAllocator(self):
4553 """Run the allocator based on input opcode.
4556 nics = [n.ToDict() for n in self.nics]
4557 ial = IAllocator(self,
4558 mode=constants.IALLOCATOR_MODE_ALLOC,
4559 name=self.op.instance_name,
4560 disk_template=self.op.disk_template,
4563 vcpus=self.be_full[constants.BE_VCPUS],
4564 mem_size=self.be_full[constants.BE_MEMORY],
4567 hypervisor=self.op.hypervisor,
4568 )
4570 ial.Run(self.op.iallocator)
4572 if not ial.success:
4573 raise errors.OpPrereqError("Can't compute nodes using"
4574 " iallocator '%s': %s" % (self.op.iallocator,
4575 ial.info))
4576 if len(ial.nodes) != ial.required_nodes:
4577 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4578 " of nodes (%s), required %s" %
4579 (self.op.iallocator, len(ial.nodes),
4580 ial.required_nodes))
4581 self.op.pnode = ial.nodes[0]
4582 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4583 self.op.instance_name, self.op.iallocator,
4584 ", ".join(ial.nodes))
4585 if ial.required_nodes == 2:
4586 self.op.snode = ial.nodes[1]
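# Note: ial.required_nodes is expected to be 1 for non-mirrored disk
# templates and 2 for DT_DRBD8, in which case the second node returned by
# the iallocator becomes the secondary (snode) of the new instance.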
4588 def BuildHooksEnv(self):
4589 """Build hooks env.
4591 This runs on master, primary and secondary nodes of the instance.
4594 env = {
4595 "ADD_MODE": self.op.mode,
4596 }
4597 if self.op.mode == constants.INSTANCE_IMPORT:
4598 env["SRC_NODE"] = self.op.src_node
4599 env["SRC_PATH"] = self.op.src_path
4600 env["SRC_IMAGES"] = self.src_images
4602 env.update(_BuildInstanceHookEnv(
4603 name=self.op.instance_name,
4604 primary_node=self.op.pnode,
4605 secondary_nodes=self.secondaries,
4606 status=self.op.start,
4607 os_type=self.op.os_type,
4608 memory=self.be_full[constants.BE_MEMORY],
4609 vcpus=self.be_full[constants.BE_VCPUS],
4610 nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4611 disk_template=self.op.disk_template,
4612 disks=[(d["size"], d["mode"]) for d in self.disks],
4615 hypervisor=self.op.hypervisor,
4616 ))
4618 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4619 self.secondaries)
4621 return env, nl, nl
4623 def CheckPrereq(self):
4624 """Check prerequisites.
4627 if (not self.cfg.GetVGName() and
4628 self.op.disk_template not in constants.DTS_NOT_LVM):
4629 raise errors.OpPrereqError("Cluster does not support lvm-based"
4630 " instances")
4632 if self.op.mode == constants.INSTANCE_IMPORT:
4633 src_node = self.op.src_node
4634 src_path = self.op.src_path
4636 if src_node is None:
4637 exp_list = self.rpc.call_export_list(
4638 self.acquired_locks[locking.LEVEL_NODE])
4639 found = False
4640 for node in exp_list:
4641 if not exp_list[node].failed and src_path in exp_list[node].data:
4642 found = True
4643 self.op.src_node = src_node = node
4644 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4645 src_path)
4646 break
4647 if not found:
4648 raise errors.OpPrereqError("No export found for relative path %s" %
4649 src_path)
4651 _CheckNodeOnline(self, src_node)
4652 result = self.rpc.call_export_info(src_node, src_path)
4653 result.Raise()
4654 if not result.data:
4655 raise errors.OpPrereqError("No export found in dir %s" % src_path)
4657 export_info = result.data
4658 if not export_info.has_section(constants.INISECT_EXP):
4659 raise errors.ProgrammerError("Corrupted export config")
4661 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4662 if (int(ei_version) != constants.EXPORT_VERSION):
4663 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4664 (ei_version, constants.EXPORT_VERSION))
4666 # Check that the new instance doesn't have less disks than the export
4667 instance_disks = len(self.disks)
4668 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4669 if instance_disks < export_disks:
4670 raise errors.OpPrereqError("Not enough disks to import."
4671 " (instance: %d, export: %d)" %
4672 (instance_disks, export_disks))
4674 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4676 for idx in range(export_disks):
4677 option = 'disk%d_dump' % idx
4678 if export_info.has_option(constants.INISECT_INS, option):
4679 # FIXME: are the old os-es, disk sizes, etc. useful?
4680 export_name = export_info.get(constants.INISECT_INS, option)
4681 image = os.path.join(src_path, export_name)
4682 disk_images.append(image)
4684 disk_images.append(False)
4686 self.src_images = disk_images
4688 old_name = export_info.get(constants.INISECT_INS, 'name')
4689 # FIXME: int() here could throw a ValueError on broken exports
4690 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4691 if self.op.instance_name == old_name:
4692 for idx, nic in enumerate(self.nics):
4693 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4694 nic_mac_ini = 'nic%d_mac' % idx
4695 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4697 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4698 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4699 if self.op.start and not self.op.ip_check:
4700 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4701 " adding an instance in start mode")
4703 if self.op.ip_check:
4704 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4705 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4706 (self.check_ip, self.op.instance_name))
4708 #### mac address generation
4709 # By generating here the mac address both the allocator and the hooks get
4710 # the real final mac address rather than the 'auto' or 'generate' value.
4711 # There is a race condition between the generation and the instance object
4712 # creation, which means that we know the mac is valid now, but we're not
4713 # sure it will be when we actually add the instance. If things go bad
4714 # adding the instance will abort because of a duplicate mac, and the
4715 # creation job will fail.
4716 for nic in self.nics:
4717 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4718 nic.mac = self.cfg.GenerateMAC()
4722 if self.op.iallocator is not None:
4723 self._RunAllocator()
4725 #### node related checks
4727 # check primary node
4728 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4729 assert self.pnode is not None, \
4730 "Cannot retrieve locked node %s" % self.op.pnode
4732 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4735 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4738 self.secondaries = []
4740 # mirror node verification
4741 if self.op.disk_template in constants.DTS_NET_MIRROR:
4742 if self.op.snode is None:
4743 raise errors.OpPrereqError("The networked disk templates need"
4744 " a mirror node")
4745 if self.op.snode == pnode.name:
4746 raise errors.OpPrereqError("The secondary node cannot be"
4747 " the primary node.")
4748 _CheckNodeOnline(self, self.op.snode)
4749 _CheckNodeNotDrained(self, self.op.snode)
4750 self.secondaries.append(self.op.snode)
4752 nodenames = [pnode.name] + self.secondaries
4754 req_size = _ComputeDiskSize(self.op.disk_template,
4755 self.disks)
4757 # Check lv size requirements
4758 if req_size is not None:
4759 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4760 self.op.hypervisor)
4761 for node in nodenames:
4762 info = nodeinfo[node]
4763 info.Raise()
4764 info = info.data
4765 if not info:
4766 raise errors.OpPrereqError("Cannot get current information"
4767 " from node '%s'" % node)
4768 vg_free = info.get('vg_free', None)
4769 if not isinstance(vg_free, int):
4770 raise errors.OpPrereqError("Can't compute free disk space on"
4771 " node %s" % node)
4772 if req_size > info['vg_free']:
4773 raise errors.OpPrereqError("Not enough disk space on target node %s."
4774 " %d MB available, %d MB required" %
4775 (node, info['vg_free'], req_size))
4777 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4780 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4782 if not isinstance(result.data, objects.OS) or not result.data:
4783 raise errors.OpPrereqError("OS '%s' not in supported os list for"
4784 " primary node" % self.op.os_type)
4786 # bridge check on primary node
4787 bridges = [n.bridge for n in self.nics]
4788 result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4789 result.Raise()
4790 if not result.data:
4791 raise errors.OpPrereqError("One of the target bridges '%s' does not"
4792 " exist on destination node '%s'" %
4793 (",".join(bridges), pnode.name))
4795 # memory check on primary node
4796 if self.op.start:
4797 _CheckNodeFreeMemory(self, self.pnode.name,
4798 "creating instance %s" % self.op.instance_name,
4799 self.be_full[constants.BE_MEMORY],
4800 self.op.hypervisor)
4802 def Exec(self, feedback_fn):
4803 """Create and add the instance to the cluster.
4806 instance = self.op.instance_name
4807 pnode_name = self.pnode.name
4809 ht_kind = self.op.hypervisor
4810 if ht_kind in constants.HTS_REQ_PORT:
4811 network_port = self.cfg.AllocatePort()
4812 else:
4813 network_port = None
4815 ##if self.op.vnc_bind_address is None:
4816 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4818 # this is needed because os.path.join does not accept None arguments
4819 if self.op.file_storage_dir is None:
4820 string_file_storage_dir = ""
4822 string_file_storage_dir = self.op.file_storage_dir
4824 # build the full file storage dir path
4825 file_storage_dir = os.path.normpath(os.path.join(
4826 self.cfg.GetFileStorageDir(),
4827 string_file_storage_dir, instance))
4830 disks = _GenerateDiskTemplate(self,
4831 self.op.disk_template,
4832 instance, pnode_name,
4833 self.secondaries,
4834 self.disks,
4835 file_storage_dir,
4836 self.op.file_driver,
4837 0)
4839 iobj = objects.Instance(name=instance, os=self.op.os_type,
4840 primary_node=pnode_name,
4841 nics=self.nics, disks=disks,
4842 disk_template=self.op.disk_template,
4844 network_port=network_port,
4845 beparams=self.op.beparams,
4846 hvparams=self.op.hvparams,
4847 hypervisor=self.op.hypervisor,
4848 )
4850 feedback_fn("* creating instance disks...")
4851 try:
4852 _CreateDisks(self, iobj)
4853 except errors.OpExecError:
4854 self.LogWarning("Device creation failed, reverting...")
4855 try:
4856 _RemoveDisks(self, iobj)
4857 finally:
4858 self.cfg.ReleaseDRBDMinors(instance)
4859 raise
4861 feedback_fn("adding instance %s to cluster config" % instance)
4863 self.cfg.AddInstance(iobj)
4864 # Declare that we don't want to remove the instance lock anymore, as we've
4865 # added the instance to the config
4866 del self.remove_locks[locking.LEVEL_INSTANCE]
4867 # Unlock all the nodes
4868 if self.op.mode == constants.INSTANCE_IMPORT:
4869 nodes_keep = [self.op.src_node]
4870 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4871 if node != self.op.src_node]
4872 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4873 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4875 self.context.glm.release(locking.LEVEL_NODE)
4876 del self.acquired_locks[locking.LEVEL_NODE]
4878 if self.op.wait_for_sync:
4879 disk_abort = not _WaitForSync(self, iobj)
4880 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4881 # make sure the disks are not degraded (still sync-ing is ok)
4883 feedback_fn("* checking mirrors status")
4884 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4885 else:
4886 disk_abort = False
4888 if disk_abort:
4889 _RemoveDisks(self, iobj)
4890 self.cfg.RemoveInstance(iobj.name)
4891 # Make sure the instance lock gets removed
4892 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4893 raise errors.OpExecError("There are some degraded disks for"
4896 feedback_fn("creating os for instance %s on node %s" %
4897 (instance, pnode_name))
4899 if iobj.disk_template != constants.DT_DISKLESS:
4900 if self.op.mode == constants.INSTANCE_CREATE:
4901 feedback_fn("* running the instance OS create scripts...")
4902 result = self.rpc.call_instance_os_add(pnode_name, iobj)
4903 msg = result.RemoteFailMsg()
4904 if msg:
4905 raise errors.OpExecError("Could not add os for instance %s"
4906 " on node %s: %s" %
4907 (instance, pnode_name, msg))
4909 elif self.op.mode == constants.INSTANCE_IMPORT:
4910 feedback_fn("* running the instance OS import scripts...")
4911 src_node = self.op.src_node
4912 src_images = self.src_images
4913 cluster_name = self.cfg.GetClusterName()
4914 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4915 src_node, src_images,
4917 import_result.Raise()
4918 for idx, result in enumerate(import_result.data):
4919 if not result:
4920 self.LogWarning("Could not import the image %s for instance"
4921 " %s, disk %d, on node %s" %
4922 (src_images[idx], instance, idx, pnode_name))
4923 else:
4924 # also checked in the prereq part
4925 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4926 % self.op.mode)
4928 if self.op.start:
4929 iobj.admin_up = True
4930 self.cfg.Update(iobj)
4931 logging.info("Starting instance %s on node %s", instance, pnode_name)
4932 feedback_fn("* starting instance...")
4933 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4934 msg = result.RemoteFailMsg()
4935 if msg:
4936 raise errors.OpExecError("Could not start instance: %s" % msg)
4939 class LUConnectConsole(NoHooksLU):
4940 """Connect to an instance's console.
4942 This is somewhat special in that it returns the command line that
4943 you need to run on the master node in order to connect to the
4944 console.
4947 _OP_REQP = ["instance_name"]
4950 def ExpandNames(self):
4951 self._ExpandAndLockInstance()
4953 def CheckPrereq(self):
4954 """Check prerequisites.
4956 This checks that the instance is in the cluster.
4959 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4960 assert self.instance is not None, \
4961 "Cannot retrieve locked instance %s" % self.op.instance_name
4962 _CheckNodeOnline(self, self.instance.primary_node)
4964 def Exec(self, feedback_fn):
4965 """Connect to the console of an instance
4968 instance = self.instance
4969 node = instance.primary_node
4971 node_insts = self.rpc.call_instance_list([node],
4972 [instance.hypervisor])[node]
4975 if instance.name not in node_insts.data:
4976 raise errors.OpExecError("Instance %s is not running." % instance.name)
4978 logging.debug("Connecting to console of %s on %s", instance.name, node)
4980 hyper = hypervisor.GetHypervisor(instance.hypervisor)
4981 cluster = self.cfg.GetClusterInfo()
4982 # beparams and hvparams are passed separately, to avoid editing the
4983 # instance and then saving the defaults in the instance itself.
4984 hvparams = cluster.FillHV(instance)
4985 beparams = cluster.FillBE(instance)
4986 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4989 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4992 class LUReplaceDisks(LogicalUnit):
4993 """Replace the disks of an instance.
4996 HPATH = "mirrors-replace"
4997 HTYPE = constants.HTYPE_INSTANCE
4998 _OP_REQP = ["instance_name", "mode", "disks"]
5001 def CheckArguments(self):
5002 if not hasattr(self.op, "remote_node"):
5003 self.op.remote_node = None
5004 if not hasattr(self.op, "iallocator"):
5005 self.op.iallocator = None
5007 # check for valid parameter combination
5008 cnt = [self.op.remote_node, self.op.iallocator].count(None)
5009 if self.op.mode == constants.REPLACE_DISK_CHG:
5010 if cnt == 2:
5011 raise errors.OpPrereqError("When changing the secondary either an"
5012 " iallocator script must be used or the"
5013 " new node given")
5014 elif cnt == 0:
5015 raise errors.OpPrereqError("Give either the iallocator or the new"
5016 " secondary, not both")
5017 else: # not replacing the secondary
5018 if cnt != 2:
5019 raise errors.OpPrereqError("The iallocator and new node options can"
5020 " be used only when changing the"
5021 " secondary")
5023 def ExpandNames(self):
5024 self._ExpandAndLockInstance()
5026 if self.op.iallocator is not None:
5027 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5028 elif self.op.remote_node is not None:
5029 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5030 if remote_node is None:
5031 raise errors.OpPrereqError("Node '%s' not known" %
5032 self.op.remote_node)
5033 self.op.remote_node = remote_node
5034 # Warning: do not remove the locking of the new secondary here
5035 # unless DRBD8.AddChildren is changed to work in parallel;
5036 # currently it doesn't since parallel invocations of
5037 # FindUnusedMinor will conflict
5038 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5039 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5041 self.needed_locks[locking.LEVEL_NODE] = []
5042 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5044 def DeclareLocks(self, level):
5045 # If we're not already locking all nodes in the set we have to declare the
5046 # instance's primary/secondary nodes.
5047 if (level == locking.LEVEL_NODE and
5048 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5049 self._LockInstancesNodes()
5051 def _RunAllocator(self):
5052 """Compute a new secondary node using an IAllocator.
5055 ial = IAllocator(self,
5056 mode=constants.IALLOCATOR_MODE_RELOC,
5057 name=self.op.instance_name,
5058 relocate_from=[self.sec_node])
5060 ial.Run(self.op.iallocator)
5062 if not ial.success:
5063 raise errors.OpPrereqError("Can't compute nodes using"
5064 " iallocator '%s': %s" % (self.op.iallocator,
5065 ial.info))
5066 if len(ial.nodes) != ial.required_nodes:
5067 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5068 " of nodes (%s), required %s" %
5069 (len(ial.nodes), ial.required_nodes))
5070 self.op.remote_node = ial.nodes[0]
5071 self.LogInfo("Selected new secondary for the instance: %s",
5072 self.op.remote_node)
5074 def BuildHooksEnv(self):
5075 """Build hooks env.
5077 This runs on the master, the primary and all the secondaries.
5080 env = {
5081 "MODE": self.op.mode,
5082 "NEW_SECONDARY": self.op.remote_node,
5083 "OLD_SECONDARY": self.instance.secondary_nodes[0],
5084 }
5085 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5086 nl = [
5087 self.cfg.GetMasterNode(),
5088 self.instance.primary_node,
5089 ]
5090 if self.op.remote_node is not None:
5091 nl.append(self.op.remote_node)
5092 return env, nl, nl
5094 def CheckPrereq(self):
5095 """Check prerequisites.
5097 This checks that the instance is in the cluster.
5100 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5101 assert instance is not None, \
5102 "Cannot retrieve locked instance %s" % self.op.instance_name
5103 self.instance = instance
5105 if instance.disk_template != constants.DT_DRBD8:
5106 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5107 " instances")
5109 if len(instance.secondary_nodes) != 1:
5110 raise errors.OpPrereqError("The instance has a strange layout,"
5111 " expected one secondary but found %d" %
5112 len(instance.secondary_nodes))
5114 self.sec_node = instance.secondary_nodes[0]
5116 if self.op.iallocator is not None:
5117 self._RunAllocator()
5119 remote_node = self.op.remote_node
5120 if remote_node is not None:
5121 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5122 assert self.remote_node_info is not None, \
5123 "Cannot retrieve locked node %s" % remote_node
5125 self.remote_node_info = None
5126 if remote_node == instance.primary_node:
5127 raise errors.OpPrereqError("The specified node is the primary node of"
5128 " the instance.")
5129 elif remote_node == self.sec_node:
5130 raise errors.OpPrereqError("The specified node is already the"
5131 " secondary node of the instance.")
5133 if self.op.mode == constants.REPLACE_DISK_PRI:
5134 n1 = self.tgt_node = instance.primary_node
5135 n2 = self.oth_node = self.sec_node
5136 elif self.op.mode == constants.REPLACE_DISK_SEC:
5137 n1 = self.tgt_node = self.sec_node
5138 n2 = self.oth_node = instance.primary_node
5139 elif self.op.mode == constants.REPLACE_DISK_CHG:
5140 n1 = self.new_node = remote_node
5141 n2 = self.oth_node = instance.primary_node
5142 self.tgt_node = self.sec_node
5143 _CheckNodeNotDrained(self, remote_node)
5144 else:
5145 raise errors.ProgrammerError("Unhandled disk replace mode")
5147 _CheckNodeOnline(self, n1)
5148 _CheckNodeOnline(self, n2)
5150 if not self.op.disks:
5151 self.op.disks = range(len(instance.disks))
5153 for disk_idx in self.op.disks:
5154 instance.FindDisk(disk_idx)
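# Summary of the three modes checked above (for orientation):
#   REPLACE_DISK_PRI: tgt_node = primary,   oth_node = secondary
#   REPLACE_DISK_SEC: tgt_node = secondary, oth_node = primary
#   REPLACE_DISK_CHG: tgt_node = old secondary, oth_node = primary,
#                     new_node = the replacement secondary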
5156 def _ExecD8DiskOnly(self, feedback_fn):
5157 """Replace a disk on the primary or secondary for dbrd8.
5159 The algorithm for replace is quite complicated:
5161 1. for each disk to be replaced:
5163 1. create new LVs on the target node with unique names
5164 1. detach old LVs from the drbd device
5165 1. rename old LVs to name_replaced.<time_t>
5166 1. rename new LVs to old LVs
5167 1. attach the new LVs (with the old names now) to the drbd device
5169 1. wait for sync across all devices
5171 1. for each modified disk:
5173 1. remove old LVs (which have the name name_replaced.<time_t>)
5175 Failures are not very well handled.
5178 steps_total = 6
5179 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5180 instance = self.instance
5181 iv_names = {}
5182 vgname = self.cfg.GetVGName()
5184 cfg = self.cfg
5185 tgt_node = self.tgt_node
5186 oth_node = self.oth_node
5188 # Step: check device activation
5189 self.proc.LogStep(1, steps_total, "check device existence")
5190 info("checking volume groups")
5191 my_vg = cfg.GetVGName()
5192 results = self.rpc.call_vg_list([oth_node, tgt_node])
5193 if not results:
5194 raise errors.OpExecError("Can't list volume groups on the nodes")
5195 for node in oth_node, tgt_node:
5196 res = results[node]
5197 if res.failed or not res.data or my_vg not in res.data:
5198 raise errors.OpExecError("Volume group '%s' not found on %s" %
5199 (my_vg, node))
5200 for idx, dev in enumerate(instance.disks):
5201 if idx not in self.op.disks:
5202 continue
5203 for node in tgt_node, oth_node:
5204 info("checking disk/%d on %s" % (idx, node))
5205 cfg.SetDiskID(dev, node)
5206 result = self.rpc.call_blockdev_find(node, dev)
5207 msg = result.RemoteFailMsg()
5208 if not msg and not result.payload:
5209 msg = "disk not found"
5210 if msg:
5211 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5212 (idx, node, msg))
5214 # Step: check other node consistency
5215 self.proc.LogStep(2, steps_total, "check peer consistency")
5216 for idx, dev in enumerate(instance.disks):
5217 if idx not in self.op.disks:
5218 continue
5219 info("checking disk/%d consistency on %s" % (idx, oth_node))
5220 if not _CheckDiskConsistency(self, dev, oth_node,
5221 oth_node==instance.primary_node):
5222 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5223 " to replace disks on this node (%s)" %
5224 (oth_node, tgt_node))
5226 # Step: create new storage
5227 self.proc.LogStep(3, steps_total, "allocate new storage")
5228 for idx, dev in enumerate(instance.disks):
5229 if idx not in self.op.disks:
5230 continue
5231 size = dev.size
5232 cfg.SetDiskID(dev, tgt_node)
5233 lv_names = [".disk%d_%s" % (idx, suf)
5234 for suf in ["data", "meta"]]
5235 names = _GenerateUniqueNames(self, lv_names)
5236 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5237 logical_id=(vgname, names[0]))
5238 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5239 logical_id=(vgname, names[1]))
5240 new_lvs = [lv_data, lv_meta]
5241 old_lvs = dev.children
5242 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5243 info("creating new local storage on %s for %s" %
5244 (tgt_node, dev.iv_name))
5245 # we pass force_create=True to force the LVM creation
5246 for new_lv in new_lvs:
5247 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5248 _GetInstanceInfoText(instance), False)
5250 # Step: for each lv, detach+rename*2+attach
5251 self.proc.LogStep(4, steps_total, "change drbd configuration")
5252 for dev, old_lvs, new_lvs in iv_names.itervalues():
5253 info("detaching %s drbd from local storage" % dev.iv_name)
5254 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5255 result.Raise()
5256 if not result.data:
5257 raise errors.OpExecError("Can't detach drbd from local storage on node"
5258 " %s for device %s" % (tgt_node, dev.iv_name))
5260 #cfg.Update(instance)
5262 # ok, we created the new LVs, so now we know we have the needed
5263 # storage; as such, we proceed on the target node to rename
5264 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5265 # using the assumption that logical_id == physical_id (which in
5266 # turn is the unique_id on that node)
5268 # FIXME(iustin): use a better name for the replaced LVs
5269 temp_suffix = int(time.time())
5270 ren_fn = lambda d, suff: (d.physical_id[0],
5271 d.physical_id[1] + "_replaced-%s" % suff)
5272 # build the rename list based on what LVs exist on the node
5273 rlist = []
5274 for to_ren in old_lvs:
5275 result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5276 if not result.RemoteFailMsg() and result.payload:
5278 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5280 info("renaming the old LVs on the target node")
5281 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5282 result.Raise()
5283 if not result.data:
5284 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5285 # now we rename the new LVs to the old LVs
5286 info("renaming the new LVs on the target node")
5287 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5288 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5289 result.Raise()
5290 if not result.data:
5291 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5293 for old, new in zip(old_lvs, new_lvs):
5294 new.logical_id = old.logical_id
5295 cfg.SetDiskID(new, tgt_node)
5297 for disk in old_lvs:
5298 disk.logical_id = ren_fn(disk, temp_suffix)
5299 cfg.SetDiskID(disk, tgt_node)
5301 # now that the new lvs have the old name, we can add them to the device
5302 info("adding new mirror component on %s" % tgt_node)
5303 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5304 if result.failed or not result.data:
5305 for new_lv in new_lvs:
5306 msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5307 if msg:
5308 warning("Can't rollback device %s: %s", dev, msg,
5309 hint="cleanup manually the unused logical volumes")
5310 raise errors.OpExecError("Can't add local storage to drbd")
5312 dev.children = new_lvs
5313 cfg.Update(instance)
5315 # Step: wait for sync
5317 # this can fail as the old devices are degraded and _WaitForSync
5318 # does a combined result over all disks, so we don't check its
5319 # return value
5320 self.proc.LogStep(5, steps_total, "sync devices")
5321 _WaitForSync(self, instance, unlock=True)
5323 # so check manually all the devices
5324 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5325 cfg.SetDiskID(dev, instance.primary_node)
5326 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5327 msg = result.RemoteFailMsg()
5328 if not msg and not result.payload:
5329 msg = "disk not found"
5330 if msg:
5331 raise errors.OpExecError("Can't find DRBD device %s: %s" %
5332 (name, msg))
5333 if result.payload[5]:
5334 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5336 # Step: remove old storage
5337 self.proc.LogStep(6, steps_total, "removing old storage")
5338 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5339 info("remove logical volumes for %s" % name)
5340 for lv in old_lvs:
5341 cfg.SetDiskID(lv, tgt_node)
5342 msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5343 if msg:
5344 warning("Can't remove old LV: %s" % msg,
5345 hint="manually remove unused LVs")
5348 def _ExecD8Secondary(self, feedback_fn):
5349 """Replace the secondary node for drbd8.
5351 The algorithm for replace is quite complicated:
5352 - for all disks of the instance:
5353 - create new LVs on the new node with same names
5354 - shutdown the drbd device on the old secondary
5355 - disconnect the drbd network on the primary
5356 - create the drbd device on the new secondary
5357 - network attach the drbd on the primary, using an artifice:
5358 the drbd code for Attach() will connect to the network if it
5359 finds a device which is connected to the good local disks but
5360 not network enabled
5361 - wait for sync across all devices
5362 - remove all disks from the old secondary
5364 Failures are not very well handled.
5367 steps_total = 6
5368 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5369 instance = self.instance
5370 iv_names = {}
5371 cfg = self.cfg
5373 old_node = self.tgt_node
5374 new_node = self.new_node
5375 pri_node = instance.primary_node
5376 nodes_ip = {
5377 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5378 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5379 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5380 }
5382 # Step: check device activation
5383 self.proc.LogStep(1, steps_total, "check device existence")
5384 info("checking volume groups")
5385 my_vg = cfg.GetVGName()
5386 results = self.rpc.call_vg_list([pri_node, new_node])
5387 for node in pri_node, new_node:
5388 res = results[node]
5389 if res.failed or not res.data or my_vg not in res.data:
5390 raise errors.OpExecError("Volume group '%s' not found on %s" %
5391 (my_vg, node))
5392 for idx, dev in enumerate(instance.disks):
5393 if idx not in self.op.disks:
5394 continue
5395 info("checking disk/%d on %s" % (idx, pri_node))
5396 cfg.SetDiskID(dev, pri_node)
5397 result = self.rpc.call_blockdev_find(pri_node, dev)
5398 msg = result.RemoteFailMsg()
5399 if not msg and not result.payload:
5400 msg = "disk not found"
5401 if msg:
5402 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5403 (idx, pri_node, msg))
5405 # Step: check other node consistency
5406 self.proc.LogStep(2, steps_total, "check peer consistency")
5407 for idx, dev in enumerate(instance.disks):
5408 if idx not in self.op.disks:
5409 continue
5410 info("checking disk/%d consistency on %s" % (idx, pri_node))
5411 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5412 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5413 " unsafe to replace the secondary" %
5416 # Step: create new storage
5417 self.proc.LogStep(3, steps_total, "allocate new storage")
5418 for idx, dev in enumerate(instance.disks):
5419 info("adding new local storage on %s for disk/%d" %
5420 (new_node, idx))
5421 # we pass force_create=True to force LVM creation
5422 for new_lv in dev.children:
5423 _CreateBlockDev(self, new_node, instance, new_lv, True,
5424 _GetInstanceInfoText(instance), False)
5426 # Step 4: dbrd minors and drbd setups changes
5427 # after this, we must manually remove the drbd minors on both the
5428 # error and the success paths
5429 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5430 instance.name)
5431 logging.debug("Allocated minors %s" % (minors,))
5432 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5433 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5435 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5436 # create new devices on new_node; note that we create two IDs:
5437 # one without port, so the drbd will be activated without
5438 # networking information on the new node at this stage, and one
5439 # with network, for the latter activation in step 4
5440 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5441 if pri_node == o_node1:
5442 p_minor = o_minor1
5443 else:
5444 p_minor = o_minor2
5446 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5447 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5449 iv_names[idx] = (dev, dev.children, new_net_id)
5450 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5452 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5453 logical_id=new_alone_id,
5454 children=dev.children,
5455 size=dev.size)
5456 try:
5457 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5458 _GetInstanceInfoText(instance), False)
5459 except errors.GenericError:
5460 self.cfg.ReleaseDRBDMinors(instance.name)
5461 raise
5463 for idx, dev in enumerate(instance.disks):
5464 # we have new devices, shutdown the drbd on the old secondary
5465 info("shutting down drbd for disk/%d on old node" % idx)
5466 cfg.SetDiskID(dev, old_node)
5467 msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5468 if msg:
5469 warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5470 (idx, msg),
5471 hint="Please cleanup this device manually as soon as possible")
5473 info("detaching primary drbds from the network (=> standalone)")
5474 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5475 instance.disks)[pri_node]
5477 msg = result.RemoteFailMsg()
5478 if msg:
5479 # detaches didn't succeed (unlikely)
5480 self.cfg.ReleaseDRBDMinors(instance.name)
5481 raise errors.OpExecError("Can't detach the disks from the network on"
5482 " old node: %s" % (msg,))
5484 # if we managed to detach at least one, we update all the disks of
5485 # the instance to point to the new secondary
5486 info("updating instance configuration")
5487 for dev, _, new_logical_id in iv_names.itervalues():
5488 dev.logical_id = new_logical_id
5489 cfg.SetDiskID(dev, pri_node)
5490 cfg.Update(instance)
5492 # and now perform the drbd attach
5493 info("attaching primary drbds to new secondary (standalone => connected)")
5494 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5495 instance.disks, instance.name,
5496 False)
5497 for to_node, to_result in result.items():
5498 msg = to_result.RemoteFailMsg()
5499 if msg:
5500 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5501 hint="please do a gnt-instance info to see the"
5502 " status of disks")
5504 # this can fail as the old devices are degraded and _WaitForSync
5505 # does a combined result over all disks, so we don't check its
5507 self.proc.LogStep(5, steps_total, "sync devices")
5508 _WaitForSync(self, instance, unlock=True)
5510 # so check manually all the devices
5511 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5512 cfg.SetDiskID(dev, pri_node)
5513 result = self.rpc.call_blockdev_find(pri_node, dev)
5514 msg = result.RemoteFailMsg()
5515 if not msg and not result.payload:
5516 msg = "disk not found"
5517 if msg:
5518 raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5519 (idx, msg))
5520 if result.payload[5]:
5521 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5523 self.proc.LogStep(6, steps_total, "removing old storage")
5524 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5525 info("remove logical volumes for disk/%d" % idx)
5526 for lv in old_lvs:
5527 cfg.SetDiskID(lv, old_node)
5528 msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5529 if msg:
5530 warning("Can't remove LV on old secondary: %s", msg,
5531 hint="Cleanup stale volumes by hand")
5533 def Exec(self, feedback_fn):
5534 """Execute disk replacement.
5536 This dispatches the disk replacement to the appropriate handler.
5539 instance = self.instance
5541 # Activate the instance disks if we're replacing them on a down instance
5542 if not instance.admin_up:
5543 _StartInstanceDisks(self, instance, True)
5545 if self.op.mode == constants.REPLACE_DISK_CHG:
5546 fn = self._ExecD8Secondary
5547 else:
5548 fn = self._ExecD8DiskOnly
5550 ret = fn(feedback_fn)
5552 # Deactivate the instance disks if we're replacing them on a down instance
5553 if not instance.admin_up:
5554 _SafeShutdownInstanceDisks(self, instance)
5556 return ret
5559 class LUGrowDisk(LogicalUnit):
5560 """Grow a disk of an instance.
5564 HTYPE = constants.HTYPE_INSTANCE
5565 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5568 def ExpandNames(self):
5569 self._ExpandAndLockInstance()
5570 self.needed_locks[locking.LEVEL_NODE] = []
5571 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5573 def DeclareLocks(self, level):
5574 if level == locking.LEVEL_NODE:
5575 self._LockInstancesNodes()
5577 def BuildHooksEnv(self):
5578 """Build hooks env.
5580 This runs on the master, the primary and all the secondaries.
5583 env = {
5584 "DISK": self.op.disk,
5585 "AMOUNT": self.op.amount,
5586 }
5587 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5588 nl = [
5589 self.cfg.GetMasterNode(),
5590 self.instance.primary_node,
5591 ]
5592 return env, nl, nl
5594 def CheckPrereq(self):
5595 """Check prerequisites.
5597 This checks that the instance is in the cluster.
5600 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5601 assert instance is not None, \
5602 "Cannot retrieve locked instance %s" % self.op.instance_name
5603 nodenames = list(instance.all_nodes)
5604 for node in nodenames:
5605 _CheckNodeOnline(self, node)
5608 self.instance = instance
5610 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5611 raise errors.OpPrereqError("Instance's disk layout does not support"
5612 " growing.")
5614 self.disk = instance.FindDisk(self.op.disk)
5616 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5617 instance.hypervisor)
5618 for node in nodenames:
5619 info = nodeinfo[node]
5620 if info.failed or not info.data:
5621 raise errors.OpPrereqError("Cannot get current information"
5622 " from node '%s'" % node)
5623 vg_free = info.data.get('vg_free', None)
5624 if not isinstance(vg_free, int):
5625 raise errors.OpPrereqError("Can't compute free disk space on"
5626 " node %s" % node)
5627 if self.op.amount > vg_free:
5628 raise errors.OpPrereqError("Not enough disk space on target node %s:"
5629 " %d MiB available, %d MiB required" %
5630 (node, vg_free, self.op.amount))
5632 def Exec(self, feedback_fn):
5633 """Execute disk grow.
5636 instance = self.instance
5637 disk = self.disk
5638 for node in instance.all_nodes:
5639 self.cfg.SetDiskID(disk, node)
5640 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5641 msg = result.RemoteFailMsg()
5642 if msg:
5643 raise errors.OpExecError("Grow request failed to node %s: %s" %
5644 (node, msg))
5645 disk.RecordGrow(self.op.amount)
5646 self.cfg.Update(instance)
5647 if self.op.wait_for_sync:
5648 disk_abort = not _WaitForSync(self, instance)
5649 if disk_abort:
5650 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5651 " status.\nPlease check the instance.")
5654 class LUQueryInstanceData(NoHooksLU):
5655 """Query runtime instance data.
5658 _OP_REQP = ["instances", "static"]
5661 def ExpandNames(self):
5662 self.needed_locks = {}
5663 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5665 if not isinstance(self.op.instances, list):
5666 raise errors.OpPrereqError("Invalid argument type 'instances'")
5668 if self.op.instances:
5669 self.wanted_names = []
5670 for name in self.op.instances:
5671 full_name = self.cfg.ExpandInstanceName(name)
5672 if full_name is None:
5673 raise errors.OpPrereqError("Instance '%s' not known" % name)
5674 self.wanted_names.append(full_name)
5675 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5677 self.wanted_names = None
5678 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5680 self.needed_locks[locking.LEVEL_NODE] = []
5681 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5683 def DeclareLocks(self, level):
5684 if level == locking.LEVEL_NODE:
5685 self._LockInstancesNodes()
5687 def CheckPrereq(self):
5688 """Check prerequisites.
5690 This only checks the optional instance list against the existing names.
5693 if self.wanted_names is None:
5694 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5696 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5697 in self.wanted_names]
5700 def _ComputeDiskStatus(self, instance, snode, dev):
5701 """Compute block device status.
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
      if dev_pstatus.offline:
        dev_pstatus = None
      else:
        msg = dev_pstatus.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
                                   (instance.name, msg))
        dev_pstatus = dev_pstatus.payload
    else:
      dev_pstatus = None
5719 if dev.dev_type in constants.LDS_DRBD:
5720 # we change the snode then (otherwise we use the one passed in)
5721 if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]
5726 if snode and not static:
5727 self.cfg.SetDiskID(dev, snode)
5728 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
      if dev_sstatus.offline:
        dev_sstatus = None
      else:
        msg = dev_sstatus.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
                                   (instance.name, msg))
        dev_sstatus = dev_sstatus.payload
    else:
      dev_sstatus = None
    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data
5760 def Exec(self, feedback_fn):
5761 """Gather and return data"""
    result = {}
    cluster = self.cfg.GetClusterInfo()
5766 for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        remote_info.Raise()
        remote_info = remote_info.data
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = "static"
      if instance.admin_up:
        config_state = "up"
      else:
        config_state = "down"
5784 disks = [self._ComputeDiskStatus(instance, None, device)
5785 for device in instance.disks]
5788 "name": instance.name,
5789 "config_state": config_state,
5790 "run_state": remote_state,
5791 "pnode": instance.primary_node,
5792 "snodes": instance.secondary_nodes,
5794 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5796 "hypervisor": instance.hypervisor,
5797 "network_port": instance.network_port,
5798 "hv_instance": instance.hvparams,
5799 "hv_actual": cluster.FillHV(instance),
5800 "be_instance": instance.beparams,
5801 "be_actual": cluster.FillBE(instance),
5804 result[instance.name] = idict
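# Illustrative sketch, not from the original source: per-instance shape of the
# dictionary returned by LUQueryInstanceData.Exec, based on the idict keys
# built above (values are examples only).
#
#   {"instance1.example.com": {
#      "name": "instance1.example.com",
#      "config_state": "up",            # from instance.admin_up
#      "run_state": "up",               # from the remote instance info
#      "pnode": "node1.example.com",
#      "snodes": ["node2.example.com"],
#      "nics": [("aa:00:00:12:34:56", None, "xen-br0")],
#      "disks": [...],                  # output of _ComputeDiskStatus
#   }}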
5809 class LUSetInstanceParams(LogicalUnit):
5810 """Modifies an instances's parameters.
5813 HPATH = "instance-modify"
5814 HTYPE = constants.HTYPE_INSTANCE
5815 _OP_REQP = ["instance_name"]
5818 def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
5823 if not hasattr(self.op, 'beparams'):
5824 self.op.beparams = {}
5825 if not hasattr(self.op, 'hvparams'):
5826 self.op.hvparams = {}
5827 self.op.force = getattr(self.op, "force", False)
5828 if not (self.op.nics or self.op.disks or
5829 self.op.hvparams or self.op.beparams):
5830 raise errors.OpPrereqError("No changes submitted")
    # Disk validation
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in constants.DISK_ACCESS_SET:
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing")
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err))
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk")

    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time")
    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == constants.VALUE_NONE:
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)

      if nic_op == constants.DDM_ADD:
        nic_bridge = nic_dict.get('bridge', None)
        if nic_bridge is None:
          nic_dict['bridge'] = self.cfg.GetDefBridge()
        nic_mac = nic_dict.get('mac', None)
        if nic_mac is None:
          nic_dict['mac'] = constants.VALUE_AUTO

      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                     " modifying an existing nic")

    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")
5908 def ExpandNames(self):
5909 self._ExpandAndLockInstance()
5910 self.needed_locks[locking.LEVEL_NODE] = []
5911 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5913 def DeclareLocks(self, level):
5914 if level == locking.LEVEL_NODE:
5915 self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.
    if self.op.nics:
      args['nics'] = []
      nic_override = dict(self.op.nics)
      for idx, nic in enumerate(self.instance.nics):
        if idx in nic_override:
          this_nic_override = nic_override[idx]
        else:
          this_nic_override = {}
        if 'ip' in this_nic_override:
          ip = this_nic_override['ip']
        else:
          ip = nic.ip
        if 'bridge' in this_nic_override:
          bridge = this_nic_override['bridge']
        else:
          bridge = nic.bridge
        if 'mac' in this_nic_override:
          mac = this_nic_override['mac']
        else:
          mac = nic.mac
        args['nics'].append((ip, bridge, mac))
      if constants.DDM_ADD in nic_override:
        ip = nic_override[constants.DDM_ADD].get('ip', None)
        bridge = nic_override[constants.DDM_ADD]['bridge']
        mac = nic_override[constants.DDM_ADD]['mac']
        args['nics'].append((ip, bridge, mac))
      elif constants.DDM_REMOVE in nic_override:
        del args['nics'][-1]
5959 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl
5963 def CheckPrereq(self):
5964 """Check prerequisites.
5966 This only checks the instance list against the existing names.
5969 force = self.force = self.op.force
5971 # checking the new params on the primary/secondary nodes
5973 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5974 assert self.instance is not None, \
5975 "Cannot retrieve locked instance %s" % self.op.instance_name
5976 pnode = instance.primary_node
5977 nodelist = list(instance.all_nodes)
5979 # hvparams processing
5980 if self.op.hvparams:
5981 i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new  # the new actual values
      self.hv_inst = i_hvdict  # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}
6003 # beparams processing
6004 if self.op.beparams:
6005 i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
      self.be_new = be_new  # the new actual values
      self.be_inst = i_bedict  # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}
    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
6026 mem_check_list = [pnode]
6027 if be_new[constants.BE_AUTO_BALANCE]:
6028 # either we changed auto_balance to yes or it was from before
6029 mem_check_list.extend(instance.secondary_nodes)
6030 instance_info = self.rpc.call_instance_info(pnode, instance.name,
6031 instance.hypervisor)
6032 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6033 instance.hypervisor)
6034 if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6035 # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if not instance_info.failed and instance_info.data:
          current_mem = int(instance_info.data['memory'])
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode].data['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)
6052 if be_new[constants.BE_AUTO_BALANCE]:
6053 for node, nres in nodeinfo.iteritems():
          if node not in instance.secondary_nodes:
            continue
          if nres.failed or not isinstance(nres.data, dict):
6057 self.warn.append("Can't get info from secondary node %s" % node)
6058 elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6059 self.warn.append("Not enough memory to failover instance to"
6060 " secondary node %s" % node)
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)))
      if 'bridge' in nic_dict:
        nic_bridge = nic_dict['bridge']
        if nic_bridge is None:
          raise errors.OpPrereqError('Cannot set the nic bridge to None')
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
          msg = ("Bridge '%s' doesn't exist on one of"
                 " the instance nodes" % nic_bridge)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac is None:
          raise errors.OpPrereqError('Cannot set the nic mac to None')
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC()
        else:
          # or validate/reserve the current one
          if self.cfg.IsMacInUse(nic_mac):
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac)
6099 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6100 raise errors.OpPrereqError("Disk operations not supported for"
6101 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        if ins_l.failed or not isinstance(ins_l.data, list):
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
        if instance.name in ins_l.data:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.")

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)))
6128 def Exec(self, feedback_fn):
6129 """Modifies an instance.
6131 All parameters take effect only at the next restart of the instance.
6134 # Process here the warnings from CheckPrereq, as we don't have a
6135 # feedback_fn there.
6136 for warn in self.warn:
6137 feedback_fn("WARNING: %s" % warn)
    result = []
    instance = self.instance
6142 for disk_op, disk_dict in self.op.disks:
6143 if disk_op == constants.DDM_REMOVE:
6144 # remove the last disk
6145 device = instance.disks.pop()
6146 device_idx = len(instance.disks)
6147 for node, disk in device.ComputeNodeTree(instance.primary_node):
6148 self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
          if msg:
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
                            " continuing anyway", device_idx, node, msg)
6153 result.append(("disk/%d" % device_idx, "remove"))
6154 elif disk_op == constants.DDM_ADD:
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6193 for nic_op, nic_dict in self.op.nics:
6194 if nic_op == constants.DDM_REMOVE:
6195 # remove the last nic
6196 del instance.nics[-1]
6197 result.append(("nic.%d" % len(instance.nics), "remove"))
6198 elif nic_op == constants.DDM_ADD:
6199 # mac and bridge should be set, by now
6200 mac = nic_dict['mac']
6201 bridge = nic_dict['bridge']
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
                              bridge=bridge)
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,bridge=%s" %
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
      else:
        # change a given nic
        for key in 'mac', 'ip', 'bridge':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6216 if self.op.hvparams:
6217 instance.hvparams = self.hv_inst
6218 for key, val in self.op.hvparams.iteritems():
6219 result.append(("hv/%s" % key, val))
6222 if self.op.beparams:
6223 instance.beparams = self.be_inst
6224 for key, val in self.op.beparams.iteritems():
6225 result.append(("be/%s" % key, val))
    self.cfg.Update(instance)

    return result
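# Illustrative sketch, not from the original source: LUSetInstanceParams.Exec
# returns a list of (parameter, new value) pairs describing the applied
# changes, e.g.:
#
#   [("disk/1", "add:size=1024,mode=rw"),
#    ("nic.bridge/0", "xen-br1"),
#    ("be/memory", 512)]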
6232 class LUQueryExports(NoHooksLU):
6233 """Query the exports list
6236 _OP_REQP = ['nodes']
6239 def ExpandNames(self):
6240 self.needed_locks = {}
6241 self.share_locks[locking.LEVEL_NODE] = 1
6242 if not self.op.nodes:
6243 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
6246 _GetWantedNodes(self, self.op.nodes)
6248 def CheckPrereq(self):
6249 """Check prerequisites.
6252 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6254 def Exec(self, feedback_fn):
6255 """Compute the list of all the exported system images.
6258 @return: a dictionary with the structure node->(export-list)
6259 where export-list is a list of the instances exported on
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].failed:
        result[node] = False
      else:
        result[node] = rpcresult[node].data

    return result
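# Illustrative sketch, not from the original source: result shape of
# LUQueryExports.Exec, keyed by node name, with False marking nodes whose
# export list could not be fetched.
#
#   {"node1.example.com": ["instance1.example.com", "instance2.example.com"],
#    "node2.example.com": False}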
6274 class LUExportInstance(LogicalUnit):
6275 """Export an instance to an image in the cluster.
6278 HPATH = "instance-export"
6279 HTYPE = constants.HTYPE_INSTANCE
6280 _OP_REQP = ["instance_name", "target_node", "shutdown"]
6283 def ExpandNames(self):
6284 self._ExpandAndLockInstance()
6285 # FIXME: lock only instance primary and destination node
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
6293 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6295 def DeclareLocks(self, level):
6296 """Last minute lock declaration."""
6297 # All nodes are locked anyway, so nothing to do here.
  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl
6314 def CheckPrereq(self):
6315 """Check prerequisites.
6317 This checks that the instance and node names are valid.
6320 instance_name = self.op.instance_name
6321 self.instance = self.cfg.GetInstanceInfo(instance_name)
6322 assert self.instance is not None, \
6323 "Cannot retrieve locked instance %s" % self.op.instance_name
6324 _CheckNodeOnline(self, self.instance.primary_node)
6326 self.dst_node = self.cfg.GetNodeInfo(
6327 self.cfg.ExpandNodeName(self.op.target_node))
6329 if self.dst_node is None:
6330 # This is wrong node name, not a non-locked node
6331 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6332 _CheckNodeOnline(self, self.dst_node.name)
6333 _CheckNodeNotDrained(self, self.dst_node.name)
6335 # instance disk type verification
6336 for disk in self.instance.disks:
6337 if disk.dev_type == constants.LD_FILE:
6338 raise errors.OpPrereqError("Export not supported for instances with"
6339 " file-based disks")
6341 def Exec(self, feedback_fn):
6342 """Export an instance to an image in the cluster.
6345 instance = self.instance
6346 dst_node = self.dst_node
6347 src_node = instance.primary_node
6348 if self.op.shutdown:
6349 # shutdown the instance, but not the disks
6350 result = self.rpc.call_instance_shutdown(src_node, instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, src_node, msg))
    vgname = self.cfg.GetVGName()

    snap_disks = []
6361 # set the disks ID correctly since call_instance_start needs the
6362 # correct drbd minor to create the symlinks
6363 for disk in instance.disks:
6364 self.cfg.SetDiskID(disk, src_node)
    try:
      for idx, disk in enumerate(instance.disks):
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
        if new_dev_name.failed or not new_dev_name.data:
          self.LogWarning("Could not snapshot disk/%d on node %s",
                          idx, src_node)
          snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name.data),
                                 physical_id=(vgname, new_dev_name.data),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.admin_up:
        result = self.rpc.call_instance_start(src_node, instance, None, None)
        msg = result.RemoteFailMsg()
        if msg:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance: %s" % msg)
6389 # TODO: check for size
6391 cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        if result.failed or not result.data:
          self.LogWarning("Could not export disk/%d from node %s to"
                          " node %s", idx, src_node, dst_node.name)
        msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
        if msg:
          self.LogWarning("Could not remove snapshot for disk/%d from node"
                          " %s: %s", idx, src_node, msg)
6404 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6405 if result.failed or not result.data:
6406 self.LogWarning("Could not finalize export for instance %s on node %s",
6407 instance.name, dst_node.name)
6409 nodelist = self.cfg.GetNodeList()
6410 nodelist.remove(dst_node.name)
6412 # on one-node clusters nodelist will be empty after the removal
6413 # if we proceed the backup would be removed because OpQueryExports
6414 # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].failed:
          continue
        if instance.name in exportlist[node].data:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)
6426 class LURemoveExport(NoHooksLU):
6427 """Remove exports related to the named instance.
6430 _OP_REQP = ["instance_name"]
6433 def ExpandNames(self):
6434 self.needed_locks = {}
6435 # We need all nodes to be locked in order for RemoveExport to work, but we
6436 # don't need to lock the instance itself, as nothing will happen to it (and
6437 # we can remove exports also for a removed instance)
6438 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6440 def CheckPrereq(self):
6441 """Check prerequisites.
6445 def Exec(self, feedback_fn):
6446 """Remove any export.
6449 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6450 # If the instance was not found we'll try with the name that was passed in.
6451 # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name
6457 exportlist = self.rpc.call_export_list(self.acquired_locks[
6458 locking.LEVEL_NODE])
    found = False
    for node in exportlist:
6461 if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        continue
      if instance_name in exportlist[node].data:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
6467 if result.failed or not result.data:
6468 logging.error("Could not remove export for instance %s"
6469 " on node %s", instance_name, node)
6471 if fqdn_warn and not found:
6472 feedback_fn("Export not found. If trying to remove an export belonging"
6473 " to a deleted instance please use its Fully Qualified"
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def ExpandNames(self):
    self.needed_locks = {}
6486 if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name
6501 def CheckPrereq(self):
6502 """Check prerequisites.
6505 if self.op.kind == constants.TAG_CLUSTER:
6506 self.target = self.cfg.GetClusterInfo()
6507 elif self.op.kind == constants.TAG_NODE:
6508 self.target = self.cfg.GetNodeInfo(self.op.name)
6509 elif self.op.kind == constants.TAG_INSTANCE:
6510 self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))
6516 class LUGetTags(TagsLU):
6517 """Returns the tags of a given object.
6520 _OP_REQP = ["kind", "name"]
6523 def Exec(self, feedback_fn):
6524 """Returns the tag list.
6527 return list(self.target.GetTags())
6530 class LUSearchTags(NoHooksLU):
6531 """Searches the tags for a given pattern.
6534 _OP_REQP = ["pattern"]
6537 def ExpandNames(self):
6538 self.needed_locks = {}
6540 def CheckPrereq(self):
6541 """Check prerequisites.
6543 This checks the pattern passed for validity by compiling it.
6547 self.re = re.compile(self.op.pattern)
6548 except re.error, err:
6549 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6550 (self.op.pattern, err))
6552 def Exec(self, feedback_fn):
6553 """Returns the tag list.
6557 tgts = [("/cluster", cfg.GetClusterInfo())]
6558 ilist = cfg.GetAllInstancesInfo().values()
6559 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6560 nlist = cfg.GetAllNodesInfo().values()
6561 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
6564 for tag in target.GetTags():
6565 if self.re.search(tag):
          results.append((path, tag))

    return results
6570 class LUAddTags(TagsLU):
6571 """Sets a tag on a given object.
6574 _OP_REQP = ["kind", "name", "tags"]
6577 def CheckPrereq(self):
6578 """Check prerequisites.
6580 This checks the type and length of the tag name and value.
6583 TagsLU.CheckPrereq(self)
6584 for tag in self.op.tags:
6585 objects.TaggableObject.ValidateTag(tag)
  def Exec(self, feedback_fn):
    """Sets the tags.

    """
    try:
      for tag in self.op.tags:
6593 self.target.AddTag(tag)
6594 except errors.TagError, err:
6595 raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
6598 except errors.ConfigurationError:
6599 raise errors.OpRetryError("There has been a modification to the"
6600 " config file and the operation has been"
6601 " aborted. Please retry.")
6604 class LUDelTags(TagsLU):
6605 """Delete a list of tags from a given object.
6608 _OP_REQP = ["kind", "name", "tags"]
6611 def CheckPrereq(self):
6612 """Check prerequisites.
6614 This checks that we have the given tag.
6617 TagsLU.CheckPrereq(self)
6618 for tag in self.op.tags:
6619 objects.TaggableObject.ValidateTag(tag)
6620 del_tags = frozenset(self.op.tags)
6621 cur_tags = self.target.GetTags()
6622 if not del_tags <= cur_tags:
6623 diff_tags = del_tags - cur_tags
6624 diff_names = ["'%s'" % tag for tag in diff_tags]
6626 raise errors.OpPrereqError("Tag(s) %s not found" %
6627 (",".join(diff_names)))
6629 def Exec(self, feedback_fn):
6630 """Remove the tag from the object.
6633 for tag in self.op.tags:
6634 self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
6637 except errors.ConfigurationError:
6638 raise errors.OpRetryError("There has been a modification to the"
6639 " config file and the operation has been"
6640 " aborted. Please retry.")
6643 class LUTestDelay(NoHooksLU):
6644 """Sleep for a specified amount of time.
6646 This LU sleeps on the master and/or nodes for a specified amount of
6650 _OP_REQP = ["duration", "on_master", "on_nodes"]
6653 def ExpandNames(self):
6654 """Expand names and set required locks.
6656 This expands the node list, if any.
6659 self.needed_locks = {}
6660 if self.op.on_nodes:
6661 # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
6664 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6665 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6667 def CheckPrereq(self):
6668 """Check prerequisites.
6672 def Exec(self, feedback_fn):
6673 """Do the actual sleep.
6676 if self.op.on_master:
6677 if not utils.TestDelay(self.op.duration):
6678 raise errors.OpExecError("Error during master delay test")
6679 if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
6686 raise errors.OpExecError("Failure during rpc call to node %s,"
6687 " result: %s" % (node, node_result.data))
6690 class IAllocator(object):
6691 """IAllocator framework.
  An IAllocator instance has three sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]
  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
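  # Illustrative sketch, not part of the original module: how an LU typically
  # drives this class for an allocation request, using the keys listed in
  # _ALLO_KEYS.  The allocator script name ("hail") and parameter values are
  # assumptions for documentation purposes only.
  #
  #   ial = IAllocator(self, constants.IALLOCATOR_MODE_ALLOC,
  #                    name="instance1.example.com",
  #                    mem_size=512,
  #                    disks=[{"size": 1024, "mode": "w"}],
  #                    disk_template=constants.DT_DRBD8,
  #                    os="debian-etch", tags=[], nics=[], vcpus=1,
  #                    hypervisor=constants.HT_XEN_PVM)
  #   ial.Run("hail")
  #   if not ial.success:
  #     raise errors.OpPrereqError("Can't compute nodes: %s" % ial.info)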
  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
6754 "version": constants.IALLOCATOR_VERSION,
6755 "cluster_name": cfg.GetClusterName(),
6756 "cluster_tags": list(cluster_info.GetTags()),
6757 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6758 # we don't have job IDs
6760 iinfo = cfg.GetAllInstancesInfo().values()
6761 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
6767 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6768 hypervisor_name = self.hypervisor
6769 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6770 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor_name)
6774 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6775 cluster_info.enabled_hypervisors)
6776 for nname, nresult in node_data.items():
6777 # first fill in static (config-based) values
6778 ninfo = cfg.GetNodeInfo(nname)
6780 "tags": list(ninfo.GetTags()),
6781 "primary_ip": ninfo.primary_ip,
6782 "secondary_ip": ninfo.secondary_ip,
6783 "offline": ninfo.offline,
6784 "drained": ninfo.drained,
6785 "master_candidate": ninfo.master_candidate,
      if not ninfo.offline:
        nresult.Raise()
        if not isinstance(nresult.data, dict):
6791 raise errors.OpExecError("Can't get data for node %s" % nname)
6792 remote_info = nresult.data
6793 for attr in ['memory_total', 'memory_free', 'memory_dom0',
6794 'vg_size', 'vg_free', 'cpu_total']:
6795 if attr not in remote_info:
6796 raise errors.OpExecError("Node '%s' didn't return attribute"
6797 " '%s'" % (nname, attr))
6799 remote_info[attr] = int(remote_info[attr])
6800 except ValueError, err:
6801 raise errors.OpExecError("Node '%s' returned invalid value"
6802 " for '%s': %s" % (nname, attr, err))
6803 # compute memory used by primary instances
6804 i_p_mem = i_p_up_mem = 0
6805 for iinfo, beinfo in i_list:
6806 if iinfo.primary_node == nname:
6807 i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].data:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]
        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results
    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
6837 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6838 for n in iinfo.nics]
6840 "tags": list(iinfo.GetTags()),
6841 "admin_up": iinfo.admin_up,
6842 "vcpus": beinfo[constants.BE_VCPUS],
6843 "memory": beinfo[constants.BE_MEMORY],
6845 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6847 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6848 "disk_template": iinfo.disk_template,
6849 "hypervisor": iinfo.hypervisor,
6851 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6853 instance_data[iinfo.name] = pir
6855 data["instances"] = instance_data
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6873 if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
6880 "disk_template": self.disk_template,
6883 "vcpus": self.vcpus,
6884 "memory": self.mem_size,
6885 "disks": self.disks,
6886 "disk_space_total": disk_space,
6888 "required_nodes": self.required_nodes,
6890 data["request"] = request
  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
6902 instance = self.lu.cfg.GetInstanceInfo(self.name)
6903 if instance is None:
6904 raise errors.ProgrammerError("Unknown instance '%s' passed to"
6905 " IAllocator" % self.name)
6907 if instance.disk_template not in constants.DTS_NET_MIRROR:
6908 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6910 if len(instance.secondary_nodes) != 1:
6911 raise errors.OpPrereqError("Instance has not exactly one secondary node")
6913 self.required_nodes = 1
6914 disk_sizes = [{'size': disk.size} for disk in instance.disks]
6915 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6920 "disk_space_total": disk_space,
6921 "required_nodes": self.required_nodes,
6922 "relocate_from": self.relocate_from,
6924 self.in_data["request"] = request
  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()
6937 self.in_text = serializer.Dump(self.in_data)
  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise()
6950 if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6951 raise errors.OpExecError("Invalid result from master iallocator runner")
6953 rcode, stdout, stderr, fail = result.data
6955 if rcode == constants.IARUN_NOTFOUND:
6956 raise errors.OpExecError("Can't find allocator '%s'" % name)
6957 elif rcode == constants.IARUN_FAILURE:
6958 raise errors.OpExecError("Instance allocator call failed: %s,"
6959 " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()
6964 def _ValidateResult(self):
6965 """Process the allocator results.
    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
6973 except Exception, err:
6974 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6976 if not isinstance(rdict, dict):
6977 raise errors.OpExecError("Can't parse iallocator results: not a dict")
6979 for key in "success", "info", "nodes":
6980 if key not in rdict:
6981 raise errors.OpExecError("Can't parse iallocator results:"
6982 " missing key '%s'" % key)
6983 setattr(self, key, rdict[key])
6985 if not isinstance(rdict["nodes"], list):
6986 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6988 self.out_data = rdict
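# Illustrative sketch, not from the original source: minimal allocator output
# accepted by IAllocator._ValidateResult; only the three checked keys are
# shown.
#
#   {"success": true,
#    "info": "allocation successful",
#    "nodes": ["node1.example.com", "node2.example.com"]}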
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
6997 _OP_REQP = ["direction", "mode", "name"]
6999 def CheckPrereq(self):
7000 """Check prerequisites.
7002 This checks the opcode parameters depending on the director and mode test.
7005 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7006 for attr in ["name", "mem_size", "disks", "disk_template",
7007 "os", "tags", "nics", "vcpus"]:
7008 if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
7012 if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
7016 raise errors.OpPrereqError("Invalid parameter 'nics'")
7017 for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
7022 raise errors.OpPrereqError("Invalid contents of the"
7023 " 'nics' parameter")
7024 if not isinstance(self.op.disks, list):
7025 raise errors.OpPrereqError("Invalid parameter 'disks'")
7026 for row in self.op.disks:
7027 if (not isinstance(row, dict) or
7028 "size" not in row or
7029 not isinstance(row["size"], int) or
7030 "mode" not in row or
7031 row["mode"] not in ['r', 'w']):
7032 raise errors.OpPrereqError("Invalid contents of the"
7033 " 'disks' parameter")
7034 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7035 self.op.hypervisor = self.cfg.GetHypervisorType()
7036 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7037 if not hasattr(self.op, "name"):
7038 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
7044 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)
7049 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7050 if not hasattr(self.op, "allocator") or self.op.allocator is None:
7051 raise errors.OpPrereqError("Missing allocator name")
7052 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)
7056 def Exec(self, feedback_fn):
7057 """Run the allocator test.
7060 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor)
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from))

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result