4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
import time
import re
import logging

37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
49 class LogicalUnit(object):
50 """Logical Unit base class.
52 Subclasses must follow these rules:
53 - implement ExpandNames
54 - implement CheckPrereq
56 - implement BuildHooksEnv
57 - redefine HPATH and HTYPE
58 - optionally redefine their run requirements:
59 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
61 Note that all commands require root permissions.
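A minimal, hypothetical subclass (an illustrative sketch only; this unit
does not exist in the module) following the rules above could look like::

  class LUNoop(NoHooksLU):
    _OP_REQP = []

    def ExpandNames(self):
      self.needed_locks = {}  # exclusive LU, no locks needed

    def CheckPrereq(self):
      pass

    def Exec(self, feedback_fn):
      feedback_fn("nothing to do")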
69 def __init__(self, processor, op, context, rpc):
70 """Constructor for LogicalUnit.
72 This needs to be overridden in derived classes in order to check op
78 self.cfg = context.cfg
79 self.context = context
81 # Dicts used to declare locking needs to mcpu
82 self.needed_locks = None
83 self.acquired_locks = {}
84 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
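# A value of 0 means the lock at that level will be acquired exclusively;
# LUs that only need read access set the level to 1 (or any true value)
# in ExpandNames, as described in the ExpandNames docstring below.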
86 self.remove_locks = {}
87 # Used to force good behavior when calling helper functions
88 self.recalculate_locks = {}
91 self.LogWarning = processor.LogWarning
92 self.LogInfo = processor.LogInfo
94 for attr_name in self._OP_REQP:
95 attr_val = getattr(op, attr_name, None)
97 raise errors.OpPrereqError("Required parameter '%s' missing" %
102 """Returns the SshRunner object
106 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
109 ssh = property(fget=__GetSSH)
111 def CheckArguments(self):
112 """Check syntactic validity for the opcode arguments.
114 This method is for doing a simple syntactic check and ensuring the
115 validity of opcode parameters, without any cluster-related
116 checks. While the same can be accomplished in ExpandNames and/or
117 CheckPrereq, doing these separately is better because:
119 - ExpandNames is left as purely a lock-related function
120 - CheckPrereq is run after we have acquired locks (and possibly
123 The function is allowed to change the self.op attribute so that
124 later methods no longer need to worry about missing parameters.
129 def ExpandNames(self):
130 """Expand names for this LU.
132 This method is called before starting to execute the opcode, and it should
133 update all the parameters of the opcode to their canonical form (e.g. a
134 short node name must be fully expanded after this method has successfully
135 completed). This way locking, hooks, logging, etc. can work correctly.
137 LUs which implement this method must also populate the self.needed_locks
138 member, as a dict with lock levels as keys, and a list of needed lock names
141 - use an empty dict if you don't need any lock
142 - if you don't need any lock at a particular level omit that level
143 - don't put anything for the BGL level
144 - if you want all locks at a level use locking.ALL_SET as a value
146 If you need to share locks (rather than acquire them exclusively) at one
147 level you can modify self.share_locks, setting a true value (usually 1) for
148 that level. By default locks are not shared.
152 # Acquire all nodes and one instance
153 self.needed_locks = {
154 locking.LEVEL_NODE: locking.ALL_SET,
155 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
157 # Acquire just two nodes
158 self.needed_locks = {
159 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
162 self.needed_locks = {} # No, you can't leave it to the default value None
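# Acquire all node locks, but in shared (read-only) mode; setting a true
# value in self.share_locks marks that level as shared, as explained above
self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
self.share_locks[locking.LEVEL_NODE] = 1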
165 # The implementation of this method is mandatory only if the new LU is
166 # concurrent, so that old LUs don't need to be changed all at the same
169 self.needed_locks = {} # Exclusive LUs don't need locks.
171 raise NotImplementedError
173 def DeclareLocks(self, level):
174 """Declare LU locking needs for a level
176 While most LUs can just declare their locking needs at ExpandNames time,
177 sometimes there's the need to calculate some locks after having acquired
178 the ones before. This function is called just before acquiring locks at a
179 particular level, but after acquiring the ones at lower levels, and permits
180 such calculations. It can be used to modify self.needed_locks, and by
181 default it does nothing.
183 This function is only called if you have something already set in
184 self.needed_locks for the level.
186 @param level: Locking level which is going to be locked
187 @type level: member of ganeti.locking.LEVELS
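A typical implementation, mirroring the example given for
_LockInstancesNodes below, would be::

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()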
191 def CheckPrereq(self):
192 """Check prerequisites for this LU.
194 This method should check that the prerequisites for the execution
195 of this LU are fulfilled. It can do internode communication, but
196 it should be idempotent - no cluster or system changes are
199 The method should raise errors.OpPrereqError in case something is
200 not fulfilled. Its return value is ignored.
202 This method should also update all the parameters of the opcode to
203 their canonical form if it hasn't been done by ExpandNames before.
206 raise NotImplementedError
208 def Exec(self, feedback_fn):
211 This method should implement the actual work. It should raise
212 errors.OpExecError for failures that are somewhat dealt with in
216 raise NotImplementedError
218 def BuildHooksEnv(self):
219 """Build hooks environment for this LU.
221 This method should return a three-element tuple consisting of: a dict
222 containing the environment that will be used for running the
223 specific hook for this LU, a list of node names on which the hook
224 should run before the execution, and a list of node names on which
225 the hook should run after the execution.
227 The keys of the dict must not be prefixed with 'GANETI_' as this will
228 be handled in the hooks runner. Also note additional keys will be
229 added by the hooks runner. If the LU doesn't define any
230 environment, an empty dict (and not None) should be returned.
232 "No nodes" should be represented as an empty list (and not None).
234 Note that if the HPATH for a LU class is None, this function will
238 raise NotImplementedError
240 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241 """Notify the LU about the results of its hooks.
243 This method is called every time a hooks phase is executed, and notifies
244 the Logical Unit about the hooks' result. The LU can then use it to alter
245 its result based on the hooks. By default the method does nothing and the
246 previous result is passed back unchanged but any LU can define it if it
247 wants to use the local cluster hook-scripts somehow.
249 @param phase: one of L{constants.HOOKS_PHASE_POST} or
250 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251 @param hook_results: the results of the multi-node hooks rpc call
252 @param feedback_fn: function used to send feedback back to the caller
253 @param lu_result: the previous Exec result this LU had, or None
255 @return: the new Exec result, based on the previous result
261 def _ExpandAndLockInstance(self):
262 """Helper function to expand and lock an instance.
264 Many LUs that work on an instance take its name in self.op.instance_name
265 and need to expand it and then declare the expanded name for locking. This
266 function does it, and then updates self.op.instance_name to the expanded
267 name. It also initializes needed_locks as a dict, if this hasn't been done
271 if self.needed_locks is None:
272 self.needed_locks = {}
274 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275 "_ExpandAndLockInstance called with instance-level locks set"
276 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277 if expanded_name is None:
278 raise errors.OpPrereqError("Instance '%s' not known" %
279 self.op.instance_name)
280 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281 self.op.instance_name = expanded_name
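# A hypothetical caller simply invokes this helper from its ExpandNames:
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()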
283 def _LockInstancesNodes(self, primary_only=False):
284 """Helper function to declare instances' nodes for locking.
286 This function should be called after locking one or more instances to lock
287 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288 with all primary or secondary nodes for instances already locked and
289 present in self.needed_locks[locking.LEVEL_INSTANCE].
291 It should be called from DeclareLocks, and for safety only works if
292 self.recalculate_locks[locking.LEVEL_NODE] is set.
294 In the future it may grow parameters to just lock some instance's nodes, or
295 to just lock primaries or secondary nodes, if needed.
297 It should be called in DeclareLocks in a way similar to::
299 if level == locking.LEVEL_NODE:
300 self._LockInstancesNodes()
302 @type primary_only: boolean
303 @param primary_only: only lock primary nodes of locked instances
306 assert locking.LEVEL_NODE in self.recalculate_locks, \
307 "_LockInstancesNodes helper function called with no nodes to recalculate"
309 # TODO: check if we've really been called with the instance locks held
311 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312 # future we might want to have different behaviors depending on the value
313 # of self.recalculate_locks[locking.LEVEL_NODE]
315 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316 instance = self.context.cfg.GetInstanceInfo(instance_name)
317 wanted_nodes.append(instance.primary_node)
319 wanted_nodes.extend(instance.secondary_nodes)
321 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
326 del self.recalculate_locks[locking.LEVEL_NODE]
329 class NoHooksLU(LogicalUnit):
330 """Simple LU which runs no hooks.
332 This LU is intended as a parent for other LogicalUnits which will
333 run no hooks, in order to reduce duplicate code.
340 def _GetWantedNodes(lu, nodes):
341 """Returns list of checked and expanded node names.
343 @type lu: L{LogicalUnit}
344 @param lu: the logical unit on whose behalf we execute
346 @param nodes: list of node names or None for all nodes
348 @return: the list of nodes, sorted
349 @raise errors.OpPrereqError: if the nodes parameter is wrong type
352 if not isinstance(nodes, list):
353 raise errors.OpPrereqError("Invalid argument type 'nodes'")
356 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357 " non-empty list of nodes whose name is to be expanded.")
361 node = lu.cfg.ExpandNodeName(name)
363 raise errors.OpPrereqError("No such node name '%s'" % name)
366 return utils.NiceSort(wanted)
369 def _GetWantedInstances(lu, instances):
370 """Returns list of checked and expanded instance names.
372 @type lu: L{LogicalUnit}
373 @param lu: the logical unit on whose behalf we execute
374 @type instances: list
375 @param instances: list of instance names or None for all instances
377 @return: the list of instances, sorted
378 @raise errors.OpPrereqError: if the instances parameter is wrong type
379 @raise errors.OpPrereqError: if any of the passed instances is not found
382 if not isinstance(instances, list):
383 raise errors.OpPrereqError("Invalid argument type 'instances'")
388 for name in instances:
389 instance = lu.cfg.ExpandInstanceName(name)
391 raise errors.OpPrereqError("No such instance name '%s'" % name)
392 wanted.append(instance)
395 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
399 def _CheckOutputFields(static, dynamic, selected):
400 """Checks whether all selected fields are valid.
402 @type static: L{utils.FieldSet}
403 @param static: static fields set
404 @type dynamic: L{utils.FieldSet}
405 @param dynamic: dynamic fields set
f = utils.FieldSet()
f.Extend(static)
f.Extend(dynamic)
412 delta = f.NonMatching(selected)
414 raise errors.OpPrereqError("Unknown output fields selected: %s"
418 def _CheckBooleanOpField(op, name):
419 """Validates boolean opcode parameters.
421 This will ensure that an opcode parameter is either a boolean value,
422 or None (but that it always exists).
425 val = getattr(op, name, None)
426 if not (val is None or isinstance(val, bool)):
427 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
429 setattr(op, name, val)
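# Illustrative use (the parameter name here is hypothetical):
#   _CheckBooleanOpField(self.op, "ignore_secondaries")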
432 def _CheckNodeOnline(lu, node):
433 """Ensure that a given node is online.
435 @param lu: the LU on behalf of which we make the check
436 @param node: the node to check
437 @raise errors.OpPrereqError: if the node is offline
440 if lu.cfg.GetNodeInfo(node).offline:
441 raise errors.OpPrereqError("Can't use offline node %s" % node)
444 def _CheckNodeNotDrained(lu, node):
445 """Ensure that a given node is not drained.
447 @param lu: the LU on behalf of which we make the check
448 @param node: the node to check
449 @raise errors.OpPrereqError: if the node is drained
452 if lu.cfg.GetNodeInfo(node).drained:
453 raise errors.OpPrereqError("Can't use drained node %s" % node)
456 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
457 memory, vcpus, nics, disk_template, disks):
458 """Builds instance related env variables for hooks
460 This builds the hook environment from individual variables.
463 @param name: the name of the instance
464 @type primary_node: string
465 @param primary_node: the name of the instance's primary node
466 @type secondary_nodes: list
467 @param secondary_nodes: list of secondary nodes as strings
468 @type os_type: string
469 @param os_type: the name of the instance's OS
470 @type status: boolean
471 @param status: the should_run status of the instance
473 @param memory: the memory size of the instance
475 @param vcpus: the count of VCPUs the instance has
477 @param nics: list of tuples (ip, bridge, mac) representing
478 the NICs the instance has
479 @type disk_template: string
480 @param disk_template: the disk template of the instance
482 @param disks: the list of (size, mode) pairs
484 @return: the hook environment for this instance
493 "INSTANCE_NAME": name,
494 "INSTANCE_PRIMARY": primary_node,
495 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
496 "INSTANCE_OS_TYPE": os_type,
497 "INSTANCE_STATUS": str_status,
498 "INSTANCE_MEMORY": memory,
499 "INSTANCE_VCPUS": vcpus,
500 "INSTANCE_DISK_TEMPLATE": disk_template,
504 nic_count = len(nics)
505 for idx, (ip, bridge, mac) in enumerate(nics):
508 env["INSTANCE_NIC%d_IP" % idx] = ip
509 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
510 env["INSTANCE_NIC%d_MAC" % idx] = mac
514 env["INSTANCE_NIC_COUNT"] = nic_count
517 disk_count = len(disks)
518 for idx, (size, mode) in enumerate(disks):
519 env["INSTANCE_DISK%d_SIZE" % idx] = size
520 env["INSTANCE_DISK%d_MODE" % idx] = mode
524 env["INSTANCE_DISK_COUNT"] = disk_count
529 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
530 """Builds instance related env variables for hooks from an object.
532 @type lu: L{LogicalUnit}
533 @param lu: the logical unit on whose behalf we execute
534 @type instance: L{objects.Instance}
535 @param instance: the instance for which we should build the
538 @param override: dictionary with key/values that will override
541 @return: the hook environment dictionary
544 bep = lu.cfg.GetClusterInfo().FillBE(instance)
546 'name': instance.name,
547 'primary_node': instance.primary_node,
548 'secondary_nodes': instance.secondary_nodes,
549 'os_type': instance.os,
550 'status': instance.admin_up,
551 'memory': bep[constants.BE_MEMORY],
552 'vcpus': bep[constants.BE_VCPUS],
553 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
554 'disk_template': instance.disk_template,
555 'disks': [(disk.size, disk.mode) for disk in instance.disks],
558 args.update(override)
559 return _BuildInstanceHookEnv(**args)
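# Hypothetical usage from an LU's BuildHooksEnv, overriding the memory
# value reported to the hooks:
#   env = _BuildInstanceHookEnvByObject(self, self.instance,
#                                       override={"memory": 2048})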
562 def _AdjustCandidatePool(lu):
563 """Adjust the candidate pool after node operations.
566 mod_list = lu.cfg.MaintainCandidatePool()
568 lu.LogInfo("Promoted nodes to master candidate role: %s",
569 ", ".join(node.name for node in mod_list))
570 for name in mod_list:
571 lu.context.ReaddNode(name)
572 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
574 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
578 def _CheckInstanceBridgesExist(lu, instance):
579 """Check that the brigdes needed by an instance exist.
582 # check bridges existance
583 brlist = [nic.bridge for nic in instance.nics]
584 result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
587 raise errors.OpPrereqError("One or more target bridges %s does not"
588 " exist on destination node '%s'" %
589 (brlist, instance.primary_node))
592 class LUDestroyCluster(NoHooksLU):
593 """Logical unit for destroying the cluster.
598 def CheckPrereq(self):
599 """Check prerequisites.
601 This checks whether the cluster is empty.
603 Any errors are signalled by raising errors.OpPrereqError.
606 master = self.cfg.GetMasterNode()
608 nodelist = self.cfg.GetNodeList()
609 if len(nodelist) != 1 or nodelist[0] != master:
610 raise errors.OpPrereqError("There are still %d node(s) in"
611 " this cluster." % (len(nodelist) - 1))
612 instancelist = self.cfg.GetInstanceList()
614 raise errors.OpPrereqError("There are still %d instance(s) in"
615 " this cluster." % len(instancelist))
617 def Exec(self, feedback_fn):
618 """Destroys the cluster.
621 master = self.cfg.GetMasterNode()
622 result = self.rpc.call_node_stop_master(master, False)
625 raise errors.OpExecError("Could not disable the master role")
626 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
627 utils.CreateBackup(priv_key)
628 utils.CreateBackup(pub_key)
632 class LUVerifyCluster(LogicalUnit):
633 """Verifies the cluster status.
636 HPATH = "cluster-verify"
637 HTYPE = constants.HTYPE_CLUSTER
638 _OP_REQP = ["skip_checks"]
641 def ExpandNames(self):
642 self.needed_locks = {
643 locking.LEVEL_NODE: locking.ALL_SET,
644 locking.LEVEL_INSTANCE: locking.ALL_SET,
646 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
648 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
649 node_result, feedback_fn, master_files,
651 """Run multiple tests against a node.
655 - compares ganeti version
656 - checks vg existence and size > 20G
657 - checks config file checksum
658 - checks ssh to other nodes
660 @type nodeinfo: L{objects.Node}
661 @param nodeinfo: the node to check
662 @param file_list: required list of files
663 @param local_cksum: dictionary of local files and their checksums
664 @param node_result: the results from the node
665 @param feedback_fn: function used to accumulate results
666 @param master_files: list of files that only masters should have
667 @param drbd_map: the used DRBD minors for this node, in the
668 form of minor: (instance, must_exist) which correspond to instances
669 and their running status
670 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
675 # main result, node_result should be a non-empty dict
676 if not node_result or not isinstance(node_result, dict):
677 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
680 # compares ganeti version
681 local_version = constants.PROTOCOL_VERSION
682 remote_version = node_result.get('version', None)
683 if not (remote_version and isinstance(remote_version, (list, tuple)) and
684 len(remote_version) == 2):
685 feedback_fn(" - ERROR: connection to %s failed" % (node))
688 if local_version != remote_version[0]:
689 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
690 " node %s %s" % (local_version, node, remote_version[0]))
693 # node seems compatible, we can actually try to look into its results
697 # full package version
698 if constants.RELEASE_VERSION != remote_version[1]:
699 feedback_fn(" - WARNING: software version mismatch: master %s,"
701 (constants.RELEASE_VERSION, node, remote_version[1]))
703 # checks vg existence and size > 20G
704 if vg_name is not None:
705 vglist = node_result.get(constants.NV_VGLIST, None)
707 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
711 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
712 constants.MIN_VG_SIZE)
714 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
717 # checks config file checksum
719 remote_cksum = node_result.get(constants.NV_FILELIST, None)
720 if not isinstance(remote_cksum, dict):
722 feedback_fn(" - ERROR: node hasn't returned file checksum data")
724 for file_name in file_list:
725 node_is_mc = nodeinfo.master_candidate
726 must_have_file = file_name not in master_files
727 if file_name not in remote_cksum:
728 if node_is_mc or must_have_file:
730 feedback_fn(" - ERROR: file '%s' missing" % file_name)
731 elif remote_cksum[file_name] != local_cksum[file_name]:
732 if node_is_mc or must_have_file:
734 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
736 # not candidate and this is not a must-have file
738 feedback_fn(" - ERROR: non master-candidate has old/wrong file"
741 # all good, except non-master/non-must have combination
742 if not node_is_mc and not must_have_file:
743 feedback_fn(" - ERROR: file '%s' should not exist on non master"
744 " candidates" % file_name)
748 if constants.NV_NODELIST not in node_result:
750 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
752 if node_result[constants.NV_NODELIST]:
754 for node in node_result[constants.NV_NODELIST]:
755 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
756 (node, node_result[constants.NV_NODELIST][node]))
758 if constants.NV_NODENETTEST not in node_result:
760 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
762 if node_result[constants.NV_NODENETTEST]:
764 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
766 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
767 (node, node_result[constants.NV_NODENETTEST][node]))
769 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
770 if isinstance(hyp_result, dict):
771 for hv_name, hv_result in hyp_result.iteritems():
772 if hv_result is not None:
773 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
774 (hv_name, hv_result))
776 # check used drbd list
777 if vg_name is not None:
778 used_minors = node_result.get(constants.NV_DRBDLIST, [])
779 if not isinstance(used_minors, (tuple, list)):
780 feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
783 for minor, (iname, must_exist) in drbd_map.items():
784 if minor not in used_minors and must_exist:
785 feedback_fn(" - ERROR: drbd minor %d of instance %s is"
786 " not active" % (minor, iname))
788 for minor in used_minors:
789 if minor not in drbd_map:
790 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
796 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
797 node_instance, feedback_fn, n_offline):
798 """Verify an instance.
800 This function checks to see if the required block devices are
801 available on the instance's node.
806 node_current = instanceconfig.primary_node
809 instanceconfig.MapLVsByNode(node_vol_should)
811 for node in node_vol_should:
812 if node in n_offline:
813 # ignore missing volumes on offline nodes
815 for volume in node_vol_should[node]:
816 if node not in node_vol_is or volume not in node_vol_is[node]:
817 feedback_fn(" - ERROR: volume %s missing on node %s" %
821 if instanceconfig.admin_up:
822 if ((node_current not in node_instance or
823 not instance in node_instance[node_current]) and
824 node_current not in n_offline):
825 feedback_fn(" - ERROR: instance %s not running on node %s" %
826 (instance, node_current))
829 for node in node_instance:
830 if (not node == node_current):
831 if instance in node_instance[node]:
832 feedback_fn(" - ERROR: instance %s should not run on node %s" %
838 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
839 """Verify if there are any unknown volumes in the cluster.
841 The .os, .swap and backup volumes are ignored. All other volumes are
847 for node in node_vol_is:
848 for volume in node_vol_is[node]:
849 if node not in node_vol_should or volume not in node_vol_should[node]:
850 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
855 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
856 """Verify the list of running instances.
858 This checks what instances are running but unknown to the cluster.
862 for node in node_instance:
863 for runninginstance in node_instance[node]:
864 if runninginstance not in instancelist:
865 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
866 (runninginstance, node))
870 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
871 """Verify N+1 Memory Resilience.
873 Check that if one single node dies we can still start all the instances it
879 for node, nodeinfo in node_info.iteritems():
880 # This code checks that every node which is now listed as secondary has
881 # enough memory to host all instances it is supposed to, should a single
882 # other node in the cluster fail.
883 # FIXME: not ready for failover to an arbitrary node
884 # FIXME: does not support file-backed instances
885 # WARNING: we currently take into account down instances as well as up
886 # ones, considering that even if they're down someone might want to start
887 # them even in the event of a node failure.
888 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
890 for instance in instances:
891 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
892 if bep[constants.BE_AUTO_BALANCE]:
893 needed_mem += bep[constants.BE_MEMORY]
894 if nodeinfo['mfree'] < needed_mem:
895 feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
896 " failovers should node %s fail" % (node, prinode))
900 def CheckPrereq(self):
901 """Check prerequisites.
903 Transform the list of checks we're going to skip into a set and check that
904 all its members are valid.
907 self.skip_set = frozenset(self.op.skip_checks)
908 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
909 raise errors.OpPrereqError("Invalid checks to be skipped specified")
911 def BuildHooksEnv(self):
914 Cluster-Verify hooks are run only in the post phase and their failure
915 causes their output to be logged in the verify output and the verification to fail.
918 all_nodes = self.cfg.GetNodeList()
920 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
922 for node in self.cfg.GetAllNodesInfo().values():
923 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
925 return env, [], all_nodes
927 def Exec(self, feedback_fn):
928 """Verify integrity of cluster, performing various test on nodes.
932 feedback_fn("* Verifying global settings")
933 for msg in self.cfg.VerifyConfig():
934 feedback_fn(" - ERROR: %s" % msg)
936 vg_name = self.cfg.GetVGName()
937 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
938 nodelist = utils.NiceSort(self.cfg.GetNodeList())
939 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
940 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
941 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
942 for iname in instancelist)
943 i_non_redundant = [] # Non redundant instances
944 i_non_a_balanced = [] # Non auto-balanced instances
945 n_offline = [] # List of offline nodes
946 n_drained = [] # List of nodes being drained
952 # FIXME: verify OS list
954 master_files = [constants.CLUSTER_CONF_FILE]
956 file_names = ssconf.SimpleStore().GetFileList()
957 file_names.append(constants.SSL_CERT_FILE)
958 file_names.append(constants.RAPI_CERT_FILE)
959 file_names.extend(master_files)
961 local_checksums = utils.FingerprintFiles(file_names)
963 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
964 node_verify_param = {
965 constants.NV_FILELIST: file_names,
966 constants.NV_NODELIST: [node.name for node in nodeinfo
967 if not node.offline],
968 constants.NV_HYPERVISOR: hypervisors,
969 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
970 node.secondary_ip) for node in nodeinfo
971 if not node.offline],
972 constants.NV_INSTANCELIST: hypervisors,
973 constants.NV_VERSION: None,
974 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
976 if vg_name is not None:
977 node_verify_param[constants.NV_VGLIST] = None
978 node_verify_param[constants.NV_LVLIST] = vg_name
979 node_verify_param[constants.NV_DRBDLIST] = None
980 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
981 self.cfg.GetClusterName())
983 cluster = self.cfg.GetClusterInfo()
984 master_node = self.cfg.GetMasterNode()
985 all_drbd_map = self.cfg.ComputeDRBDMap()
987 for node_i in nodeinfo:
989 nresult = all_nvinfo[node].data
992 feedback_fn("* Skipping offline node %s" % (node,))
993 n_offline.append(node)
996 if node == master_node:
998 elif node_i.master_candidate:
999 ntype = "master candidate"
1000 elif node_i.drained:
1002 n_drained.append(node)
1005 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1007 if all_nvinfo[node].failed or not isinstance(nresult, dict):
1008 feedback_fn(" - ERROR: connection to %s failed" % (node,))
1013 for minor, instance in all_drbd_map[node].items():
1014 if instance not in instanceinfo:
1015 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1017 # ghost instance should not be running, but otherwise we
1018 # don't give double warnings (both ghost instance and
1019 # unallocated minor in use)
1020 node_drbd[minor] = (instance, False)
1022 instance = instanceinfo[instance]
1023 node_drbd[minor] = (instance.name, instance.admin_up)
1024 result = self._VerifyNode(node_i, file_names, local_checksums,
1025 nresult, feedback_fn, master_files,
1029 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1031 node_volume[node] = {}
1032 elif isinstance(lvdata, basestring):
1033 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1034 (node, utils.SafeEncode(lvdata)))
1036 node_volume[node] = {}
1037 elif not isinstance(lvdata, dict):
1038 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1042 node_volume[node] = lvdata
1045 idata = nresult.get(constants.NV_INSTANCELIST, None)
1046 if not isinstance(idata, list):
1047 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1052 node_instance[node] = idata
1055 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1056 if not isinstance(nodeinfo, dict):
1057 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1063 "mfree": int(nodeinfo['memory_free']),
1066 # dictionary holding all instances this node is secondary for,
1067 # grouped by their primary node. Each key is a cluster node, and each
1068 # value is a list of instances which have the key as primary and the
1069 # current node as secondary. this is handy to calculate N+1 memory
1070 # availability if you can only failover from a primary to its
1072 "sinst-by-pnode": {},
1074 # FIXME: devise a free space model for file based instances as well
1075 if vg_name is not None:
1076 if (constants.NV_VGLIST not in nresult or
1077 vg_name not in nresult[constants.NV_VGLIST]):
1078 feedback_fn(" - ERROR: node %s didn't return data for the"
1079 " volume group '%s' - it is either missing or broken" %
1083 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1084 except (ValueError, KeyError):
1085 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1086 " from node %s" % (node,))
1090 node_vol_should = {}
1092 for instance in instancelist:
1093 feedback_fn("* Verifying instance %s" % instance)
1094 inst_config = instanceinfo[instance]
1095 result = self._VerifyInstance(instance, inst_config, node_volume,
1096 node_instance, feedback_fn, n_offline)
1098 inst_nodes_offline = []
1100 inst_config.MapLVsByNode(node_vol_should)
1102 instance_cfg[instance] = inst_config
1104 pnode = inst_config.primary_node
1105 if pnode in node_info:
1106 node_info[pnode]['pinst'].append(instance)
1107 elif pnode not in n_offline:
1108 feedback_fn(" - ERROR: instance %s, connection to primary node"
1109 " %s failed" % (instance, pnode))
1112 if pnode in n_offline:
1113 inst_nodes_offline.append(pnode)
1115 # If the instance is non-redundant we cannot survive losing its primary
1116 # node, so we are not N+1 compliant. On the other hand we have no disk
1117 # templates with more than one secondary so that situation is not well
1119 # FIXME: does not support file-backed instances
1120 if len(inst_config.secondary_nodes) == 0:
1121 i_non_redundant.append(instance)
1122 elif len(inst_config.secondary_nodes) > 1:
1123 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1126 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1127 i_non_a_balanced.append(instance)
1129 for snode in inst_config.secondary_nodes:
1130 if snode in node_info:
1131 node_info[snode]['sinst'].append(instance)
1132 if pnode not in node_info[snode]['sinst-by-pnode']:
1133 node_info[snode]['sinst-by-pnode'][pnode] = []
1134 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1135 elif snode not in n_offline:
1136 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1137 " %s failed" % (instance, snode))
1139 if snode in n_offline:
1140 inst_nodes_offline.append(snode)
1142 if inst_nodes_offline:
1143 # warn that the instance lives on offline nodes, and set bad=True
1144 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1145 ", ".join(inst_nodes_offline))
1148 feedback_fn("* Verifying orphan volumes")
1149 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1153 feedback_fn("* Verifying remaining instances")
1154 result = self._VerifyOrphanInstances(instancelist, node_instance,
1158 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1159 feedback_fn("* Verifying N+1 Memory redundancy")
1160 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1163 feedback_fn("* Other Notes")
1165 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1166 % len(i_non_redundant))
1168 if i_non_a_balanced:
1169 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1170 % len(i_non_a_balanced))
1173 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1176 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1180 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1181 """Analize the post-hooks' result
1183 This method analyses the hook result, handles it, and sends some
1184 nicely-formatted feedback back to the user.
1186 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1187 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1188 @param hooks_results: the results of the multi-node hooks rpc call
1189 @param feedback_fn: function used to send feedback back to the caller
1190 @param lu_result: previous Exec result
1191 @return: the new Exec result, based on the previous result
1195 # We only really run POST phase hooks, and are only interested in
1197 if phase == constants.HOOKS_PHASE_POST:
1198 # Used to change hooks' output to proper indentation
1199 indent_re = re.compile('^', re.M)
1200 feedback_fn("* Hooks Results")
1201 if not hooks_results:
1202 feedback_fn(" - ERROR: general communication failure")
1205 for node_name in hooks_results:
1206 show_node_header = True
1207 res = hooks_results[node_name]
1208 if res.failed or res.data is False or not isinstance(res.data, list):
1210 # no need to warn or set fail return value
1212 feedback_fn(" Communication failure in hooks execution")
1215 for script, hkr, output in res.data:
1216 if hkr == constants.HKR_FAIL:
1217 # The node header is only shown once, if there are
1218 # failing hooks on that node
1219 if show_node_header:
1220 feedback_fn(" Node %s:" % node_name)
1221 show_node_header = False
1222 feedback_fn(" ERROR: Script %s failed, output:" % script)
1223 output = indent_re.sub(' ', output)
1224 feedback_fn("%s" % output)
1230 class LUVerifyDisks(NoHooksLU):
1231 """Verifies the cluster disks status.
1237 def ExpandNames(self):
1238 self.needed_locks = {
1239 locking.LEVEL_NODE: locking.ALL_SET,
1240 locking.LEVEL_INSTANCE: locking.ALL_SET,
1242 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1244 def CheckPrereq(self):
1245 """Check prerequisites.
1247 This has no prerequisites.
1252 def Exec(self, feedback_fn):
1253 """Verify integrity of cluster disks.
1256 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1258 vg_name = self.cfg.GetVGName()
1259 nodes = utils.NiceSort(self.cfg.GetNodeList())
1260 instances = [self.cfg.GetInstanceInfo(name)
1261 for name in self.cfg.GetInstanceList()]
1264 for inst in instances:
1266 if (not inst.admin_up or
1267 inst.disk_template not in constants.DTS_NET_MIRROR):
1269 inst.MapLVsByNode(inst_lvs)
1270 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1271 for node, vol_list in inst_lvs.iteritems():
1272 for vol in vol_list:
1273 nv_dict[(node, vol)] = inst
1278 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1283 lvs = node_lvs[node]
1286 self.LogWarning("Connection to node %s failed: %s" %
1290 if isinstance(lvs, basestring):
1291 logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1292 res_nlvm[node] = lvs
1294 elif not isinstance(lvs, dict):
1295 logging.warning("Connection to node %s failed or invalid data"
1297 res_nodes.append(node)
1300 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1301 inst = nv_dict.pop((node, lv_name), None)
1302 if (not lv_online and inst is not None
1303 and inst.name not in res_instances):
1304 res_instances.append(inst.name)
1306 # any leftover items in nv_dict are missing LVs, let's arrange the
1308 for key, inst in nv_dict.iteritems():
1309 if inst.name not in res_missing:
1310 res_missing[inst.name] = []
1311 res_missing[inst.name].append(key)
1316 class LURenameCluster(LogicalUnit):
1317 """Rename the cluster.
1320 HPATH = "cluster-rename"
1321 HTYPE = constants.HTYPE_CLUSTER
1324 def BuildHooksEnv(self):
1329 "OP_TARGET": self.cfg.GetClusterName(),
1330 "NEW_NAME": self.op.name,
1332 mn = self.cfg.GetMasterNode()
1333 return env, [mn], [mn]
1335 def CheckPrereq(self):
1336 """Verify that the passed name is a valid one.
1339 hostname = utils.HostInfo(self.op.name)
1341 new_name = hostname.name
1342 self.ip = new_ip = hostname.ip
1343 old_name = self.cfg.GetClusterName()
1344 old_ip = self.cfg.GetMasterIP()
1345 if new_name == old_name and new_ip == old_ip:
1346 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1347 " cluster has changed")
1348 if new_ip != old_ip:
1349 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1350 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1351 " reachable on the network. Aborting." %
1354 self.op.name = new_name
1356 def Exec(self, feedback_fn):
1357 """Rename the cluster.
1360 clustername = self.op.name
1363 # shutdown the master IP
1364 master = self.cfg.GetMasterNode()
1365 result = self.rpc.call_node_stop_master(master, False)
1366 if result.failed or not result.data:
1367 raise errors.OpExecError("Could not disable the master role")
1370 cluster = self.cfg.GetClusterInfo()
1371 cluster.cluster_name = clustername
1372 cluster.master_ip = ip
1373 self.cfg.Update(cluster)
1375 # update the known hosts file
1376 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1377 node_list = self.cfg.GetNodeList()
1379 node_list.remove(master)
1382 result = self.rpc.call_upload_file(node_list,
1383 constants.SSH_KNOWN_HOSTS_FILE)
1384 for to_node, to_result in result.iteritems():
1385 if to_result.failed or not to_result.data:
1386 logging.error("Copy of file %s to node %s failed",
1387 constants.SSH_KNOWN_HOSTS_FILE, to_node)
1390 result = self.rpc.call_node_start_master(master, False)
1391 if result.failed or not result.data:
1392 self.LogWarning("Could not re-enable the master role on"
1393 " the master, please restart manually.")
1396 def _RecursiveCheckIfLVMBased(disk):
1397 """Check if the given disk or its children are lvm-based.
1399 @type disk: L{objects.Disk}
1400 @param disk: the disk to check
1402 @return: boolean indicating whether an LD_LV dev_type was found or not
1406 for chdisk in disk.children:
1407 if _RecursiveCheckIfLVMBased(chdisk):
1409 return disk.dev_type == constants.LD_LV
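# E.g. a DRBD disk whose children are logical volumes is reported as
# LVM-based through the recursion over its children, while a plain
# file-based disk is not.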
1412 class LUSetClusterParams(LogicalUnit):
1413 """Change the parameters of the cluster.
1416 HPATH = "cluster-modify"
1417 HTYPE = constants.HTYPE_CLUSTER
1421 def CheckArguments(self):
1425 if not hasattr(self.op, "candidate_pool_size"):
1426 self.op.candidate_pool_size = None
1427 if self.op.candidate_pool_size is not None:
1429 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1430 except (ValueError, TypeError), err:
1431 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1433 if self.op.candidate_pool_size < 1:
1434 raise errors.OpPrereqError("At least one master candidate needed")
1436 def ExpandNames(self):
1437 # FIXME: in the future maybe other cluster params won't require checking on
1438 # all nodes to be modified.
1439 self.needed_locks = {
1440 locking.LEVEL_NODE: locking.ALL_SET,
1442 self.share_locks[locking.LEVEL_NODE] = 1
1444 def BuildHooksEnv(self):
1449 "OP_TARGET": self.cfg.GetClusterName(),
1450 "NEW_VG_NAME": self.op.vg_name,
1452 mn = self.cfg.GetMasterNode()
1453 return env, [mn], [mn]
1455 def CheckPrereq(self):
1456 """Check prerequisites.
1458 This checks whether the given params don't conflict and
1459 if the given volume group is valid.
1462 if self.op.vg_name is not None and not self.op.vg_name:
1463 instances = self.cfg.GetAllInstancesInfo().values()
1464 for inst in instances:
1465 for disk in inst.disks:
1466 if _RecursiveCheckIfLVMBased(disk):
1467 raise errors.OpPrereqError("Cannot disable lvm storage while"
1468 " lvm-based instances exist")
1470 node_list = self.acquired_locks[locking.LEVEL_NODE]
1472 # if vg_name not None, checks given volume group on all nodes
1474 vglist = self.rpc.call_vg_list(node_list)
1475 for node in node_list:
1476 if vglist[node].failed:
1477 # ignoring down node
1478 self.LogWarning("Node %s unreachable/error, ignoring" % node)
1480 vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1482 constants.MIN_VG_SIZE)
1484 raise errors.OpPrereqError("Error on node '%s': %s" %
1487 self.cluster = cluster = self.cfg.GetClusterInfo()
1488 # validate beparams changes
1489 if self.op.beparams:
1490 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1491 self.new_beparams = cluster.FillDict(
1492 cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1494 # hypervisor list/parameters
1495 self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1496 if self.op.hvparams:
1497 if not isinstance(self.op.hvparams, dict):
1498 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1499 for hv_name, hv_dict in self.op.hvparams.items():
1500 if hv_name not in self.new_hvparams:
1501 self.new_hvparams[hv_name] = hv_dict
1503 self.new_hvparams[hv_name].update(hv_dict)
1505 if self.op.enabled_hypervisors is not None:
1506 self.hv_list = self.op.enabled_hypervisors
1508 self.hv_list = cluster.enabled_hypervisors
1510 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1511 # either the enabled list has changed, or the parameters have, validate
1512 for hv_name, hv_params in self.new_hvparams.items():
1513 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1514 (self.op.enabled_hypervisors and
1515 hv_name in self.op.enabled_hypervisors)):
1516 # either this is a new hypervisor, or its parameters have changed
1517 hv_class = hypervisor.GetHypervisor(hv_name)
1518 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1519 hv_class.CheckParameterSyntax(hv_params)
1520 _CheckHVParams(self, node_list, hv_name, hv_params)
1522 def Exec(self, feedback_fn):
1523 """Change the parameters of the cluster.
1526 if self.op.vg_name is not None:
1527 if self.op.vg_name != self.cfg.GetVGName():
1528 self.cfg.SetVGName(self.op.vg_name)
1530 feedback_fn("Cluster LVM configuration already in desired"
1531 " state, not changing")
1532 if self.op.hvparams:
1533 self.cluster.hvparams = self.new_hvparams
1534 if self.op.enabled_hypervisors is not None:
1535 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1536 if self.op.beparams:
1537 self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1538 if self.op.candidate_pool_size is not None:
1539 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1541 self.cfg.Update(self.cluster)
1543 # we want to update nodes after the cluster so that if any errors
1544 # happen, we have recorded and saved the cluster info
1545 if self.op.candidate_pool_size is not None:
1546 _AdjustCandidatePool(self)
1549 class LURedistributeConfig(NoHooksLU):
1550 """Force the redistribution of cluster configuration.
1552 This is a very simple LU.
1558 def ExpandNames(self):
1559 self.needed_locks = {
1560 locking.LEVEL_NODE: locking.ALL_SET,
1562 self.share_locks[locking.LEVEL_NODE] = 1
1564 def CheckPrereq(self):
1565 """Check prerequisites.
1569 def Exec(self, feedback_fn):
1570 """Redistribute the configuration.
1573 self.cfg.Update(self.cfg.GetClusterInfo())
1576 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1577 """Sleep and poll for an instance's disk to sync.
1580 if not instance.disks:
1584 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1586 node = instance.primary_node
1588 for dev in instance.disks:
1589 lu.cfg.SetDiskID(dev, node)
1595 cumul_degraded = False
1596 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1597 if rstats.failed or not rstats.data:
1598 lu.LogWarning("Can't get any data from node %s", node)
1601 raise errors.RemoteError("Can't contact node %s for mirror data,"
1602 " aborting." % node)
1605 rstats = rstats.data
1607 for i, mstat in enumerate(rstats):
1609 lu.LogWarning("Can't compute data for node %s/%s",
1610 node, instance.disks[i].iv_name)
1612 # we ignore the ldisk parameter
1613 perc_done, est_time, is_degraded, _ = mstat
1614 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1615 if perc_done is not None:
1617 if est_time is not None:
1618 rem_time = "%d estimated seconds remaining" % est_time
1621 rem_time = "no time estimate"
1622 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1623 (instance.disks[i].iv_name, perc_done, rem_time))
1627 time.sleep(min(60, max_time))
1630 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1631 return not cumul_degraded
1634 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1635 """Check that mirrors are not degraded.
1637 The ldisk parameter, if True, will change the test from the
1638 is_degraded attribute (which represents overall non-ok status for
1639 the device(s)) to the ldisk (representing the local storage status).
1642 lu.cfg.SetDiskID(dev, node)
1649 if on_primary or dev.AssembleOnSecondary():
1650 rstats = lu.rpc.call_blockdev_find(node, dev)
1651 msg = rstats.RemoteFailMsg()
1653 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1655 elif not rstats.payload:
1656 lu.LogWarning("Can't find disk on node %s", node)
1659 result = result and (not rstats.payload[idx])
1661 for child in dev.children:
1662 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1667 class LUDiagnoseOS(NoHooksLU):
1668 """Logical unit for OS diagnose/query.
1671 _OP_REQP = ["output_fields", "names"]
1673 _FIELDS_STATIC = utils.FieldSet()
1674 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1676 def ExpandNames(self):
1678 raise errors.OpPrereqError("Selective OS query not supported")
1680 _CheckOutputFields(static=self._FIELDS_STATIC,
1681 dynamic=self._FIELDS_DYNAMIC,
1682 selected=self.op.output_fields)
1684 # Lock all nodes, in shared mode
1685 # Temporary removal of locks, should be reverted later
1686 # TODO: reintroduce locks when they are lighter-weight
1687 self.needed_locks = {}
1688 #self.share_locks[locking.LEVEL_NODE] = 1
1689 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1691 def CheckPrereq(self):
1692 """Check prerequisites.
1697 def _DiagnoseByOS(node_list, rlist):
1698 """Remaps a per-node return list into an a per-os per-node dictionary
1700 @param node_list: a list with the names of all nodes
1701 @param rlist: a map with node names as keys and OS objects as values
1704 @return: a dictionary with osnames as keys and as value another map, with
1705 nodes as keys and lists of OS objects as values, e.g.::
1707 {"debian-etch": {"node1": [<object>,...],
1708 "node2": [<object>,]}
1713 # we build here the list of nodes that didn't fail the RPC (at RPC
1714 # level), so that nodes with a non-responding node daemon don't
1715 # make all OSes invalid
1716 good_nodes = [node_name for node_name in rlist
1717 if not rlist[node_name].failed]
1718 for node_name, nr in rlist.iteritems():
1719 if nr.failed or not nr.data:
1721 for os_obj in nr.data:
1722 if os_obj.name not in all_os:
1723 # build a list of nodes for this os containing empty lists
1724 # for each node in node_list
1725 all_os[os_obj.name] = {}
1726 for nname in good_nodes:
1727 all_os[os_obj.name][nname] = []
1728 all_os[os_obj.name][node_name].append(os_obj)
1731 def Exec(self, feedback_fn):
1732 """Compute the list of OSes.
1735 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1736 node_data = self.rpc.call_os_diagnose(valid_nodes)
1737 if node_data == False:
1738 raise errors.OpExecError("Can't gather the list of OSes")
1739 pol = self._DiagnoseByOS(valid_nodes, node_data)
1741 for os_name, os_data in pol.iteritems():
1743 for field in self.op.output_fields:
1746 elif field == "valid":
1747 val = utils.all([osl and osl[0] for osl in os_data.values()])
1748 elif field == "node_status":
1750 for node_name, nos_list in os_data.iteritems():
1751 val[node_name] = [(v.status, v.path) for v in nos_list]
1753 raise errors.ParameterError(field)
1760 class LURemoveNode(LogicalUnit):
1761 """Logical unit for removing a node.
1764 HPATH = "node-remove"
1765 HTYPE = constants.HTYPE_NODE
1766 _OP_REQP = ["node_name"]
1768 def BuildHooksEnv(self):
1771 This doesn't run on the target node in the pre phase as a failed
1772 node would then be impossible to remove.
1776 "OP_TARGET": self.op.node_name,
1777 "NODE_NAME": self.op.node_name,
1779 all_nodes = self.cfg.GetNodeList()
1780 all_nodes.remove(self.op.node_name)
1781 return env, all_nodes, all_nodes
1783 def CheckPrereq(self):
1784 """Check prerequisites.
1787 - the node exists in the configuration
1788 - it does not have primary or secondary instances
1789 - it's not the master
1791 Any errors are signalled by raising errors.OpPrereqError.
1794 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1796 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1798 instance_list = self.cfg.GetInstanceList()
1800 masternode = self.cfg.GetMasterNode()
1801 if node.name == masternode:
1802 raise errors.OpPrereqError("Node is the master node,"
1803 " you need to failover first.")
1805 for instance_name in instance_list:
1806 instance = self.cfg.GetInstanceInfo(instance_name)
1807 if node.name in instance.all_nodes:
1808 raise errors.OpPrereqError("Instance %s is still running on the node,"
1809 " please remove first." % instance_name)
1810 self.op.node_name = node.name
1813 def Exec(self, feedback_fn):
1814 """Removes the node from the cluster.
1818 logging.info("Stopping the node daemon and removing configs from node %s",
1821 self.context.RemoveNode(node.name)
1823 self.rpc.call_node_leave_cluster(node.name)
1825 # Promote nodes to master candidate as needed
1826 _AdjustCandidatePool(self)
1829 class LUQueryNodes(NoHooksLU):
1830 """Logical unit for querying nodes.
1833 _OP_REQP = ["output_fields", "names", "use_locking"]
1835 _FIELDS_DYNAMIC = utils.FieldSet(
1837 "mtotal", "mnode", "mfree",
1839 "ctotal", "cnodes", "csockets",
1842 _FIELDS_STATIC = utils.FieldSet(
1843 "name", "pinst_cnt", "sinst_cnt",
1844 "pinst_list", "sinst_list",
1845 "pip", "sip", "tags",
1853 def ExpandNames(self):
1854 _CheckOutputFields(static=self._FIELDS_STATIC,
1855 dynamic=self._FIELDS_DYNAMIC,
1856 selected=self.op.output_fields)
1858 self.needed_locks = {}
1859 self.share_locks[locking.LEVEL_NODE] = 1
1862 self.wanted = _GetWantedNodes(self, self.op.names)
1864 self.wanted = locking.ALL_SET
1866 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1867 self.do_locking = self.do_node_query and self.op.use_locking
1869 # if we don't request only static fields, we need to lock the nodes
1870 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1873 def CheckPrereq(self):
1874 """Check prerequisites.
1877 # The validation of the node list is done in _GetWantedNodes,
1878 # if non-empty; if empty, there's no validation to do
1881 def Exec(self, feedback_fn):
1882 """Computes the list of nodes and their attributes.
1885 all_info = self.cfg.GetAllNodesInfo()
1887 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1888 elif self.wanted != locking.ALL_SET:
1889 nodenames = self.wanted
1890 missing = set(nodenames).difference(all_info.keys())
1892 raise errors.OpExecError(
1893 "Some nodes were removed before retrieving their data: %s" % missing)
1895 nodenames = all_info.keys()
1897 nodenames = utils.NiceSort(nodenames)
1898 nodelist = [all_info[name] for name in nodenames]
1900 # begin data gathering
1902 if self.do_node_query:
1904 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1905 self.cfg.GetHypervisorType())
1906 for name in nodenames:
1907 nodeinfo = node_data[name]
1908 if not nodeinfo.failed and nodeinfo.data:
1909 nodeinfo = nodeinfo.data
1910 fn = utils.TryConvert
1912 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1913 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1914 "mfree": fn(int, nodeinfo.get('memory_free', None)),
1915 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1916 "dfree": fn(int, nodeinfo.get('vg_free', None)),
1917 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1918 "bootid": nodeinfo.get('bootid', None),
1919 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1920 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1923 live_data[name] = {}
1925 live_data = dict.fromkeys(nodenames, {})
1927 node_to_primary = dict([(name, set()) for name in nodenames])
1928 node_to_secondary = dict([(name, set()) for name in nodenames])
1930 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1931 "sinst_cnt", "sinst_list"))
1932 if inst_fields & frozenset(self.op.output_fields):
1933 instancelist = self.cfg.GetInstanceList()
1935 for instance_name in instancelist:
1936 inst = self.cfg.GetInstanceInfo(instance_name)
1937 if inst.primary_node in node_to_primary:
1938 node_to_primary[inst.primary_node].add(inst.name)
1939 for secnode in inst.secondary_nodes:
1940 if secnode in node_to_secondary:
1941 node_to_secondary[secnode].add(inst.name)
1943 master_node = self.cfg.GetMasterNode()
1945 # end data gathering
1948 for node in nodelist:
1950 for field in self.op.output_fields:
1953 elif field == "pinst_list":
1954 val = list(node_to_primary[node.name])
1955 elif field == "sinst_list":
1956 val = list(node_to_secondary[node.name])
1957 elif field == "pinst_cnt":
1958 val = len(node_to_primary[node.name])
1959 elif field == "sinst_cnt":
1960 val = len(node_to_secondary[node.name])
1961 elif field == "pip":
1962 val = node.primary_ip
1963 elif field == "sip":
1964 val = node.secondary_ip
1965 elif field == "tags":
1966 val = list(node.GetTags())
1967 elif field == "serial_no":
1968 val = node.serial_no
1969 elif field == "master_candidate":
1970 val = node.master_candidate
1971 elif field == "master":
1972 val = node.name == master_node
1973 elif field == "offline":
1975 elif field == "drained":
1977 elif self._FIELDS_DYNAMIC.Matches(field):
1978 val = live_data[node.name].get(field, None)
1980 raise errors.ParameterError(field)
1981 node_output.append(val)
1982 output.append(node_output)
1987 class LUQueryNodeVolumes(NoHooksLU):
1988 """Logical unit for getting volumes on node(s).
1991 _OP_REQP = ["nodes", "output_fields"]
1993 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1994 _FIELDS_STATIC = utils.FieldSet("node")
1996 def ExpandNames(self):
1997 _CheckOutputFields(static=self._FIELDS_STATIC,
1998 dynamic=self._FIELDS_DYNAMIC,
1999 selected=self.op.output_fields)
2001 self.needed_locks = {}
2002 self.share_locks[locking.LEVEL_NODE] = 1
2003 if not self.op.nodes:
2004 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2006 self.needed_locks[locking.LEVEL_NODE] = \
2007 _GetWantedNodes(self, self.op.nodes)
2009 def CheckPrereq(self):
2010 """Check prerequisites.
2012 This checks that the fields required are valid output fields.
2015 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2017 def Exec(self, feedback_fn):
2018 """Computes the list of nodes and their attributes.
2021 nodenames = self.nodes
2022 volumes = self.rpc.call_node_volumes(nodenames)
2024 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2025 in self.cfg.GetInstanceList()]
2027 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2030 for node in nodenames:
2031 if node not in volumes or volumes[node].failed or not volumes[node].data:
2034 node_vols = volumes[node].data[:]
2035 node_vols.sort(key=lambda vol: vol['dev'])
2037 for vol in node_vols:
2039 for field in self.op.output_fields:
2042 elif field == "phys":
2046 elif field == "name":
2048 elif field == "size":
2049 val = int(float(vol['size']))
2050 elif field == "instance":
2052 if node not in lv_by_node[inst]:
2054 if vol['name'] in lv_by_node[inst][node]:
2060 raise errors.ParameterError(field)
2061 node_output.append(str(val))
2063 output.append(node_output)
2068 class LUAddNode(LogicalUnit):
2069 """Logical unit for adding node to the cluster.
2073 HTYPE = constants.HTYPE_NODE
2074 _OP_REQP = ["node_name"]
2076 def BuildHooksEnv(self):
2079 This will run on all nodes before, and on all nodes + the new node after.
2083 "OP_TARGET": self.op.node_name,
2084 "NODE_NAME": self.op.node_name,
2085 "NODE_PIP": self.op.primary_ip,
2086 "NODE_SIP": self.op.secondary_ip,
2088 nodes_0 = self.cfg.GetNodeList()
2089 nodes_1 = nodes_0 + [self.op.node_name, ]
2090 return env, nodes_0, nodes_1
2092 def CheckPrereq(self):
2093 """Check prerequisites.
2096 - the new node is not already in the config
2098 - its parameters (single/dual homed) matches the cluster
2100 Any errors are signalled by raising errors.OpPrereqError.
2103 node_name = self.op.node_name
2106 dns_data = utils.HostInfo(node_name)
2108 node = dns_data.name
2109 primary_ip = self.op.primary_ip = dns_data.ip
2110 secondary_ip = getattr(self.op, "secondary_ip", None)
2111 if secondary_ip is None:
2112 secondary_ip = primary_ip
2113 if not utils.IsValidIP(secondary_ip):
2114 raise errors.OpPrereqError("Invalid secondary IP given")
2115 self.op.secondary_ip = secondary_ip
2117 node_list = cfg.GetNodeList()
2118 if not self.op.readd and node in node_list:
2119 raise errors.OpPrereqError("Node %s is already in the configuration" %
2121 elif self.op.readd and node not in node_list:
2122 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2124 for existing_node_name in node_list:
2125 existing_node = cfg.GetNodeInfo(existing_node_name)
2127 if self.op.readd and node == existing_node_name:
2128 if (existing_node.primary_ip != primary_ip or
2129 existing_node.secondary_ip != secondary_ip):
2130 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2131 " address configuration as before")
2134 if (existing_node.primary_ip == primary_ip or
2135 existing_node.secondary_ip == primary_ip or
2136 existing_node.primary_ip == secondary_ip or
2137 existing_node.secondary_ip == secondary_ip):
2138 raise errors.OpPrereqError("New node ip address(es) conflict with"
2139 " existing node %s" % existing_node.name)
2141 # check that the type of the node (single versus dual homed) is the
2142 # same as for the master
2143 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2144 master_singlehomed = myself.secondary_ip == myself.primary_ip
2145 newbie_singlehomed = secondary_ip == primary_ip
2146 if master_singlehomed != newbie_singlehomed:
2147 if master_singlehomed:
2148 raise errors.OpPrereqError("The master has no private ip but the"
2149 " new node has one")
2151 raise errors.OpPrereqError("The master has a private ip but the"
2152 " new node doesn't have one")
2154 # checks reachability
2155 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2156 raise errors.OpPrereqError("Node not reachable by ping")
2158 if not newbie_singlehomed:
2159 # check reachability from my secondary ip to newbie's secondary ip
2160 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2161 source=myself.secondary_ip):
2162 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2163 " based ping to noded port")
2165 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2166 mc_now, _ = self.cfg.GetMasterCandidateStats()
2167 master_candidate = mc_now < cp_size
2169 self.new_node = objects.Node(name=node,
2170 primary_ip=primary_ip,
2171 secondary_ip=secondary_ip,
2172 master_candidate=master_candidate,
2173 offline=False, drained=False)
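# A note on the auto-promotion decided just above (the figures are
# illustrative, not from the source): with candidate_pool_size == 10 and
# mc_now == 3 the test mc_now < cp_size holds, so the new node joins the
# cluster with master_candidate=True; once the pool already contains
# cp_size candidates, a newly added node becomes a regular node instead.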
2175 def Exec(self, feedback_fn):
2176 """Adds the new node to the cluster.
2179 new_node = self.new_node
2180 node = new_node.name
2182 # check connectivity
2183 result = self.rpc.call_version([node])[node]
2186 if constants.PROTOCOL_VERSION == result.data:
2187 logging.info("Communication to node %s fine, sw version %s match",
2190 raise errors.OpExecError("Version mismatch master version %s,"
2191 " node version %s" %
2192 (constants.PROTOCOL_VERSION, result.data))
2194 raise errors.OpExecError("Cannot get version from the new node")
2197 logging.info("Copy ssh key to node %s", node)
2198 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2200 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2201 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2207 keyarray.append(f.read())
2211 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2213 keyarray[3], keyarray[4], keyarray[5])
2215 msg = result.RemoteFailMsg()
2217 raise errors.OpExecError("Cannot transfer ssh keys to the"
2218 " new node: %s" % msg)
2220 # Add node to our /etc/hosts, and add key to known_hosts
2221 utils.AddHostToEtcHosts(new_node.name)
2223 if new_node.secondary_ip != new_node.primary_ip:
2224 result = self.rpc.call_node_has_ip_address(new_node.name,
2225 new_node.secondary_ip)
2226 if result.failed or not result.data:
2227 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2228 " you gave (%s). Please fix and re-run this"
2229 " command." % new_node.secondary_ip)
2231 node_verify_list = [self.cfg.GetMasterNode()]
2232 node_verify_param = {
2234 # TODO: do a node-net-test as well?
2237 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2238 self.cfg.GetClusterName())
2239 for verifier in node_verify_list:
2240 if result[verifier].failed or not result[verifier].data:
2241 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2242 " for remote verification" % verifier)
2243 if result[verifier].data['nodelist']:
2244 for failed in result[verifier].data['nodelist']:
2245 feedback_fn("ssh/hostname verification failed %s -> %s" %
2246 (verifier, result[verifier].data['nodelist'][failed]))
2247 raise errors.OpExecError("ssh/hostname verification failed.")
2249 # Distribute updated /etc/hosts and known_hosts to all nodes,
2250 # including the node just added
2251 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2252 dist_nodes = self.cfg.GetNodeList()
2253 if not self.op.readd:
2254 dist_nodes.append(node)
2255 if myself.name in dist_nodes:
2256 dist_nodes.remove(myself.name)
2258 logging.debug("Copying hosts and known_hosts to all nodes")
2259 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2260 result = self.rpc.call_upload_file(dist_nodes, fname)
2261 for to_node, to_result in result.iteritems():
2262 if to_result.failed or not to_result.data:
2263 logging.error("Copy of file %s to node %s failed", fname, to_node)
2266 enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2267 if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2268 to_copy.append(constants.VNC_PASSWORD_FILE)
2270 for fname in to_copy:
2271 result = self.rpc.call_upload_file([node], fname)
2272 if result[node].failed or not result[node]:
2273 logging.error("Could not copy file %s to node %s", fname, node)
2276 self.context.ReaddNode(new_node)
2278 self.context.AddNode(new_node)
2281 class LUSetNodeParams(LogicalUnit):
2282 """Modifies the parameters of a node.
2285 HPATH = "node-modify"
2286 HTYPE = constants.HTYPE_NODE
2287 _OP_REQP = ["node_name"]
2290 def CheckArguments(self):
2291 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2292 if node_name is None:
2293 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2294 self.op.node_name = node_name
2295 _CheckBooleanOpField(self.op, 'master_candidate')
2296 _CheckBooleanOpField(self.op, 'offline')
2297 _CheckBooleanOpField(self.op, 'drained')
2298 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2299 if all_mods.count(None) == 3:
2300 raise errors.OpPrereqError("Please pass at least one modification")
2301 if all_mods.count(True) > 1:
2302 raise errors.OpPrereqError("Can't set the node into more than one"
2303 " state at the same time")
2305 def ExpandNames(self):
2306 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2308 def BuildHooksEnv(self):
2311 This runs on the master node.
2315 "OP_TARGET": self.op.node_name,
2316 "MASTER_CANDIDATE": str(self.op.master_candidate),
2317 "OFFLINE": str(self.op.offline),
2318 "DRAINED": str(self.op.drained),
2320 nl = [self.cfg.GetMasterNode(),
2324 def CheckPrereq(self):
2325 """Check prerequisites.
2327 This only checks the instance list against the existing names.
2330 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2332 if ((self.op.master_candidate == False or self.op.offline == True or
2333 self.op.drained == True) and node.master_candidate):
2334 # we will demote the node from master_candidate
2335 if self.op.node_name == self.cfg.GetMasterNode():
2336 raise errors.OpPrereqError("The master node has to be a"
2337 " master candidate, online and not drained")
2338 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2339 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2340 if num_candidates <= cp_size:
2341 msg = ("Not enough master candidates (desired"
2342 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2344 self.LogWarning(msg)
2346 raise errors.OpPrereqError(msg)
2348 if (self.op.master_candidate == True and
2349 ((node.offline and not self.op.offline == False) or
2350 (node.drained and not self.op.drained == False))):
2351 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2352 " to master_candidate" % node.name)
2356 def Exec(self, feedback_fn):
2365 if self.op.offline is not None:
2366 node.offline = self.op.offline
2367 result.append(("offline", str(self.op.offline)))
2368 if self.op.offline == True:
2369 if node.master_candidate:
2370 node.master_candidate = False
2372 result.append(("master_candidate", "auto-demotion due to offline"))
2374 node.drained = False
2375 result.append(("drained", "clear drained status due to offline"))
2377 if self.op.master_candidate is not None:
2378 node.master_candidate = self.op.master_candidate
2380 result.append(("master_candidate", str(self.op.master_candidate)))
2381 if self.op.master_candidate == False:
2382 rrc = self.rpc.call_node_demote_from_mc(node.name)
2383 msg = rrc.RemoteFailMsg()
2385 self.LogWarning("Node failed to demote itself: %s" % msg)
2387 if self.op.drained is not None:
2388 node.drained = self.op.drained
2389 result.append(("drained", str(self.op.drained)))
2390 if self.op.drained == True:
2391 if node.master_candidate:
2392 node.master_candidate = False
2394 result.append(("master_candidate", "auto-demotion due to drain"))
2396 node.offline = False
2397 result.append(("offline", "clear offline status due to drain"))
2399 # this will trigger configuration file update, if needed
2400 self.cfg.Update(node)
2401 # this will trigger job queue propagation or cleanup
2403 self.context.ReaddNode(node)
2408 class LUQueryClusterInfo(NoHooksLU):
2409 """Query cluster configuration.
2415 def ExpandNames(self):
2416 self.needed_locks = {}
2418 def CheckPrereq(self):
2419 """No prerequsites needed for this LU.
2424 def Exec(self, feedback_fn):
2425 """Return cluster config.
2428 cluster = self.cfg.GetClusterInfo()
2430 "software_version": constants.RELEASE_VERSION,
2431 "protocol_version": constants.PROTOCOL_VERSION,
2432 "config_version": constants.CONFIG_VERSION,
2433 "os_api_version": constants.OS_API_VERSION,
2434 "export_version": constants.EXPORT_VERSION,
2435 "architecture": (platform.architecture()[0], platform.machine()),
2436 "name": cluster.cluster_name,
2437 "master": cluster.master_node,
2438 "default_hypervisor": cluster.default_hypervisor,
2439 "enabled_hypervisors": cluster.enabled_hypervisors,
2440 "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2441 for hypervisor in cluster.enabled_hypervisors]),
2442 "beparams": cluster.beparams,
2443 "candidate_pool_size": cluster.candidate_pool_size,
2449 class LUQueryConfigValues(NoHooksLU):
2450 """Return configuration values.
2455 _FIELDS_DYNAMIC = utils.FieldSet()
2456 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2458 def ExpandNames(self):
2459 self.needed_locks = {}
2461 _CheckOutputFields(static=self._FIELDS_STATIC,
2462 dynamic=self._FIELDS_DYNAMIC,
2463 selected=self.op.output_fields)
2465 def CheckPrereq(self):
2466 """No prerequisites.
2471 def Exec(self, feedback_fn):
2472 """Dump a representation of the cluster config to the standard output.
2476 for field in self.op.output_fields:
2477 if field == "cluster_name":
2478 entry = self.cfg.GetClusterName()
2479 elif field == "master_node":
2480 entry = self.cfg.GetMasterNode()
2481 elif field == "drain_flag":
2482 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2484 raise errors.ParameterError(field)
2485 values.append(entry)
2489 class LUActivateInstanceDisks(NoHooksLU):
2490 """Bring up an instance's disks.
2493 _OP_REQP = ["instance_name"]
2496 def ExpandNames(self):
2497 self._ExpandAndLockInstance()
2498 self.needed_locks[locking.LEVEL_NODE] = []
2499 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2501 def DeclareLocks(self, level):
2502 if level == locking.LEVEL_NODE:
2503 self._LockInstancesNodes()
2505 def CheckPrereq(self):
2506 """Check prerequisites.
2508 This checks that the instance is in the cluster.
2511 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2512 assert self.instance is not None, \
2513 "Cannot retrieve locked instance %s" % self.op.instance_name
2514 _CheckNodeOnline(self, self.instance.primary_node)
2516 def Exec(self, feedback_fn):
2517 """Activate the disks.
2520 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2522 raise errors.OpExecError("Cannot activate block devices")
2527 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2528 """Prepare the block devices for an instance.
2530 This sets up the block devices on all nodes.
2532 @type lu: L{LogicalUnit}
2533 @param lu: the logical unit on whose behalf we execute
2534 @type instance: L{objects.Instance}
2535 @param instance: the instance for whose disks we assemble
2536 @type ignore_secondaries: boolean
2537 @param ignore_secondaries: if true, errors on secondary nodes
2538 won't result in an error return from the function
2539 @return: False if the operation failed, otherwise a list of
2540 (host, instance_visible_name, node_visible_name)
2541 with the mapping from node devices to instance devices
2546 iname = instance.name
2547 # With the two-pass mechanism we try to reduce the window of
2548 # opportunity for the race condition of switching DRBD to primary
2549 # before handshaking has occurred, but we do not eliminate it
2551 # The proper fix would be to wait (with some limits) until the
2552 # connection has been made and drbd transitions from WFConnection
2553 # into any other network-connected state (Connected, SyncTarget,
2556 # 1st pass, assemble on all nodes in secondary mode
2557 for inst_disk in instance.disks:
2558 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2559 lu.cfg.SetDiskID(node_disk, node)
2560 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2561 msg = result.RemoteFailMsg()
2563 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2564 " (is_primary=False, pass=1): %s",
2565 inst_disk.iv_name, node, msg)
2566 if not ignore_secondaries:
2569 # FIXME: race condition on drbd migration to primary
2571 # 2nd pass, do only the primary node
2572 for inst_disk in instance.disks:
2573 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2574 if node != instance.primary_node:
2576 lu.cfg.SetDiskID(node_disk, node)
2577 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2578 msg = result.RemoteFailMsg()
2580 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2581 " (is_primary=True, pass=2): %s",
2582 inst_disk.iv_name, node, msg)
2584 device_info.append((instance.primary_node, inst_disk.iv_name,
2587 # leave the disks configured for the primary node
2588 # this is a workaround that would be fixed better by
2589 # improving the logical/physical id handling
2590 for disk in instance.disks:
2591 lu.cfg.SetDiskID(disk, instance.primary_node)
2593 return disks_ok, device_info
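# Typical call pattern for _AssembleInstanceDisks, a minimal sketch
# mirroring how LUActivateInstanceDisks.Exec uses it above ('lu' is the
# calling logical unit, 'instance' a locked objects.Instance):
#
#   disks_ok, disks_info = _AssembleInstanceDisks(lu, instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#
# disks_info then holds the (node, instance-visible name, node-visible
# name) tuples for the primary node's view of the devices.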
2596 def _StartInstanceDisks(lu, instance, force):
2597 """Start the disks of an instance.
2600 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2601 ignore_secondaries=force)
2603 _ShutdownInstanceDisks(lu, instance)
2604 if force is not None and not force:
2605 lu.proc.LogWarning("", hint="If the message above refers to a"
2607 " you can retry the operation using '--force'.")
2608 raise errors.OpExecError("Disk consistency error")
2611 class LUDeactivateInstanceDisks(NoHooksLU):
2612 """Shutdown an instance's disks.
2615 _OP_REQP = ["instance_name"]
2618 def ExpandNames(self):
2619 self._ExpandAndLockInstance()
2620 self.needed_locks[locking.LEVEL_NODE] = []
2621 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2623 def DeclareLocks(self, level):
2624 if level == locking.LEVEL_NODE:
2625 self._LockInstancesNodes()
2627 def CheckPrereq(self):
2628 """Check prerequisites.
2630 This checks that the instance is in the cluster.
2633 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2634 assert self.instance is not None, \
2635 "Cannot retrieve locked instance %s" % self.op.instance_name
2637 def Exec(self, feedback_fn):
2638 """Deactivate the disks
2641 instance = self.instance
2642 _SafeShutdownInstanceDisks(self, instance)
2645 def _SafeShutdownInstanceDisks(lu, instance):
2646 """Shutdown block devices of an instance.
2648 This function checks if an instance is running, before calling
2649 _ShutdownInstanceDisks.
2652 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2653 [instance.hypervisor])
2654 ins_l = ins_l[instance.primary_node]
2655 if ins_l.failed or not isinstance(ins_l.data, list):
2656 raise errors.OpExecError("Can't contact node '%s'" %
2657 instance.primary_node)
2659 if instance.name in ins_l.data:
2660 raise errors.OpExecError("Instance is running, can't shutdown"
2663 _ShutdownInstanceDisks(lu, instance)
2666 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2667 """Shutdown block devices of an instance.
2669 This does the shutdown on all nodes of the instance.
2671 If ignore_primary is false, errors on the primary node also make the shutdown be reported as failed.
2676 for disk in instance.disks:
2677 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2678 lu.cfg.SetDiskID(top_disk, node)
2679 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2680 msg = result.RemoteFailMsg()
2682 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2683 disk.iv_name, node, msg)
2684 if not ignore_primary or node != instance.primary_node:
2689 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2690 """Checks if a node has enough free memory.
2692 This function checks if a given node has the needed amount of free
2693 memory. In case the node has less memory or we cannot get the
2694 information from the node, this function raises an OpPrereqError exception.
2697 @type lu: C{LogicalUnit}
2698 @param lu: a logical unit from which we get configuration data
2700 @param node: the node to check
2701 @type reason: C{str}
2702 @param reason: string to use in the error message
2703 @type requested: C{int}
2704 @param requested: the amount of memory in MiB to check for
2705 @type hypervisor_name: C{str}
2706 @param hypervisor_name: the hypervisor to ask for memory stats
2707 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2708 we cannot check the node
2711 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2712 nodeinfo[node].Raise()
2713 free_mem = nodeinfo[node].data.get('memory_free')
2714 if not isinstance(free_mem, int):
2715 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2716 " was '%s'" % (node, free_mem))
2717 if requested > free_mem:
2718 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2719 " needed %s MiB, available %s MiB" %
2720 (node, reason, requested, free_mem))
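# Example use of _CheckNodeFreeMemory, a sketch mirroring the call made
# in LUStartupInstance.CheckPrereq below ('self' is the logical unit and
# 'bep' the instance's filled-in backend parameters):
#
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)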
2723 class LUStartupInstance(LogicalUnit):
2724 """Starts an instance.
2727 HPATH = "instance-start"
2728 HTYPE = constants.HTYPE_INSTANCE
2729 _OP_REQP = ["instance_name", "force"]
2732 def ExpandNames(self):
2733 self._ExpandAndLockInstance()
2735 def BuildHooksEnv(self):
2738 This runs on master, primary and secondary nodes of the instance.
2742 "FORCE": self.op.force,
2744 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2745 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2748 def CheckPrereq(self):
2749 """Check prerequisites.
2751 This checks that the instance is in the cluster.
2754 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2755 assert self.instance is not None, \
2756 "Cannot retrieve locked instance %s" % self.op.instance_name
2758 _CheckNodeOnline(self, instance.primary_node)
2760 bep = self.cfg.GetClusterInfo().FillBE(instance)
2761 # check bridge existence
2762 _CheckInstanceBridgesExist(self, instance)
2764 _CheckNodeFreeMemory(self, instance.primary_node,
2765 "starting instance %s" % instance.name,
2766 bep[constants.BE_MEMORY], instance.hypervisor)
2768 def Exec(self, feedback_fn):
2769 """Start the instance.
2772 instance = self.instance
2773 force = self.op.force
2775 self.cfg.MarkInstanceUp(instance.name)
2777 node_current = instance.primary_node
2779 _StartInstanceDisks(self, instance, force)
2781 result = self.rpc.call_instance_start(node_current, instance)
2782 msg = result.RemoteFailMsg()
2784 _ShutdownInstanceDisks(self, instance)
2785 raise errors.OpExecError("Could not start instance: %s" % msg)
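# Startup ordering as implemented above: the instance is first marked up
# in the configuration, its disks are then assembled via
# _StartInstanceDisks, and only afterwards is call_instance_start issued;
# if that RPC fails the disks are shut down again before OpExecError is
# raised, so no half-started state is left behind.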
2788 class LURebootInstance(LogicalUnit):
2789 """Reboot an instance.
2792 HPATH = "instance-reboot"
2793 HTYPE = constants.HTYPE_INSTANCE
2794 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2797 def ExpandNames(self):
2798 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2799 constants.INSTANCE_REBOOT_HARD,
2800 constants.INSTANCE_REBOOT_FULL]:
2801 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2802 (constants.INSTANCE_REBOOT_SOFT,
2803 constants.INSTANCE_REBOOT_HARD,
2804 constants.INSTANCE_REBOOT_FULL))
2805 self._ExpandAndLockInstance()
2807 def BuildHooksEnv(self):
2810 This runs on master, primary and secondary nodes of the instance.
2814 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2815 "REBOOT_TYPE": self.op.reboot_type,
2817 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2818 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2821 def CheckPrereq(self):
2822 """Check prerequisites.
2824 This checks that the instance is in the cluster.
2827 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2828 assert self.instance is not None, \
2829 "Cannot retrieve locked instance %s" % self.op.instance_name
2831 _CheckNodeOnline(self, instance.primary_node)
2833 # check bridge existence
2834 _CheckInstanceBridgesExist(self, instance)
2836 def Exec(self, feedback_fn):
2837 """Reboot the instance.
2840 instance = self.instance
2841 ignore_secondaries = self.op.ignore_secondaries
2842 reboot_type = self.op.reboot_type
2844 node_current = instance.primary_node
2846 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2847 constants.INSTANCE_REBOOT_HARD]:
2848 for disk in instance.disks:
2849 self.cfg.SetDiskID(disk, node_current)
2850 result = self.rpc.call_instance_reboot(node_current, instance,
2852 msg = result.RemoteFailMsg()
2854 raise errors.OpExecError("Could not reboot instance: %s" % msg)
2856 result = self.rpc.call_instance_shutdown(node_current, instance)
2857 msg = result.RemoteFailMsg()
2859 raise errors.OpExecError("Could not shutdown instance for"
2860 " full reboot: %s" % msg)
2861 _ShutdownInstanceDisks(self, instance)
2862 _StartInstanceDisks(self, instance, ignore_secondaries)
2863 result = self.rpc.call_instance_start(node_current, instance)
2864 msg = result.RemoteFailMsg()
2866 _ShutdownInstanceDisks(self, instance)
2867 raise errors.OpExecError("Could not start instance for"
2868 " full reboot: %s" % msg)
2870 self.cfg.MarkInstanceUp(instance.name)
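# Reboot types handled above: INSTANCE_REBOOT_SOFT and
# INSTANCE_REBOOT_HARD are delegated to the hypervisor through
# call_instance_reboot, while INSTANCE_REBOOT_FULL is emulated by a
# shutdown, a disk deactivate/activate cycle and a fresh start on the
# current primary node.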
2873 class LUShutdownInstance(LogicalUnit):
2874 """Shutdown an instance.
2877 HPATH = "instance-stop"
2878 HTYPE = constants.HTYPE_INSTANCE
2879 _OP_REQP = ["instance_name"]
2882 def ExpandNames(self):
2883 self._ExpandAndLockInstance()
2885 def BuildHooksEnv(self):
2888 This runs on master, primary and secondary nodes of the instance.
2891 env = _BuildInstanceHookEnvByObject(self, self.instance)
2892 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2895 def CheckPrereq(self):
2896 """Check prerequisites.
2898 This checks that the instance is in the cluster.
2901 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2902 assert self.instance is not None, \
2903 "Cannot retrieve locked instance %s" % self.op.instance_name
2904 _CheckNodeOnline(self, self.instance.primary_node)
2906 def Exec(self, feedback_fn):
2907 """Shutdown the instance.
2910 instance = self.instance
2911 node_current = instance.primary_node
2912 self.cfg.MarkInstanceDown(instance.name)
2913 result = self.rpc.call_instance_shutdown(node_current, instance)
2914 msg = result.RemoteFailMsg()
2916 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2918 _ShutdownInstanceDisks(self, instance)
2921 class LUReinstallInstance(LogicalUnit):
2922 """Reinstall an instance.
2925 HPATH = "instance-reinstall"
2926 HTYPE = constants.HTYPE_INSTANCE
2927 _OP_REQP = ["instance_name"]
2930 def ExpandNames(self):
2931 self._ExpandAndLockInstance()
2933 def BuildHooksEnv(self):
2936 This runs on master, primary and secondary nodes of the instance.
2939 env = _BuildInstanceHookEnvByObject(self, self.instance)
2940 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2943 def CheckPrereq(self):
2944 """Check prerequisites.
2946 This checks that the instance is in the cluster and is not running.
2949 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2950 assert instance is not None, \
2951 "Cannot retrieve locked instance %s" % self.op.instance_name
2952 _CheckNodeOnline(self, instance.primary_node)
2954 if instance.disk_template == constants.DT_DISKLESS:
2955 raise errors.OpPrereqError("Instance '%s' has no disks" %
2956 self.op.instance_name)
2957 if instance.admin_up:
2958 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2959 self.op.instance_name)
2960 remote_info = self.rpc.call_instance_info(instance.primary_node,
2962 instance.hypervisor)
2963 if remote_info.failed or remote_info.data:
2964 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2965 (self.op.instance_name,
2966 instance.primary_node))
2968 self.op.os_type = getattr(self.op, "os_type", None)
2969 if self.op.os_type is not None:
2971 pnode = self.cfg.GetNodeInfo(
2972 self.cfg.ExpandNodeName(instance.primary_node))
2974 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2976 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2978 if not isinstance(result.data, objects.OS):
2979 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2980 " primary node" % self.op.os_type)
2982 self.instance = instance
2984 def Exec(self, feedback_fn):
2985 """Reinstall the instance.
2988 inst = self.instance
2990 if self.op.os_type is not None:
2991 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2992 inst.os = self.op.os_type
2993 self.cfg.Update(inst)
2995 _StartInstanceDisks(self, inst, None)
2997 feedback_fn("Running the instance OS create scripts...")
2998 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
2999 msg = result.RemoteFailMsg()
3001 raise errors.OpExecError("Could not install OS for instance %s"
3003 (inst.name, inst.primary_node, msg))
3005 _ShutdownInstanceDisks(self, inst)
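# The reinstall flow above never starts the instance: the OS type is
# optionally updated in the configuration, the disks are temporarily
# activated so that call_instance_os_add can run the OS create scripts on
# the primary node, and the disks are shut down again afterwards.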
3008 class LURenameInstance(LogicalUnit):
3009 """Rename an instance.
3012 HPATH = "instance-rename"
3013 HTYPE = constants.HTYPE_INSTANCE
3014 _OP_REQP = ["instance_name", "new_name"]
3016 def BuildHooksEnv(self):
3019 This runs on master, primary and secondary nodes of the instance.
3022 env = _BuildInstanceHookEnvByObject(self, self.instance)
3023 env["INSTANCE_NEW_NAME"] = self.op.new_name
3024 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3027 def CheckPrereq(self):
3028 """Check prerequisites.
3030 This checks that the instance is in the cluster and is not running.
3033 instance = self.cfg.GetInstanceInfo(
3034 self.cfg.ExpandInstanceName(self.op.instance_name))
3035 if instance is None:
3036 raise errors.OpPrereqError("Instance '%s' not known" %
3037 self.op.instance_name)
3038 _CheckNodeOnline(self, instance.primary_node)
3040 if instance.admin_up:
3041 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3042 self.op.instance_name)
3043 remote_info = self.rpc.call_instance_info(instance.primary_node,
3045 instance.hypervisor)
3047 if remote_info.data:
3048 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3049 (self.op.instance_name,
3050 instance.primary_node))
3051 self.instance = instance
3053 # new name verification
3054 name_info = utils.HostInfo(self.op.new_name)
3056 self.op.new_name = new_name = name_info.name
3057 instance_list = self.cfg.GetInstanceList()
3058 if new_name in instance_list:
3059 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3062 if not getattr(self.op, "ignore_ip", False):
3063 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3064 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3065 (name_info.ip, new_name))
3068 def Exec(self, feedback_fn):
3069 """Reinstall the instance.
3072 inst = self.instance
3073 old_name = inst.name
3075 if inst.disk_template == constants.DT_FILE:
3076 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3078 self.cfg.RenameInstance(inst.name, self.op.new_name)
3079 # Change the instance lock. This is definitely safe while we hold the BGL
3080 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3081 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3083 # re-read the instance from the configuration after rename
3084 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3086 if inst.disk_template == constants.DT_FILE:
3087 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3088 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3089 old_file_storage_dir,
3090 new_file_storage_dir)
3093 raise errors.OpExecError("Could not connect to node '%s' to rename"
3094 " directory '%s' to '%s' (but the instance"
3095 " has been renamed in Ganeti)" % (
3096 inst.primary_node, old_file_storage_dir,
3097 new_file_storage_dir))
3099 if not result.data[0]:
3100 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3101 " (but the instance has been renamed in"
3102 " Ganeti)" % (old_file_storage_dir,
3103 new_file_storage_dir))
3105 _StartInstanceDisks(self, inst, None)
3107 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3109 msg = result.RemoteFailMsg()
3111 msg = ("Could not run OS rename script for instance %s on node %s"
3112 " (but the instance has been renamed in Ganeti): %s" %
3113 (inst.name, inst.primary_node, msg))
3114 self.proc.LogWarning(msg)
3116 _ShutdownInstanceDisks(self, inst)
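# Renaming, as implemented above, touches three places: the cluster
# configuration (cfg.RenameInstance plus the instance lock in the glm),
# the file storage directory when the disk template is DT_FILE, and the
# guest itself through the OS rename script run by
# call_instance_run_rename; a failure in that last step is only logged as
# a warning because the cluster-side rename has already happened.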
3119 class LURemoveInstance(LogicalUnit):
3120 """Remove an instance.
3123 HPATH = "instance-remove"
3124 HTYPE = constants.HTYPE_INSTANCE
3125 _OP_REQP = ["instance_name", "ignore_failures"]
3128 def ExpandNames(self):
3129 self._ExpandAndLockInstance()
3130 self.needed_locks[locking.LEVEL_NODE] = []
3131 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3133 def DeclareLocks(self, level):
3134 if level == locking.LEVEL_NODE:
3135 self._LockInstancesNodes()
3137 def BuildHooksEnv(self):
3140 This runs on master, primary and secondary nodes of the instance.
3143 env = _BuildInstanceHookEnvByObject(self, self.instance)
3144 nl = [self.cfg.GetMasterNode()]
3147 def CheckPrereq(self):
3148 """Check prerequisites.
3150 This checks that the instance is in the cluster.
3153 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3154 assert self.instance is not None, \
3155 "Cannot retrieve locked instance %s" % self.op.instance_name
3157 def Exec(self, feedback_fn):
3158 """Remove the instance.
3161 instance = self.instance
3162 logging.info("Shutting down instance %s on node %s",
3163 instance.name, instance.primary_node)
3165 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3166 msg = result.RemoteFailMsg()
3168 if self.op.ignore_failures:
3169 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3171 raise errors.OpExecError("Could not shutdown instance %s on"
3173 (instance.name, instance.primary_node, msg))
3175 logging.info("Removing block devices for instance %s", instance.name)
3177 if not _RemoveDisks(self, instance):
3178 if self.op.ignore_failures:
3179 feedback_fn("Warning: can't remove instance's disks")
3181 raise errors.OpExecError("Can't remove instance's disks")
3183 logging.info("Removing instance %s out of cluster config", instance.name)
3185 self.cfg.RemoveInstance(instance.name)
3186 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3189 class LUQueryInstances(NoHooksLU):
3190 """Logical unit for querying instances.
3193 _OP_REQP = ["output_fields", "names", "use_locking"]
3195 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3197 "disk_template", "ip", "mac", "bridge",
3198 "sda_size", "sdb_size", "vcpus", "tags",
3199 "network_port", "beparams",
3200 r"(disk)\.(size)/([0-9]+)",
3201 r"(disk)\.(sizes)", "disk_usage",
3202 r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3203 r"(nic)\.(macs|ips|bridges)",
3204 r"(disk|nic)\.(count)",
3205 "serial_no", "hypervisor", "hvparams",] +
3207 for name in constants.HVS_PARAMETERS] +
3209 for name in constants.BES_PARAMETERS])
3210 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3213 def ExpandNames(self):
3214 _CheckOutputFields(static=self._FIELDS_STATIC,
3215 dynamic=self._FIELDS_DYNAMIC,
3216 selected=self.op.output_fields)
3218 self.needed_locks = {}
3219 self.share_locks[locking.LEVEL_INSTANCE] = 1
3220 self.share_locks[locking.LEVEL_NODE] = 1
3223 self.wanted = _GetWantedInstances(self, self.op.names)
3225 self.wanted = locking.ALL_SET
3227 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3228 self.do_locking = self.do_node_query and self.op.use_locking
3230 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3231 self.needed_locks[locking.LEVEL_NODE] = []
3232 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3234 def DeclareLocks(self, level):
3235 if level == locking.LEVEL_NODE and self.do_locking:
3236 self._LockInstancesNodes()
3238 def CheckPrereq(self):
3239 """Check prerequisites.
3244 def Exec(self, feedback_fn):
3245 """Computes the list of nodes and their attributes.
3248 all_info = self.cfg.GetAllInstancesInfo()
3249 if self.wanted == locking.ALL_SET:
3250 # caller didn't specify instance names, so ordering is not important
3252 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3254 instance_names = all_info.keys()
3255 instance_names = utils.NiceSort(instance_names)
3257 # caller did specify names, so we must keep the ordering
3259 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3261 tgt_set = all_info.keys()
3262 missing = set(self.wanted).difference(tgt_set)
3264 raise errors.OpExecError("Some instances were removed before"
3265 " retrieving their data: %s" % missing)
3266 instance_names = self.wanted
3268 instance_list = [all_info[iname] for iname in instance_names]
3270 # begin data gathering
3272 nodes = frozenset([inst.primary_node for inst in instance_list])
3273 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3277 if self.do_node_query:
3279 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3281 result = node_data[name]
3283 # offline nodes will be in both lists
3284 off_nodes.append(name)
3286 bad_nodes.append(name)
3289 live_data.update(result.data)
3290 # else no instance is alive
3292 live_data = dict([(name, {}) for name in instance_names])
3294 # end data gathering
3299 for instance in instance_list:
3301 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3302 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3303 for field in self.op.output_fields:
3304 st_match = self._FIELDS_STATIC.Matches(field)
3309 elif field == "pnode":
3310 val = instance.primary_node
3311 elif field == "snodes":
3312 val = list(instance.secondary_nodes)
3313 elif field == "admin_state":
3314 val = instance.admin_up
3315 elif field == "oper_state":
3316 if instance.primary_node in bad_nodes:
3319 val = bool(live_data.get(instance.name))
3320 elif field == "status":
3321 if instance.primary_node in off_nodes:
3322 val = "ERROR_nodeoffline"
3323 elif instance.primary_node in bad_nodes:
3324 val = "ERROR_nodedown"
3326 running = bool(live_data.get(instance.name))
3328 if instance.admin_up:
3333 if instance.admin_up:
3337 elif field == "oper_ram":
3338 if instance.primary_node in bad_nodes:
3340 elif instance.name in live_data:
3341 val = live_data[instance.name].get("memory", "?")
3344 elif field == "disk_template":
3345 val = instance.disk_template
3347 val = instance.nics[0].ip
3348 elif field == "bridge":
3349 val = instance.nics[0].bridge
3350 elif field == "mac":
3351 val = instance.nics[0].mac
3352 elif field == "sda_size" or field == "sdb_size":
3353 idx = ord(field[2]) - ord('a')
3355 val = instance.FindDisk(idx).size
3356 except errors.OpPrereqError:
3358 elif field == "disk_usage": # total disk usage per node
3359 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3360 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3361 elif field == "tags":
3362 val = list(instance.GetTags())
3363 elif field == "serial_no":
3364 val = instance.serial_no
3365 elif field == "network_port":
3366 val = instance.network_port
3367 elif field == "hypervisor":
3368 val = instance.hypervisor
3369 elif field == "hvparams":
3371 elif (field.startswith(HVPREFIX) and
3372 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3373 val = i_hv.get(field[len(HVPREFIX):], None)
3374 elif field == "beparams":
3376 elif (field.startswith(BEPREFIX) and
3377 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3378 val = i_be.get(field[len(BEPREFIX):], None)
3379 elif st_match and st_match.groups():
3380 # matches a variable list
3381 st_groups = st_match.groups()
3382 if st_groups and st_groups[0] == "disk":
3383 if st_groups[1] == "count":
3384 val = len(instance.disks)
3385 elif st_groups[1] == "sizes":
3386 val = [disk.size for disk in instance.disks]
3387 elif st_groups[1] == "size":
3389 val = instance.FindDisk(st_groups[2]).size
3390 except errors.OpPrereqError:
3393 assert False, "Unhandled disk parameter"
3394 elif st_groups[0] == "nic":
3395 if st_groups[1] == "count":
3396 val = len(instance.nics)
3397 elif st_groups[1] == "macs":
3398 val = [nic.mac for nic in instance.nics]
3399 elif st_groups[1] == "ips":
3400 val = [nic.ip for nic in instance.nics]
3401 elif st_groups[1] == "bridges":
3402 val = [nic.bridge for nic in instance.nics]
3405 nic_idx = int(st_groups[2])
3406 if nic_idx >= len(instance.nics):
3409 if st_groups[1] == "mac":
3410 val = instance.nics[nic_idx].mac
3411 elif st_groups[1] == "ip":
3412 val = instance.nics[nic_idx].ip
3413 elif st_groups[1] == "bridge":
3414 val = instance.nics[nic_idx].bridge
3416 assert False, "Unhandled NIC parameter"
3418 assert False, "Unhandled variable parameter"
3420 raise errors.ParameterError(field)
3427 class LUFailoverInstance(LogicalUnit):
3428 """Failover an instance.
3431 HPATH = "instance-failover"
3432 HTYPE = constants.HTYPE_INSTANCE
3433 _OP_REQP = ["instance_name", "ignore_consistency"]
3436 def ExpandNames(self):
3437 self._ExpandAndLockInstance()
3438 self.needed_locks[locking.LEVEL_NODE] = []
3439 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3441 def DeclareLocks(self, level):
3442 if level == locking.LEVEL_NODE:
3443 self._LockInstancesNodes()
3445 def BuildHooksEnv(self):
3448 This runs on master, primary and secondary nodes of the instance.
3452 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3454 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3455 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3458 def CheckPrereq(self):
3459 """Check prerequisites.
3461 This checks that the instance is in the cluster.
3464 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3465 assert self.instance is not None, \
3466 "Cannot retrieve locked instance %s" % self.op.instance_name
3468 bep = self.cfg.GetClusterInfo().FillBE(instance)
3469 if instance.disk_template not in constants.DTS_NET_MIRROR:
3470 raise errors.OpPrereqError("Instance's disk layout is not"
3471 " network mirrored, cannot failover.")
3473 secondary_nodes = instance.secondary_nodes
3474 if not secondary_nodes:
3475 raise errors.ProgrammerError("no secondary node but using "
3476 "a mirrored disk template")
3478 target_node = secondary_nodes[0]
3479 _CheckNodeOnline(self, target_node)
3480 _CheckNodeNotDrained(self, target_node)
3481 # check memory requirements on the secondary node
3482 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3483 instance.name, bep[constants.BE_MEMORY],
3484 instance.hypervisor)
3486 # check bridge existence
3487 brlist = [nic.bridge for nic in instance.nics]
3488 result = self.rpc.call_bridges_exist(target_node, brlist)
3491 raise errors.OpPrereqError("One or more target bridges %s does not"
3492 " exist on destination node '%s'" %
3493 (brlist, target_node))
3495 def Exec(self, feedback_fn):
3496 """Failover an instance.
3498 The failover is done by shutting it down on its present node and
3499 starting it on the secondary.
3502 instance = self.instance
3504 source_node = instance.primary_node
3505 target_node = instance.secondary_nodes[0]
3507 feedback_fn("* checking disk consistency between source and target")
3508 for dev in instance.disks:
3509 # for drbd, these are drbd over lvm
3510 if not _CheckDiskConsistency(self, dev, target_node, False):
3511 if instance.admin_up and not self.op.ignore_consistency:
3512 raise errors.OpExecError("Disk %s is degraded on target node,"
3513 " aborting failover." % dev.iv_name)
3515 feedback_fn("* shutting down instance on source node")
3516 logging.info("Shutting down instance %s on node %s",
3517 instance.name, source_node)
3519 result = self.rpc.call_instance_shutdown(source_node, instance)
3520 msg = result.RemoteFailMsg()
3522 if self.op.ignore_consistency:
3523 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3524 " Proceeding anyway. Please make sure node"
3525 " %s is down. Error details: %s",
3526 instance.name, source_node, source_node, msg)
3528 raise errors.OpExecError("Could not shutdown instance %s on"
3530 (instance.name, source_node, msg))
3532 feedback_fn("* deactivating the instance's disks on source node")
3533 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3534 raise errors.OpExecError("Can't shut down the instance's disks.")
3536 instance.primary_node = target_node
3537 # distribute new instance config to the other nodes
3538 self.cfg.Update(instance)
3540 # Only start the instance if it's marked as up
3541 if instance.admin_up:
3542 feedback_fn("* activating the instance's disks on target node")
3543 logging.info("Starting instance %s on node %s",
3544 instance.name, target_node)
3546 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3547 ignore_secondaries=True)
3549 _ShutdownInstanceDisks(self, instance)
3550 raise errors.OpExecError("Can't activate the instance's disks")
3552 feedback_fn("* starting the instance on the target node")
3553 result = self.rpc.call_instance_start(target_node, instance)
3554 msg = result.RemoteFailMsg()
3556 _ShutdownInstanceDisks(self, instance)
3557 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3558 (instance.name, target_node, msg))
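# Failover, as implemented above, amounts to: check disk consistency on
# the target (unless ignore_consistency is set), shut the instance down
# on the source node, deactivate its disks there, flip primary_node in
# the configuration and, if the instance was marked up, reassemble the
# disks and start it on the former secondary.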
3561 class LUMigrateInstance(LogicalUnit):
3562 """Migrate an instance.
3564 This is migration without shutting the instance down, as opposed to
3565 failover, which is done with a shutdown.
3568 HPATH = "instance-migrate"
3569 HTYPE = constants.HTYPE_INSTANCE
3570 _OP_REQP = ["instance_name", "live", "cleanup"]
3574 def ExpandNames(self):
3575 self._ExpandAndLockInstance()
3576 self.needed_locks[locking.LEVEL_NODE] = []
3577 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3579 def DeclareLocks(self, level):
3580 if level == locking.LEVEL_NODE:
3581 self._LockInstancesNodes()
3583 def BuildHooksEnv(self):
3586 This runs on master, primary and secondary nodes of the instance.
3589 env = _BuildInstanceHookEnvByObject(self, self.instance)
3590 env["MIGRATE_LIVE"] = self.op.live
3591 env["MIGRATE_CLEANUP"] = self.op.cleanup
3592 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3595 def CheckPrereq(self):
3596 """Check prerequisites.
3598 This checks that the instance is in the cluster.
3601 instance = self.cfg.GetInstanceInfo(
3602 self.cfg.ExpandInstanceName(self.op.instance_name))
3603 if instance is None:
3604 raise errors.OpPrereqError("Instance '%s' not known" %
3605 self.op.instance_name)
3607 if instance.disk_template != constants.DT_DRBD8:
3608 raise errors.OpPrereqError("Instance's disk layout is not"
3609 " drbd8, cannot migrate.")
3611 secondary_nodes = instance.secondary_nodes
3612 if not secondary_nodes:
3613 raise errors.ConfigurationError("No secondary node but using"
3614 " drbd8 disk template")
3616 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3618 target_node = secondary_nodes[0]
3619 # check memory requirements on the secondary node
3620 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3621 instance.name, i_be[constants.BE_MEMORY],
3622 instance.hypervisor)
3624 # check bridge existence
3625 brlist = [nic.bridge for nic in instance.nics]
3626 result = self.rpc.call_bridges_exist(target_node, brlist)
3627 if result.failed or not result.data:
3628 raise errors.OpPrereqError("One or more target bridges %s does not"
3629 " exist on destination node '%s'" %
3630 (brlist, target_node))
3632 if not self.op.cleanup:
3633 _CheckNodeNotDrained(self, target_node)
3634 result = self.rpc.call_instance_migratable(instance.primary_node,
3636 msg = result.RemoteFailMsg()
3638 raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3641 self.instance = instance
3643 def _WaitUntilSync(self):
3644 """Poll with custom rpc for disk sync.
3646 This uses our own step-based rpc call.
3649 self.feedback_fn("* wait until resync is done")
3653 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3655 self.instance.disks)
3657 for node, nres in result.items():
3658 msg = nres.RemoteFailMsg()
3660 raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3662 node_done, node_percent = nres.payload
3663 all_done = all_done and node_done
3664 if node_percent is not None:
3665 min_percent = min(min_percent, node_percent)
3667 if min_percent < 100:
3668 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3671 def _EnsureSecondary(self, node):
3672 """Demote a node to secondary.
3675 self.feedback_fn("* switching node %s to secondary mode" % node)
3677 for dev in self.instance.disks:
3678 self.cfg.SetDiskID(dev, node)
3680 result = self.rpc.call_blockdev_close(node, self.instance.name,
3681 self.instance.disks)
3682 msg = result.RemoteFailMsg()
3684 raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3685 " error %s" % (node, msg))
3687 def _GoStandalone(self):
3688 """Disconnect from the network.
3691 self.feedback_fn("* changing into standalone mode")
3692 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3693 self.instance.disks)
3694 for node, nres in result.items():
3695 msg = nres.RemoteFailMsg()
3697 raise errors.OpExecError("Cannot disconnect disks node %s,"
3698 " error %s" % (node, msg))
3700 def _GoReconnect(self, multimaster):
3701 """Reconnect to the network.
3707 msg = "single-master"
3708 self.feedback_fn("* changing disks into %s mode" % msg)
3709 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3710 self.instance.disks,
3711 self.instance.name, multimaster)
3712 for node, nres in result.items():
3713 msg = nres.RemoteFailMsg()
3715 raise errors.OpExecError("Cannot change disks config on node %s,"
3716 " error: %s" % (node, msg))
3718 def _ExecCleanup(self):
3719 """Try to cleanup after a failed migration.
3721 The cleanup is done by:
3722 - check that the instance is running only on one node
3723 (and update the config if needed)
3724 - change disks on its secondary node to secondary
3725 - wait until disks are fully synchronized
3726 - disconnect from the network
3727 - change disks into single-master mode
3728 - wait again until disks are fully synchronized
3731 instance = self.instance
3732 target_node = self.target_node
3733 source_node = self.source_node
3735 # check running on only one node
3736 self.feedback_fn("* checking where the instance actually runs"
3737 " (if this hangs, the hypervisor might be in"
3739 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3740 for node, result in ins_l.items():
3742 if not isinstance(result.data, list):
3743 raise errors.OpExecError("Can't contact node '%s'" % node)
3745 runningon_source = instance.name in ins_l[source_node].data
3746 runningon_target = instance.name in ins_l[target_node].data
3748 if runningon_source and runningon_target:
3749 raise errors.OpExecError("Instance seems to be running on two nodes,"
3750 " or the hypervisor is confused. You will have"
3751 " to ensure manually that it runs only on one"
3752 " and restart this operation.")
3754 if not (runningon_source or runningon_target):
3755 raise errors.OpExecError("Instance does not seem to be running at all."
3756 " In this case, it's safer to repair by"
3757 " running 'gnt-instance stop' to ensure disk"
3758 " shutdown, and then restarting it.")
3760 if runningon_target:
3761 # the migration has actually succeeded, we need to update the config
3762 self.feedback_fn("* instance running on secondary node (%s),"
3763 " updating config" % target_node)
3764 instance.primary_node = target_node
3765 self.cfg.Update(instance)
3766 demoted_node = source_node
3768 self.feedback_fn("* instance confirmed to be running on its"
3769 " primary node (%s)" % source_node)
3770 demoted_node = target_node
3772 self._EnsureSecondary(demoted_node)
3774 self._WaitUntilSync()
3775 except errors.OpExecError:
3776 # we ignore here errors, since if the device is standalone, it
3777 # won't be able to sync
3779 self._GoStandalone()
3780 self._GoReconnect(False)
3781 self._WaitUntilSync()
3783 self.feedback_fn("* done")
3785 def _RevertDiskStatus(self):
3786 """Try to revert the disk status after a failed migration.
3789 target_node = self.target_node
3791 self._EnsureSecondary(target_node)
3792 self._GoStandalone()
3793 self._GoReconnect(False)
3794 self._WaitUntilSync()
3795 except errors.OpExecError, err:
3796 self.LogWarning("Migration failed and I can't reconnect the"
3797 " drives: error '%s'\n"
3798 "Please look and recover the instance status" %
3801 def _AbortMigration(self):
3802 """Call the hypervisor code to abort a started migration.
3805 instance = self.instance
3806 target_node = self.target_node
3807 migration_info = self.migration_info
3809 abort_result = self.rpc.call_finalize_migration(target_node,
3813 abort_msg = abort_result.RemoteFailMsg()
3815 logging.error("Aborting migration failed on target node %s: %s" %
3816 (target_node, abort_msg))
3817 # Don't raise an exception here, as we still have to try to revert the
3818 # disk status, even if this step failed.
3820 def _ExecMigration(self):
3821 """Migrate an instance.
3823 The migrate is done by:
3824 - change the disks into dual-master mode
3825 - wait until disks are fully synchronized again
3826 - migrate the instance
3827 - change disks on the new secondary node (the old primary) to secondary
3828 - wait until disks are fully synchronized
3829 - change disks into single-master mode
3832 instance = self.instance
3833 target_node = self.target_node
3834 source_node = self.source_node
3836 self.feedback_fn("* checking disk consistency between source and target")
3837 for dev in instance.disks:
3838 if not _CheckDiskConsistency(self, dev, target_node, False):
3839 raise errors.OpExecError("Disk %s is degraded or not fully"
3840 " synchronized on target node,"
3841 " aborting migrate." % dev.iv_name)
3843 # First get the migration information from the remote node
3844 result = self.rpc.call_migration_info(source_node, instance)
3845 msg = result.RemoteFailMsg()
3847 log_err = ("Failed fetching source migration information from %s: %s" %
3849 logging.error(log_err)
3850 raise errors.OpExecError(log_err)
3852 self.migration_info = migration_info = result.payload
3854 # Then switch the disks to master/master mode
3855 self._EnsureSecondary(target_node)
3856 self._GoStandalone()
3857 self._GoReconnect(True)
3858 self._WaitUntilSync()
3860 self.feedback_fn("* preparing %s to accept the instance" % target_node)
3861 result = self.rpc.call_accept_instance(target_node,
3864 self.nodes_ip[target_node])
3866 msg = result.RemoteFailMsg()
3868 logging.error("Instance pre-migration failed, trying to revert"
3869 " disk status: %s", msg)
3870 self._AbortMigration()
3871 self._RevertDiskStatus()
3872 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3873 (instance.name, msg))
3875 self.feedback_fn("* migrating instance to %s" % target_node)
3877 result = self.rpc.call_instance_migrate(source_node, instance,
3878 self.nodes_ip[target_node],
3880 msg = result.RemoteFailMsg()
3882 logging.error("Instance migration failed, trying to revert"
3883 " disk status: %s", msg)
3884 self._AbortMigration()
3885 self._RevertDiskStatus()
3886 raise errors.OpExecError("Could not migrate instance %s: %s" %
3887 (instance.name, msg))
3890 instance.primary_node = target_node
3891 # distribute new instance config to the other nodes
3892 self.cfg.Update(instance)
3894 result = self.rpc.call_finalize_migration(target_node,
3898 msg = result.RemoteFailMsg()
3900 logging.error("Instance migration succeeded, but finalization failed:"
3902 raise errors.OpExecError("Could not finalize instance migration: %s" %
3905 self._EnsureSecondary(source_node)
3906 self._WaitUntilSync()
3907 self._GoStandalone()
3908 self._GoReconnect(False)
3909 self._WaitUntilSync()
3911 self.feedback_fn("* done")
3913 def Exec(self, feedback_fn):
3914 """Perform the migration.
3917 self.feedback_fn = feedback_fn
3919 self.source_node = self.instance.primary_node
3920 self.target_node = self.instance.secondary_nodes[0]
3921 self.all_nodes = [self.source_node, self.target_node]
3923 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3924 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3927 return self._ExecCleanup()
3929 return self._ExecMigration()
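# The live-migration path driven by _ExecMigration above moves the DRBD
# disks through these modes: target demoted to secondary, then
# standalone, then dual-master while the hypervisor transfers the
# instance, and finally back through secondary/standalone to
# single-master with the roles swapped; _WaitUntilSync is interleaved so
# the mirrors are consistent at each step.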
3932 def _CreateBlockDev(lu, node, instance, device, force_create,
3934 """Create a tree of block devices on a given node.
3936 If this device type has to be created on secondaries, create it and
3937 all its children.
3939 If not, just recurse to children keeping the same 'force' value.
3941 @param lu: the lu on whose behalf we execute
3942 @param node: the node on which to create the device
3943 @type instance: L{objects.Instance}
3944 @param instance: the instance which owns the device
3945 @type device: L{objects.Disk}
3946 @param device: the device to create
3947 @type force_create: boolean
3948 @param force_create: whether to force creation of this device; this
3949 will be changed to True whenever we find a device which has
3950 CreateOnSecondary() attribute
3951 @param info: the extra 'metadata' we should attach to the device
3952 (this will be represented as a LVM tag)
3953 @type force_open: boolean
3954 @param force_open: this parameter will be passed to the
3955 L{backend.BlockdevCreate} function where it specifies
3956 whether we run on primary or not, and it affects both
3957 the child assembly and the device's own Open() execution
3960 if device.CreateOnSecondary():
3961 force_create = True
3963 if device.children:
3964 for child in device.children:
3965 _CreateBlockDev(lu, node, instance, child, force_create,
3966 info, force_open)
3968 if not force_create:
3969 return
3971 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
3974 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3975 """Create a single block device on a given node.
3977 This will not recurse over children of the device, so they must be
3978 created in advance.
3980 @param lu: the lu on whose behalf we execute
3981 @param node: the node on which to create the device
3982 @type instance: L{objects.Instance}
3983 @param instance: the instance which owns the device
3984 @type device: L{objects.Disk}
3985 @param device: the device to create
3986 @param info: the extra 'metadata' we should attach to the device
3987 (this will be represented as a LVM tag)
3988 @type force_open: boolean
3989 @param force_open: this parameter will be passed to the
3990 L{backend.BlockdevCreate} function where it specifies
3991 whether we run on primary or not, and it affects both
3992 the child assembly and the device's own Open() execution
3995 lu.cfg.SetDiskID(device, node)
3996 result = lu.rpc.call_blockdev_create(node, device, device.size,
3997 instance.name, force_open, info)
3998 msg = result.RemoteFailMsg()
3999 if msg:
4000 raise errors.OpExecError("Can't create block device %s on"
4001 " node %s for instance %s: %s" %
4002 (device, node, instance.name, msg))
4003 if device.physical_id is None:
4004 device.physical_id = result.payload
4007 def _GenerateUniqueNames(lu, exts):
4008 """Generate a suitable LV name.
4010 This will generate a logical volume name for the given instance.
4012 """
4013 results = []
4014 for val in exts:
4015 new_id = lu.cfg.GenerateUniqueID()
4016 results.append("%s%s" % (new_id, val))
4017 return results
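# Editor's example (assumes GenerateUniqueID() returns a UUID-like string):
#   _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
# would return something like
#   ["3e2fc9a4-...disk0_data", "8a1be7c2-...disk0_meta"]
# i.e. one freshly generated LV name per requested extension.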
4020 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4021 p_minor, s_minor):
4022 """Generate a drbd8 device complete with its children.
4025 port = lu.cfg.AllocatePort()
4026 vgname = lu.cfg.GetVGName()
4027 shared_secret = lu.cfg.GenerateDRBDSecret()
4028 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4029 logical_id=(vgname, names[0]))
4030 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4031 logical_id=(vgname, names[1]))
4032 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4033 logical_id=(primary, secondary, port,
4034 p_minor, s_minor,
4035 shared_secret),
4036 children=[dev_data, dev_meta],
4037 iv_name=iv_name)
4038 return drbd_dev
4041 def _GenerateDiskTemplate(lu, template_name,
4042 instance_name, primary_node,
4043 secondary_nodes, disk_info,
4044 file_storage_dir, file_driver,
4045 base_index):
4046 """Generate the entire disk layout for a given template type.
4049 #TODO: compute space requirements
4051 vgname = lu.cfg.GetVGName()
4052 disk_count = len(disk_info)
4053 disks = []
4054 if template_name == constants.DT_DISKLESS:
4055 pass
4056 elif template_name == constants.DT_PLAIN:
4057 if len(secondary_nodes) != 0:
4058 raise errors.ProgrammerError("Wrong template configuration")
4060 names = _GenerateUniqueNames(lu, [".disk%d" % i
4061 for i in range(disk_count)])
4062 for idx, disk in enumerate(disk_info):
4063 disk_index = idx + base_index
4064 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4065 logical_id=(vgname, names[idx]),
4066 iv_name="disk/%d" % disk_index,
4067 mode=disk["mode"])
4068 disks.append(disk_dev)
4069 elif template_name == constants.DT_DRBD8:
4070 if len(secondary_nodes) != 1:
4071 raise errors.ProgrammerError("Wrong template configuration")
4072 remote_node = secondary_nodes[0]
4073 minors = lu.cfg.AllocateDRBDMinor(
4074 [primary_node, remote_node] * len(disk_info), instance_name)
4076 names = []
4077 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4078 for i in range(disk_count)]):
4079 names.append(lv_prefix + "_data")
4080 names.append(lv_prefix + "_meta")
4081 for idx, disk in enumerate(disk_info):
4082 disk_index = idx + base_index
4083 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4084 disk["size"], names[idx*2:idx*2+2],
4085 "disk/%d" % disk_index,
4086 minors[idx*2], minors[idx*2+1])
4087 disk_dev.mode = disk["mode"]
4088 disks.append(disk_dev)
4089 elif template_name == constants.DT_FILE:
4090 if len(secondary_nodes) != 0:
4091 raise errors.ProgrammerError("Wrong template configuration")
4093 for idx, disk in enumerate(disk_info):
4094 disk_index = idx + base_index
4095 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4096 iv_name="disk/%d" % disk_index,
4097 logical_id=(file_driver,
4098 "%s/disk%d" % (file_storage_dir,
4101 disks.append(disk_dev)
4103 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
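# Editor's sketch: a hypothetical call for a single-disk DRBD8 instance (all
# values are illustrative, not taken from this module):
#
#   disks = _GenerateDiskTemplate(lu, constants.DT_DRBD8,
#                                 "inst1.example.com", "node1", ["node2"],
#                                 [{"size": 1024, "mode": "rw"}],
#                                 None, None, 0)
#
# This would return one LD_DRBD8 objects.Disk whose children are the 1024 MB
# data LV and the 128 MB metadata LV built by _GenerateDRBD8Branch above.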
4107 def _GetInstanceInfoText(instance):
4108 """Compute that text that should be added to the disk's metadata.
4111 return "originstname+%s" % instance.name
4114 def _CreateDisks(lu, instance):
4115 """Create all disks for an instance.
4117 This abstracts away some work from AddInstance.
4119 @type lu: L{LogicalUnit}
4120 @param lu: the logical unit on whose behalf we execute
4121 @type instance: L{objects.Instance}
4122 @param instance: the instance whose disks we should create
4124 @return: the success of the creation
4127 info = _GetInstanceInfoText(instance)
4128 pnode = instance.primary_node
4130 if instance.disk_template == constants.DT_FILE:
4131 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4132 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4134 if result.failed or not result.data:
4135 raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4137 if not result.data[0]:
4138 raise errors.OpExecError("Failed to create directory '%s'" %
4139 file_storage_dir)
4141 # Note: this needs to be kept in sync with adding of disks in
4142 # LUSetInstanceParams
4143 for device in instance.disks:
4144 logging.info("Creating volume %s for instance %s",
4145 device.iv_name, instance.name)
4147 for node in instance.all_nodes:
4148 f_create = node == pnode
4149 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4152 def _RemoveDisks(lu, instance):
4153 """Remove all disks for an instance.
4155 This abstracts away some work from `AddInstance()` and
4156 `RemoveInstance()`. Note that in case some of the devices couldn't
4157 be removed, the removal will continue with the other ones (compare
4158 with `_CreateDisks()`).
4160 @type lu: L{LogicalUnit}
4161 @param lu: the logical unit on whose behalf we execute
4162 @type instance: L{objects.Instance}
4163 @param instance: the instance whose disks we should remove
4165 @return: the success of the removal
4168 logging.info("Removing block devices for instance %s", instance.name)
4171 for device in instance.disks:
4172 for node, disk in device.ComputeNodeTree(instance.primary_node):
4173 lu.cfg.SetDiskID(disk, node)
4174 msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4175 if msg:
4176 lu.LogWarning("Could not remove block device %s on node %s,"
4177 " continuing anyway: %s", device.iv_name, node, msg)
4180 if instance.disk_template == constants.DT_FILE:
4181 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4182 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4183 file_storage_dir)
4184 if result.failed or not result.data:
4185 logging.error("Could not remove directory '%s'", file_storage_dir)
4191 def _ComputeDiskSize(disk_template, disks):
4192 """Compute disk size requirements in the volume group
4195 # Required free disk space as a function of disk and swap space
4196 req_size_dict = {
4197 constants.DT_DISKLESS: None,
4198 constants.DT_PLAIN: sum(d["size"] for d in disks),
4199 # 128 MB are added for drbd metadata for each disk
4200 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4201 constants.DT_FILE: None,
4202 }
4204 if disk_template not in req_size_dict:
4205 raise errors.ProgrammerError("Disk template '%s' size requirement"
4206 " is unknown" % disk_template)
4208 return req_size_dict[disk_template]
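# Editor's example: for two DRBD8 disks of 1024 MB and 2048 MB the dict above
# yields (1024 + 128) + (2048 + 128) = 3328 MB of required volume group
# space, while DT_FILE and DT_DISKLESS need no LVM space at all (None).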
4211 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4212 """Hypervisor parameter validation.
4214 This function abstracts the hypervisor parameter validation to be
4215 used in both instance create and instance modify.
4217 @type lu: L{LogicalUnit}
4218 @param lu: the logical unit for which we check
4219 @type nodenames: list
4220 @param nodenames: the list of nodes on which we should check
4221 @type hvname: string
4222 @param hvname: the name of the hypervisor we should use
4223 @type hvparams: dict
4224 @param hvparams: the parameters which we need to check
4225 @raise errors.OpPrereqError: if the parameters are not valid
4228 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4229 hvname,
4230 hvparams)
4231 for node in nodenames:
4232 info = hvinfo[node]
4233 if info.offline:
4234 continue
4235 msg = info.RemoteFailMsg()
4236 if msg:
4237 raise errors.OpPrereqError("Hypervisor parameter validation"
4238 " failed on node %s: %s" % (node, msg))
4241 class LUCreateInstance(LogicalUnit):
4242 """Create an instance.
4245 HPATH = "instance-add"
4246 HTYPE = constants.HTYPE_INSTANCE
4247 _OP_REQP = ["instance_name", "disks", "disk_template",
4249 "wait_for_sync", "ip_check", "nics",
4250 "hvparams", "beparams"]
4253 def _ExpandNode(self, node):
4254 """Expands and checks one node name.
4257 node_full = self.cfg.ExpandNodeName(node)
4258 if node_full is None:
4259 raise errors.OpPrereqError("Unknown node %s" % node)
4260 return node_full
4262 def ExpandNames(self):
4263 """ExpandNames for CreateInstance.
4265 Figure out the right locks for instance creation.
4268 self.needed_locks = {}
4270 # set optional parameters to none if they don't exist
4271 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4272 if not hasattr(self.op, attr):
4273 setattr(self.op, attr, None)
4275 # cheap checks, mostly valid constants given
4277 # verify creation mode
4278 if self.op.mode not in (constants.INSTANCE_CREATE,
4279 constants.INSTANCE_IMPORT):
4280 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4281 self.op.mode)
4283 # disk template and mirror node verification
4284 if self.op.disk_template not in constants.DISK_TEMPLATES:
4285 raise errors.OpPrereqError("Invalid disk template name")
4287 if self.op.hypervisor is None:
4288 self.op.hypervisor = self.cfg.GetHypervisorType()
4290 cluster = self.cfg.GetClusterInfo()
4291 enabled_hvs = cluster.enabled_hypervisors
4292 if self.op.hypervisor not in enabled_hvs:
4293 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4294 " cluster (%s)" % (self.op.hypervisor,
4295 ",".join(enabled_hvs)))
4297 # check hypervisor parameter syntax (locally)
4298 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4299 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4300 self.op.hvparams)
4301 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4302 hv_type.CheckParameterSyntax(filled_hvp)
4304 # fill and remember the beparams dict
4305 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4306 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4307 self.op.beparams)
4309 #### instance parameters check
4311 # instance name verification
4312 hostname1 = utils.HostInfo(self.op.instance_name)
4313 self.op.instance_name = instance_name = hostname1.name
4315 # this is just a preventive check, but someone might still add this
4316 # instance in the meantime, and creation will fail at lock-add time
4317 if instance_name in self.cfg.GetInstanceList():
4318 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4319 instance_name)
4321 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4325 for nic in self.op.nics:
4326 # ip validity checks
4327 ip = nic.get("ip", None)
4328 if ip is None or ip.lower() == "none":
4329 nic_ip = None
4330 elif ip.lower() == constants.VALUE_AUTO:
4331 nic_ip = hostname1.ip
4332 else:
4333 if not utils.IsValidIP(ip):
4334 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4335 " like a valid IP" % ip)
4336 nic_ip = ip
4338 # MAC address verification
4339 mac = nic.get("mac", constants.VALUE_AUTO)
4340 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4341 if not utils.IsValidMac(mac.lower()):
4342 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4344 # bridge verification
4345 bridge = nic.get("bridge", None)
4346 if bridge is None:
4347 bridge = self.cfg.GetDefBridge()
4348 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4350 # disk checks/pre-build
4351 self.disks = []
4352 for disk in self.op.disks:
4353 mode = disk.get("mode", constants.DISK_RDWR)
4354 if mode not in constants.DISK_ACCESS_SET:
4355 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4356 mode)
4357 size = disk.get("size", None)
4358 if size is None:
4359 raise errors.OpPrereqError("Missing disk size")
4360 try:
4361 size = int(size)
4362 except ValueError:
4363 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4364 self.disks.append({"size": size, "mode": mode})
4366 # used in CheckPrereq for ip ping check
4367 self.check_ip = hostname1.ip
4369 # file storage checks
4370 if (self.op.file_driver and
4371 not self.op.file_driver in constants.FILE_DRIVER):
4372 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4373 self.op.file_driver)
4375 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4376 raise errors.OpPrereqError("File storage directory path not absolute")
4378 ### Node/iallocator related checks
4379 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4380 raise errors.OpPrereqError("One and only one of iallocator and primary"
4381 " node must be given")
4383 if self.op.iallocator:
4384 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4386 self.op.pnode = self._ExpandNode(self.op.pnode)
4387 nodelist = [self.op.pnode]
4388 if self.op.snode is not None:
4389 self.op.snode = self._ExpandNode(self.op.snode)
4390 nodelist.append(self.op.snode)
4391 self.needed_locks[locking.LEVEL_NODE] = nodelist
4393 # in case of import lock the source node too
4394 if self.op.mode == constants.INSTANCE_IMPORT:
4395 src_node = getattr(self.op, "src_node", None)
4396 src_path = getattr(self.op, "src_path", None)
4398 if src_path is None:
4399 self.op.src_path = src_path = self.op.instance_name
4401 if src_node is None:
4402 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4403 self.op.src_node = None
4404 if os.path.isabs(src_path):
4405 raise errors.OpPrereqError("Importing an instance from an absolute"
4406 " path requires a source node option.")
4408 self.op.src_node = src_node = self._ExpandNode(src_node)
4409 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4410 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4411 if not os.path.isabs(src_path):
4412 self.op.src_path = src_path = \
4413 os.path.join(constants.EXPORT_DIR, src_path)
4415 else: # INSTANCE_CREATE
4416 if getattr(self.op, "os_type", None) is None:
4417 raise errors.OpPrereqError("No guest OS specified")
4419 def _RunAllocator(self):
4420 """Run the allocator based on input opcode.
4423 nics = [n.ToDict() for n in self.nics]
4424 ial = IAllocator(self,
4425 mode=constants.IALLOCATOR_MODE_ALLOC,
4426 name=self.op.instance_name,
4427 disk_template=self.op.disk_template,
4430 vcpus=self.be_full[constants.BE_VCPUS],
4431 mem_size=self.be_full[constants.BE_MEMORY],
4434 hypervisor=self.op.hypervisor,
4437 ial.Run(self.op.iallocator)
4439 if not ial.success:
4440 raise errors.OpPrereqError("Can't compute nodes using"
4441 " iallocator '%s': %s" % (self.op.iallocator,
4442 ial.info))
4443 if len(ial.nodes) != ial.required_nodes:
4444 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4445 " of nodes (%s), required %s" %
4446 (self.op.iallocator, len(ial.nodes),
4447 ial.required_nodes))
4448 self.op.pnode = ial.nodes[0]
4449 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4450 self.op.instance_name, self.op.iallocator,
4451 ", ".join(ial.nodes))
4452 if ial.required_nodes == 2:
4453 self.op.snode = ial.nodes[1]
4455 def BuildHooksEnv(self):
4458 This runs on master, primary and secondary nodes of the instance.
4462 "ADD_MODE": self.op.mode,
4464 if self.op.mode == constants.INSTANCE_IMPORT:
4465 env["SRC_NODE"] = self.op.src_node
4466 env["SRC_PATH"] = self.op.src_path
4467 env["SRC_IMAGES"] = self.src_images
4469 env.update(_BuildInstanceHookEnv(
4470 name=self.op.instance_name,
4471 primary_node=self.op.pnode,
4472 secondary_nodes=self.secondaries,
4473 status=self.op.start,
4474 os_type=self.op.os_type,
4475 memory=self.be_full[constants.BE_MEMORY],
4476 vcpus=self.be_full[constants.BE_VCPUS],
4477 nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4478 disk_template=self.op.disk_template,
4479 disks=[(d["size"], d["mode"]) for d in self.disks],
4482 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4487 def CheckPrereq(self):
4488 """Check prerequisites.
4491 if (not self.cfg.GetVGName() and
4492 self.op.disk_template not in constants.DTS_NOT_LVM):
4493 raise errors.OpPrereqError("Cluster does not support lvm-based"
4496 if self.op.mode == constants.INSTANCE_IMPORT:
4497 src_node = self.op.src_node
4498 src_path = self.op.src_path
4500 if src_node is None:
4501 exp_list = self.rpc.call_export_list(
4502 self.acquired_locks[locking.LEVEL_NODE])
4504 for node in exp_list:
4505 if not exp_list[node].failed and src_path in exp_list[node].data:
4507 self.op.src_node = src_node = node
4508 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4512 raise errors.OpPrereqError("No export found for relative path %s" %
4515 _CheckNodeOnline(self, src_node)
4516 result = self.rpc.call_export_info(src_node, src_path)
4519 raise errors.OpPrereqError("No export found in dir %s" % src_path)
4521 export_info = result.data
4522 if not export_info.has_section(constants.INISECT_EXP):
4523 raise errors.ProgrammerError("Corrupted export config")
4525 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4526 if (int(ei_version) != constants.EXPORT_VERSION):
4527 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4528 (ei_version, constants.EXPORT_VERSION))
4530 # Check that the new instance doesn't have less disks than the export
4531 instance_disks = len(self.disks)
4532 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4533 if instance_disks < export_disks:
4534 raise errors.OpPrereqError("Not enough disks to import."
4535 " (instance: %d, export: %d)" %
4536 (instance_disks, export_disks))
4538 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4539 disk_images = []
4540 for idx in range(export_disks):
4541 option = 'disk%d_dump' % idx
4542 if export_info.has_option(constants.INISECT_INS, option):
4543 # FIXME: are the old os-es, disk sizes, etc. useful?
4544 export_name = export_info.get(constants.INISECT_INS, option)
4545 image = os.path.join(src_path, export_name)
4546 disk_images.append(image)
4547 else:
4548 disk_images.append(False)
4550 self.src_images = disk_images
4552 old_name = export_info.get(constants.INISECT_INS, 'name')
4553 # FIXME: int() here could throw a ValueError on broken exports
4554 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4555 if self.op.instance_name == old_name:
4556 for idx, nic in enumerate(self.nics):
4557 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4558 nic_mac_ini = 'nic%d_mac' % idx
4559 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4561 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4562 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4563 if self.op.start and not self.op.ip_check:
4564 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4565 " adding an instance in start mode")
4567 if self.op.ip_check:
4568 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4569 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4570 (self.check_ip, self.op.instance_name))
4572 #### mac address generation
4573 # By generating here the mac address both the allocator and the hooks get
4574 # the real final mac address rather than the 'auto' or 'generate' value.
4575 # There is a race condition between the generation and the instance object
4576 # creation, which means that we know the mac is valid now, but we're not
4577 # sure it will be when we actually add the instance. If things go bad
4578 # adding the instance will abort because of a duplicate mac, and the
4579 # creation job will fail.
4580 for nic in self.nics:
4581 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4582 nic.mac = self.cfg.GenerateMAC()
4586 if self.op.iallocator is not None:
4587 self._RunAllocator()
4589 #### node related checks
4591 # check primary node
4592 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4593 assert self.pnode is not None, \
4594 "Cannot retrieve locked node %s" % self.op.pnode
4596 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4599 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4602 self.secondaries = []
4604 # mirror node verification
4605 if self.op.disk_template in constants.DTS_NET_MIRROR:
4606 if self.op.snode is None:
4607 raise errors.OpPrereqError("The networked disk templates need"
4609 if self.op.snode == pnode.name:
4610 raise errors.OpPrereqError("The secondary node cannot be"
4611 " the primary node.")
4612 _CheckNodeOnline(self, self.op.snode)
4613 _CheckNodeNotDrained(self, self.op.snode)
4614 self.secondaries.append(self.op.snode)
4616 nodenames = [pnode.name] + self.secondaries
4618 req_size = _ComputeDiskSize(self.op.disk_template,
4621 # Check lv size requirements
4622 if req_size is not None:
4623 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4625 for node in nodenames:
4626 info = nodeinfo[node]
4630 raise errors.OpPrereqError("Cannot get current information"
4631 " from node '%s'" % node)
4632 vg_free = info.get('vg_free', None)
4633 if not isinstance(vg_free, int):
4634 raise errors.OpPrereqError("Can't compute free disk space on"
4636 if req_size > info['vg_free']:
4637 raise errors.OpPrereqError("Not enough disk space on target node %s."
4638 " %d MB available, %d MB required" %
4639 (node, info['vg_free'], req_size))
4641 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4644 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4646 if not isinstance(result.data, objects.OS):
4647 raise errors.OpPrereqError("OS '%s' not in supported os list for"
4648 " primary node" % self.op.os_type)
4650 # bridge check on primary node
4651 bridges = [n.bridge for n in self.nics]
4652 result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4655 raise errors.OpPrereqError("One of the target bridges '%s' does not"
4656 " exist on destination node '%s'" %
4657 (",".join(bridges), pnode.name))
4659 # memory check on primary node
4661 _CheckNodeFreeMemory(self, self.pnode.name,
4662 "creating instance %s" % self.op.instance_name,
4663 self.be_full[constants.BE_MEMORY],
4666 def Exec(self, feedback_fn):
4667 """Create and add the instance to the cluster.
4670 instance = self.op.instance_name
4671 pnode_name = self.pnode.name
4673 ht_kind = self.op.hypervisor
4674 if ht_kind in constants.HTS_REQ_PORT:
4675 network_port = self.cfg.AllocatePort()
4679 ##if self.op.vnc_bind_address is None:
4680 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4682 # this is needed because os.path.join does not accept None arguments
4683 if self.op.file_storage_dir is None:
4684 string_file_storage_dir = ""
4686 string_file_storage_dir = self.op.file_storage_dir
4688 # build the full file storage dir path
4689 file_storage_dir = os.path.normpath(os.path.join(
4690 self.cfg.GetFileStorageDir(),
4691 string_file_storage_dir, instance))
4694 disks = _GenerateDiskTemplate(self,
4695 self.op.disk_template,
4696 instance, pnode_name,
4700 self.op.file_driver,
4703 iobj = objects.Instance(name=instance, os=self.op.os_type,
4704 primary_node=pnode_name,
4705 nics=self.nics, disks=disks,
4706 disk_template=self.op.disk_template,
4708 network_port=network_port,
4709 beparams=self.op.beparams,
4710 hvparams=self.op.hvparams,
4711 hypervisor=self.op.hypervisor,
4714 feedback_fn("* creating instance disks...")
4715 try:
4716 _CreateDisks(self, iobj)
4717 except errors.OpExecError:
4718 self.LogWarning("Device creation failed, reverting...")
4719 try:
4720 _RemoveDisks(self, iobj)
4721 finally:
4722 self.cfg.ReleaseDRBDMinors(instance)
4723 raise
4725 feedback_fn("adding instance %s to cluster config" % instance)
4727 self.cfg.AddInstance(iobj)
4728 # Declare that we don't want to remove the instance lock anymore, as we've
4729 # added the instance to the config
4730 del self.remove_locks[locking.LEVEL_INSTANCE]
4731 # Unlock all the nodes
4732 if self.op.mode == constants.INSTANCE_IMPORT:
4733 nodes_keep = [self.op.src_node]
4734 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4735 if node != self.op.src_node]
4736 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4737 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4739 self.context.glm.release(locking.LEVEL_NODE)
4740 del self.acquired_locks[locking.LEVEL_NODE]
4742 if self.op.wait_for_sync:
4743 disk_abort = not _WaitForSync(self, iobj)
4744 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4745 # make sure the disks are not degraded (still sync-ing is ok)
4747 feedback_fn("* checking mirrors status")
4748 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4749 else:
4750 disk_abort = False
4752 if disk_abort:
4753 _RemoveDisks(self, iobj)
4754 self.cfg.RemoveInstance(iobj.name)
4755 # Make sure the instance lock gets removed
4756 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4757 raise errors.OpExecError("There are some degraded disks for"
4760 feedback_fn("creating os for instance %s on node %s" %
4761 (instance, pnode_name))
4763 if iobj.disk_template != constants.DT_DISKLESS:
4764 if self.op.mode == constants.INSTANCE_CREATE:
4765 feedback_fn("* running the instance OS create scripts...")
4766 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4767 msg = result.RemoteFailMsg()
4768 if msg:
4769 raise errors.OpExecError("Could not add os for instance %s"
4770 " on node %s: %s" %
4771 (instance, pnode_name, msg))
4773 elif self.op.mode == constants.INSTANCE_IMPORT:
4774 feedback_fn("* running the instance OS import scripts...")
4775 src_node = self.op.src_node
4776 src_images = self.src_images
4777 cluster_name = self.cfg.GetClusterName()
4778 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4779 src_node, src_images,
4781 import_result.Raise()
4782 for idx, result in enumerate(import_result.data):
4784 self.LogWarning("Could not import the image %s for instance"
4785 " %s, disk %d, on node %s" %
4786 (src_images[idx], instance, idx, pnode_name))
4788 # also checked in the prereq part
4789 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4792 if self.op.start:
4793 iobj.admin_up = True
4794 self.cfg.Update(iobj)
4795 logging.info("Starting instance %s on node %s", instance, pnode_name)
4796 feedback_fn("* starting instance...")
4797 result = self.rpc.call_instance_start(pnode_name, iobj)
4798 msg = result.RemoteFailMsg()
4799 if msg:
4800 raise errors.OpExecError("Could not start instance: %s" % msg)
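# Editor's sketch of a minimal opcode for this LU, based only on the
# attributes referenced in ExpandNames/CheckPrereq; the opcode class name and
# all values are assumptions for illustration, the real definition lives in
# opcodes.py:
#
#   op = opcodes.OpCreateInstance(instance_name="inst1.example.com",
#                                 mode=constants.INSTANCE_CREATE,
#                                 disk_template=constants.DT_DRBD8,
#                                 disks=[{"size": 1024}], nics=[{}],
#                                 pnode="node1", snode="node2",
#                                 os_type="debian-etch", start=True,
#                                 wait_for_sync=True, ip_check=True,
#                                 hvparams={}, beparams={})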
4803 class LUConnectConsole(NoHooksLU):
4804 """Connect to an instance's console.
4806 This is somewhat special in that it returns the command line that
4807 you need to run on the master node in order to connect to the
4808 console.
4811 _OP_REQP = ["instance_name"]
4814 def ExpandNames(self):
4815 self._ExpandAndLockInstance()
4817 def CheckPrereq(self):
4818 """Check prerequisites.
4820 This checks that the instance is in the cluster.
4823 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4824 assert self.instance is not None, \
4825 "Cannot retrieve locked instance %s" % self.op.instance_name
4826 _CheckNodeOnline(self, self.instance.primary_node)
4828 def Exec(self, feedback_fn):
4829 """Connect to the console of an instance
4832 instance = self.instance
4833 node = instance.primary_node
4835 node_insts = self.rpc.call_instance_list([node],
4836 [instance.hypervisor])[node]
4839 if instance.name not in node_insts.data:
4840 raise errors.OpExecError("Instance %s is not running." % instance.name)
4842 logging.debug("Connecting to console of %s on %s", instance.name, node)
4844 hyper = hypervisor.GetHypervisor(instance.hypervisor)
4845 cluster = self.cfg.GetClusterInfo()
4846 # beparams and hvparams are passed separately, to avoid editing the
4847 # instance and then saving the defaults in the instance itself.
4848 hvparams = cluster.FillHV(instance)
4849 beparams = cluster.FillBE(instance)
4850 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4853 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
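# Editor's note: the value returned above is the argv-style command built by
# SshRunner.BuildCmd; a hypothetical result for a Xen instance could look
# roughly like
#   ["ssh", ..., "root@node1.example.com", "xm console inst1.example.com"]
# (the exact console command comes from the hypervisor's
# GetShellCommandForConsole and is only an assumption here).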
4856 class LUReplaceDisks(LogicalUnit):
4857 """Replace the disks of an instance.
4860 HPATH = "mirrors-replace"
4861 HTYPE = constants.HTYPE_INSTANCE
4862 _OP_REQP = ["instance_name", "mode", "disks"]
4865 def CheckArguments(self):
4866 if not hasattr(self.op, "remote_node"):
4867 self.op.remote_node = None
4868 if not hasattr(self.op, "iallocator"):
4869 self.op.iallocator = None
4871 # check for valid parameter combination
4872 cnt = [self.op.remote_node, self.op.iallocator].count(None)
4873 if self.op.mode == constants.REPLACE_DISK_CHG:
4874 if cnt == 2:
4875 raise errors.OpPrereqError("When changing the secondary either an"
4876 " iallocator script must be used or the"
4877 " new node given")
4878 elif cnt == 0:
4879 raise errors.OpPrereqError("Give either the iallocator or the new"
4880 " secondary, not both")
4881 else: # not replacing the secondary
4882 if cnt != 2:
4883 raise errors.OpPrereqError("The iallocator and new node options can"
4884 " be used only when changing the"
4885 " secondary")
4887 def ExpandNames(self):
4888 self._ExpandAndLockInstance()
4890 if self.op.iallocator is not None:
4891 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4892 elif self.op.remote_node is not None:
4893 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4894 if remote_node is None:
4895 raise errors.OpPrereqError("Node '%s' not known" %
4896 self.op.remote_node)
4897 self.op.remote_node = remote_node
4898 # Warning: do not remove the locking of the new secondary here
4899 # unless DRBD8.AddChildren is changed to work in parallel;
4900 # currently it doesn't since parallel invocations of
4901 # FindUnusedMinor will conflict
4902 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4903 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4905 self.needed_locks[locking.LEVEL_NODE] = []
4906 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4908 def DeclareLocks(self, level):
4909 # If we're not already locking all nodes in the set we have to declare the
4910 # instance's primary/secondary nodes.
4911 if (level == locking.LEVEL_NODE and
4912 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4913 self._LockInstancesNodes()
4915 def _RunAllocator(self):
4916 """Compute a new secondary node using an IAllocator.
4919 ial = IAllocator(self,
4920 mode=constants.IALLOCATOR_MODE_RELOC,
4921 name=self.op.instance_name,
4922 relocate_from=[self.sec_node])
4924 ial.Run(self.op.iallocator)
4926 if not ial.success:
4927 raise errors.OpPrereqError("Can't compute nodes using"
4928 " iallocator '%s': %s" % (self.op.iallocator,
4929 ial.info))
4930 if len(ial.nodes) != ial.required_nodes:
4931 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4932 " of nodes (%s), required %s" %
4933 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
4934 self.op.remote_node = ial.nodes[0]
4935 self.LogInfo("Selected new secondary for the instance: %s",
4936 self.op.remote_node)
4938 def BuildHooksEnv(self):
4941 This runs on the master, the primary and all the secondaries.
4945 "MODE": self.op.mode,
4946 "NEW_SECONDARY": self.op.remote_node,
4947 "OLD_SECONDARY": self.instance.secondary_nodes[0],
4949 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4951 self.cfg.GetMasterNode(),
4952 self.instance.primary_node,
4954 if self.op.remote_node is not None:
4955 nl.append(self.op.remote_node)
4958 def CheckPrereq(self):
4959 """Check prerequisites.
4961 This checks that the instance is in the cluster.
4964 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4965 assert instance is not None, \
4966 "Cannot retrieve locked instance %s" % self.op.instance_name
4967 self.instance = instance
4969 if instance.disk_template != constants.DT_DRBD8:
4970 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4973 if len(instance.secondary_nodes) != 1:
4974 raise errors.OpPrereqError("The instance has a strange layout,"
4975 " expected one secondary but found %d" %
4976 len(instance.secondary_nodes))
4978 self.sec_node = instance.secondary_nodes[0]
4980 if self.op.iallocator is not None:
4981 self._RunAllocator()
4983 remote_node = self.op.remote_node
4984 if remote_node is not None:
4985 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4986 assert self.remote_node_info is not None, \
4987 "Cannot retrieve locked node %s" % remote_node
4989 self.remote_node_info = None
4990 if remote_node == instance.primary_node:
4991 raise errors.OpPrereqError("The specified node is the primary node of"
4993 elif remote_node == self.sec_node:
4994 raise errors.OpPrereqError("The specified node is already the"
4995 " secondary node of the instance.")
4997 if self.op.mode == constants.REPLACE_DISK_PRI:
4998 n1 = self.tgt_node = instance.primary_node
4999 n2 = self.oth_node = self.sec_node
5000 elif self.op.mode == constants.REPLACE_DISK_SEC:
5001 n1 = self.tgt_node = self.sec_node
5002 n2 = self.oth_node = instance.primary_node
5003 elif self.op.mode == constants.REPLACE_DISK_CHG:
5004 n1 = self.new_node = remote_node
5005 n2 = self.oth_node = instance.primary_node
5006 self.tgt_node = self.sec_node
5007 _CheckNodeNotDrained(self, remote_node)
5008 else:
5009 raise errors.ProgrammerError("Unhandled disk replace mode")
5011 _CheckNodeOnline(self, n1)
5012 _CheckNodeOnline(self, n2)
5014 if not self.op.disks:
5015 self.op.disks = range(len(instance.disks))
5017 for disk_idx in self.op.disks:
5018 instance.FindDisk(disk_idx)
5020 def _ExecD8DiskOnly(self, feedback_fn):
5021 """Replace a disk on the primary or secondary for dbrd8.
5023 The algorithm for replace is quite complicated:
5025 1. for each disk to be replaced:
5027 1. create new LVs on the target node with unique names
5028 1. detach old LVs from the drbd device
5029 1. rename old LVs to name_replaced.<time_t>
5030 1. rename new LVs to old LVs
5031 1. attach the new LVs (with the old names now) to the drbd device
5033 1. wait for sync across all devices
5035 1. for each modified disk:
5037 1. remove old LVs (which have the name name_replaced.<time_t>)
5039 Failures are not very well handled.
5042 steps_total = 6
5043 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5044 instance = self.instance
5045 iv_names = {}
5046 vgname = self.cfg.GetVGName()
5047 cfg = self.cfg
5049 tgt_node = self.tgt_node
5050 oth_node = self.oth_node
5052 # Step: check device activation
5053 self.proc.LogStep(1, steps_total, "check device existence")
5054 info("checking volume groups")
5055 my_vg = cfg.GetVGName()
5056 results = self.rpc.call_vg_list([oth_node, tgt_node])
5058 raise errors.OpExecError("Can't list volume groups on the nodes")
5059 for node in oth_node, tgt_node:
5061 if res.failed or not res.data or my_vg not in res.data:
5062 raise errors.OpExecError("Volume group '%s' not found on %s" %
5064 for idx, dev in enumerate(instance.disks):
5065 if idx not in self.op.disks:
5066 continue
5067 for node in tgt_node, oth_node:
5068 info("checking disk/%d on %s" % (idx, node))
5069 cfg.SetDiskID(dev, node)
5070 result = self.rpc.call_blockdev_find(node, dev)
5071 msg = result.RemoteFailMsg()
5072 if not msg and not result.payload:
5073 msg = "disk not found"
5075 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5078 # Step: check other node consistency
5079 self.proc.LogStep(2, steps_total, "check peer consistency")
5080 for idx, dev in enumerate(instance.disks):
5081 if idx not in self.op.disks:
5082 continue
5083 info("checking disk/%d consistency on %s" % (idx, oth_node))
5084 if not _CheckDiskConsistency(self, dev, oth_node,
5085 oth_node==instance.primary_node):
5086 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5087 " to replace disks on this node (%s)" %
5088 (oth_node, tgt_node))
5090 # Step: create new storage
5091 self.proc.LogStep(3, steps_total, "allocate new storage")
5092 for idx, dev in enumerate(instance.disks):
5093 if idx not in self.op.disks:
5096 cfg.SetDiskID(dev, tgt_node)
5097 lv_names = [".disk%d_%s" % (idx, suf)
5098 for suf in ["data", "meta"]]
5099 names = _GenerateUniqueNames(self, lv_names)
5100 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5101 logical_id=(vgname, names[0]))
5102 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5103 logical_id=(vgname, names[1]))
5104 new_lvs = [lv_data, lv_meta]
5105 old_lvs = dev.children
5106 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5107 info("creating new local storage on %s for %s" %
5108 (tgt_node, dev.iv_name))
5109 # we pass force_create=True to force the LVM creation
5110 for new_lv in new_lvs:
5111 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5112 _GetInstanceInfoText(instance), False)
5114 # Step: for each lv, detach+rename*2+attach
5115 self.proc.LogStep(4, steps_total, "change drbd configuration")
5116 for dev, old_lvs, new_lvs in iv_names.itervalues():
5117 info("detaching %s drbd from local storage" % dev.iv_name)
5118 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5121 raise errors.OpExecError("Can't detach drbd from local storage on node"
5122 " %s for device %s" % (tgt_node, dev.iv_name))
5124 #cfg.Update(instance)
5126 # ok, we created the new LVs, so now we know we have the needed
5127 # storage; as such, we proceed on the target node to rename
5128 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5129 # using the assumption that logical_id == physical_id (which in
5130 # turn is the unique_id on that node)
5132 # FIXME(iustin): use a better name for the replaced LVs
5133 temp_suffix = int(time.time())
5134 ren_fn = lambda d, suff: (d.physical_id[0],
5135 d.physical_id[1] + "_replaced-%s" % suff)
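# Editor's example: with temp_suffix = 1234567890 and an LV whose physical_id
# is ("xenvg", "inst1.disk0_data"), ren_fn returns
# ("xenvg", "inst1.disk0_data_replaced-1234567890"), i.e. the volume group is
# kept and only the LV name gets the temporary suffix (names are
# illustrative).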
5136 # build the rename list based on what LVs exist on the node
5138 for to_ren in old_lvs:
5139 result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5140 if not result.RemoteFailMsg() and result.payload:
5142 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5144 info("renaming the old LVs on the target node")
5145 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5148 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5149 # now we rename the new LVs to the old LVs
5150 info("renaming the new LVs on the target node")
5151 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5152 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5155 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5157 for old, new in zip(old_lvs, new_lvs):
5158 new.logical_id = old.logical_id
5159 cfg.SetDiskID(new, tgt_node)
5161 for disk in old_lvs:
5162 disk.logical_id = ren_fn(disk, temp_suffix)
5163 cfg.SetDiskID(disk, tgt_node)
5165 # now that the new lvs have the old name, we can add them to the device
5166 info("adding new mirror component on %s" % tgt_node)
5167 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5168 if result.failed or not result.data:
5169 for new_lv in new_lvs:
5170 msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5171 if msg:
5172 warning("Can't rollback device %s: %s", dev, msg,
5173 hint="cleanup manually the unused logical volumes")
5174 raise errors.OpExecError("Can't add local storage to drbd")
5176 dev.children = new_lvs
5177 cfg.Update(instance)
5179 # Step: wait for sync
5181 # this can fail as the old devices are degraded and _WaitForSync
5182 # does a combined result over all disks, so we don't check its
5184 self.proc.LogStep(5, steps_total, "sync devices")
5185 _WaitForSync(self, instance, unlock=True)
5187 # so check manually all the devices
5188 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5189 cfg.SetDiskID(dev, instance.primary_node)
5190 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5191 msg = result.RemoteFailMsg()
5192 if not msg and not result.payload:
5193 msg = "disk not found"
5195 raise errors.OpExecError("Can't find DRBD device %s: %s" %
5197 if result.payload[5]:
5198 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5200 # Step: remove old storage
5201 self.proc.LogStep(6, steps_total, "removing old storage")
5202 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5203 info("remove logical volumes for %s" % name)
5204 for lv in old_lvs:
5205 cfg.SetDiskID(lv, tgt_node)
5206 msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5207 if msg:
5208 warning("Can't remove old LV: %s" % msg,
5209 hint="manually remove unused LVs")
5212 def _ExecD8Secondary(self, feedback_fn):
5213 """Replace the secondary node for drbd8.
5215 The algorithm for replace is quite complicated:
5216 - for all disks of the instance:
5217 - create new LVs on the new node with same names
5218 - shutdown the drbd device on the old secondary
5219 - disconnect the drbd network on the primary
5220 - create the drbd device on the new secondary
5221 - network attach the drbd on the primary, using an artifice:
5222 the drbd code for Attach() will connect to the network if it
5223 finds a device which is connected to the good local disks but
5225 - wait for sync across all devices
5226 - remove all disks from the old secondary
5228 Failures are not very well handled.
5232 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5233 instance = self.instance
5237 old_node = self.tgt_node
5238 new_node = self.new_node
5239 pri_node = instance.primary_node
5241 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5242 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5243 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5246 # Step: check device activation
5247 self.proc.LogStep(1, steps_total, "check device existence")
5248 info("checking volume groups")
5249 my_vg = cfg.GetVGName()
5250 results = self.rpc.call_vg_list([pri_node, new_node])
5251 for node in pri_node, new_node:
5253 if res.failed or not res.data or my_vg not in res.data:
5254 raise errors.OpExecError("Volume group '%s' not found on %s" %
5256 for idx, dev in enumerate(instance.disks):
5257 if idx not in self.op.disks:
5258 continue
5259 info("checking disk/%d on %s" % (idx, pri_node))
5260 cfg.SetDiskID(dev, pri_node)
5261 result = self.rpc.call_blockdev_find(pri_node, dev)
5262 msg = result.RemoteFailMsg()
5263 if not msg and not result.payload:
5264 msg = "disk not found"
5265 if msg:
5266 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5267 (idx, pri_node, msg))
5269 # Step: check other node consistency
5270 self.proc.LogStep(2, steps_total, "check peer consistency")
5271 for idx, dev in enumerate(instance.disks):
5272 if idx not in self.op.disks:
5274 info("checking disk/%d consistency on %s" % (idx, pri_node))
5275 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5276 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5277 " unsafe to replace the secondary" %
5280 # Step: create new storage
5281 self.proc.LogStep(3, steps_total, "allocate new storage")
5282 for idx, dev in enumerate(instance.disks):
5283 info("adding new local storage on %s for disk/%d" %
5285 # we pass force_create=True to force LVM creation
5286 for new_lv in dev.children:
5287 _CreateBlockDev(self, new_node, instance, new_lv, True,
5288 _GetInstanceInfoText(instance), False)
5290 # Step 4: drbd minors and drbd setup changes
5291 # after this, we must manually remove the drbd minors on both the
5292 # error and the success paths
5293 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5295 logging.debug("Allocated minors %s" % (minors,))
5296 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5297 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5299 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5300 # create new devices on new_node; note that we create two IDs:
5301 # one without port, so the drbd will be activated without
5302 # networking information on the new node at this stage, and one
5303 # with network, for the latter activation in step 4
5304 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5305 if pri_node == o_node1:
5306 p_minor = o_minor1
5307 else:
5308 p_minor = o_minor2
5310 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5311 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5313 iv_names[idx] = (dev, dev.children, new_net_id)
5314 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5316 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5317 logical_id=new_alone_id,
5318 children=dev.children)
5320 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5321 _GetInstanceInfoText(instance), False)
5322 except errors.GenericError:
5323 self.cfg.ReleaseDRBDMinors(instance.name)
5326 for idx, dev in enumerate(instance.disks):
5327 # we have new devices, shutdown the drbd on the old secondary
5328 info("shutting down drbd for disk/%d on old node" % idx)
5329 cfg.SetDiskID(dev, old_node)
5330 msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5332 warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5334 hint="Please cleanup this device manually as soon as possible")
5336 info("detaching primary drbds from the network (=> standalone)")
5337 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5338 instance.disks)[pri_node]
5340 msg = result.RemoteFailMsg()
5341 if msg:
5342 # detaches didn't succeed (unlikely)
5343 self.cfg.ReleaseDRBDMinors(instance.name)
5344 raise errors.OpExecError("Can't detach the disks from the network on"
5345 " old node: %s" % (msg,))
5347 # if we managed to detach at least one, we update all the disks of
5348 # the instance to point to the new secondary
5349 info("updating instance configuration")
5350 for dev, _, new_logical_id in iv_names.itervalues():
5351 dev.logical_id = new_logical_id
5352 cfg.SetDiskID(dev, pri_node)
5353 cfg.Update(instance)
5355 # and now perform the drbd attach
5356 info("attaching primary drbds to new secondary (standalone => connected)")
5357 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5358 instance.disks, instance.name,
5360 for to_node, to_result in result.items():
5361 msg = to_result.RemoteFailMsg()
5363 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5364 hint="please do a gnt-instance info to see the"
5367 # this can fail as the old devices are degraded and _WaitForSync
5368 # does a combined result over all disks, so we don't check its
5370 self.proc.LogStep(5, steps_total, "sync devices")
5371 _WaitForSync(self, instance, unlock=True)
5373 # so check manually all the devices
5374 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5375 cfg.SetDiskID(dev, pri_node)
5376 result = self.rpc.call_blockdev_find(pri_node, dev)
5377 msg = result.RemoteFailMsg()
5378 if not msg and not result.payload:
5379 msg = "disk not found"
5381 raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5383 if result.payload[5]:
5384 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5386 self.proc.LogStep(6, steps_total, "removing old storage")
5387 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5388 info("remove logical volumes for disk/%d" % idx)
5390 cfg.SetDiskID(lv, old_node)
5391 msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5393 warning("Can't remove LV on old secondary: %s", msg,
5394 hint="Cleanup stale volumes by hand")
5396 def Exec(self, feedback_fn):
5397 """Execute disk replacement.
5399 This dispatches the disk replacement to the appropriate handler.
5402 instance = self.instance
5404 # Activate the instance disks if we're replacing them on a down instance
5405 if not instance.admin_up:
5406 _StartInstanceDisks(self, instance, True)
5408 if self.op.mode == constants.REPLACE_DISK_CHG:
5409 fn = self._ExecD8Secondary
5410 else:
5411 fn = self._ExecD8DiskOnly
5413 ret = fn(feedback_fn)
5415 # Deactivate the instance disks if we're replacing them on a down instance
5416 if not instance.admin_up:
5417 _SafeShutdownInstanceDisks(self, instance)
5419 return ret
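# Editor's summary (sketch): the dispatch above maps the opcode mode to a
# handler as follows:
#   REPLACE_DISK_CHG        -> _ExecD8Secondary (move the mirror to the new
#                              secondary selected above)
#   REPLACE_DISK_PRI / _SEC -> _ExecD8DiskOnly (recreate the LVs on the
#                              existing primary resp. secondary node)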
5422 class LUGrowDisk(LogicalUnit):
5423 """Grow a disk of an instance.
5427 HTYPE = constants.HTYPE_INSTANCE
5428 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5431 def ExpandNames(self):
5432 self._ExpandAndLockInstance()
5433 self.needed_locks[locking.LEVEL_NODE] = []
5434 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5436 def DeclareLocks(self, level):
5437 if level == locking.LEVEL_NODE:
5438 self._LockInstancesNodes()
5440 def BuildHooksEnv(self):
5443 This runs on the master, the primary and all the secondaries.
5447 "DISK": self.op.disk,
5448 "AMOUNT": self.op.amount,
5450 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5452 self.cfg.GetMasterNode(),
5453 self.instance.primary_node,
5457 def CheckPrereq(self):
5458 """Check prerequisites.
5460 This checks that the instance is in the cluster.
5463 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5464 assert instance is not None, \
5465 "Cannot retrieve locked instance %s" % self.op.instance_name
5466 nodenames = list(instance.all_nodes)
5467 for node in nodenames:
5468 _CheckNodeOnline(self, node)
5471 self.instance = instance
5473 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5474 raise errors.OpPrereqError("Instance's disk layout does not support"
5477 self.disk = instance.FindDisk(self.op.disk)
5479 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5480 instance.hypervisor)
5481 for node in nodenames:
5482 info = nodeinfo[node]
5483 if info.failed or not info.data:
5484 raise errors.OpPrereqError("Cannot get current information"
5485 " from node '%s'" % node)
5486 vg_free = info.data.get('vg_free', None)
5487 if not isinstance(vg_free, int):
5488 raise errors.OpPrereqError("Can't compute free disk space on"
5490 if self.op.amount > vg_free:
5491 raise errors.OpPrereqError("Not enough disk space on target node %s:"
5492 " %d MiB available, %d MiB required" %
5493 (node, vg_free, self.op.amount))
5495 def Exec(self, feedback_fn):
5496 """Execute disk grow.
5499 instance = self.instance
5500 disk = self.disk
5501 for node in instance.all_nodes:
5502 self.cfg.SetDiskID(disk, node)
5503 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5504 msg = result.RemoteFailMsg()
5505 if msg:
5506 raise errors.OpExecError("Grow request failed to node %s: %s" %
5507 (node, msg))
5508 disk.RecordGrow(self.op.amount)
5509 self.cfg.Update(instance)
5510 if self.op.wait_for_sync:
5511 disk_abort = not _WaitForSync(self, instance)
5512 if disk_abort:
5513 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5514 " status.\nPlease check the instance.")
5517 class LUQueryInstanceData(NoHooksLU):
5518 """Query runtime instance data.
5521 _OP_REQP = ["instances", "static"]
5524 def ExpandNames(self):
5525 self.needed_locks = {}
5526 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5528 if not isinstance(self.op.instances, list):
5529 raise errors.OpPrereqError("Invalid argument type 'instances'")
5531 if self.op.instances:
5532 self.wanted_names = []
5533 for name in self.op.instances:
5534 full_name = self.cfg.ExpandInstanceName(name)
5535 if full_name is None:
5536 raise errors.OpPrereqError("Instance '%s' not known" % name)
5537 self.wanted_names.append(full_name)
5538 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5540 self.wanted_names = None
5541 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5543 self.needed_locks[locking.LEVEL_NODE] = []
5544 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5546 def DeclareLocks(self, level):
5547 if level == locking.LEVEL_NODE:
5548 self._LockInstancesNodes()
5550 def CheckPrereq(self):
5551 """Check prerequisites.
5553 This only checks the optional instance list against the existing names.
5556 if self.wanted_names is None:
5557 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5559 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5560 in self.wanted_names]
5563 def _ComputeDiskStatus(self, instance, snode, dev):
5564 """Compute block device status.
5567 static = self.op.static
5569 self.cfg.SetDiskID(dev, instance.primary_node)
5570 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5571 if dev_pstatus.offline:
5574 msg = dev_pstatus.RemoteFailMsg()
5576 raise errors.OpExecError("Can't compute disk status for %s: %s" %
5577 (instance.name, msg))
5578 dev_pstatus = dev_pstatus.payload
5582 if dev.dev_type in constants.LDS_DRBD:
5583 # we change the snode then (otherwise we use the one passed in)
5584 if dev.logical_id[0] == instance.primary_node:
5585 snode = dev.logical_id[1]
5587 snode = dev.logical_id[0]
5589 if snode and not static:
5590 self.cfg.SetDiskID(dev, snode)
5591 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5592 if dev_sstatus.offline:
5595 msg = dev_sstatus.RemoteFailMsg()
5597 raise errors.OpExecError("Can't compute disk status for %s: %s" %
5598 (instance.name, msg))
5599 dev_sstatus = dev_sstatus.payload
5604 dev_children = [self._ComputeDiskStatus(instance, snode, child)
5605 for child in dev.children]
5610 "iv_name": dev.iv_name,
5611 "dev_type": dev.dev_type,
5612 "logical_id": dev.logical_id,
5613 "physical_id": dev.physical_id,
5614 "pstatus": dev_pstatus,
5615 "sstatus": dev_sstatus,
5616 "children": dev_children,
5622 def Exec(self, feedback_fn):
5623 """Gather and return data"""
5626 cluster = self.cfg.GetClusterInfo()
5628 for instance in self.wanted_instances:
5629 if not self.op.static:
5630 remote_info = self.rpc.call_instance_info(instance.primary_node,
5632 instance.hypervisor)
5634 remote_info = remote_info.data
5635 if remote_info and "state" in remote_info:
5638 remote_state = "down"
5641 if instance.admin_up:
5644 config_state = "down"
5646 disks = [self._ComputeDiskStatus(instance, None, device)
5647 for device in instance.disks]
5650 "name": instance.name,
5651 "config_state": config_state,
5652 "run_state": remote_state,
5653 "pnode": instance.primary_node,
5654 "snodes": instance.secondary_nodes,
5656 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5658 "hypervisor": instance.hypervisor,
5659 "network_port": instance.network_port,
5660 "hv_instance": instance.hvparams,
5661 "hv_actual": cluster.FillHV(instance),
5662 "be_instance": instance.beparams,
5663 "be_actual": cluster.FillBE(instance),
5666 result[instance.name] = idict
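# Editor's note: the mapping returned above has one entry per instance name;
# a trimmed, illustrative value (the real idict carries all keys built above)
# could look like:
#   {"inst1.example.com": {"name": "inst1.example.com",
#                          "config_state": "up", "run_state": "up",
#                          "pnode": "node1", "snodes": ["node2"],
#                          "hypervisor": "xen-pvm", ...}}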
5671 class LUSetInstanceParams(LogicalUnit):
5672 """Modifies an instances's parameters.
5675 HPATH = "instance-modify"
5676 HTYPE = constants.HTYPE_INSTANCE
5677 _OP_REQP = ["instance_name"]
5680 def CheckArguments(self):
5681 if not hasattr(self.op, 'nics'):
5682 self.op.nics = []
5683 if not hasattr(self.op, 'disks'):
5684 self.op.disks = []
5685 if not hasattr(self.op, 'beparams'):
5686 self.op.beparams = {}
5687 if not hasattr(self.op, 'hvparams'):
5688 self.op.hvparams = {}
5689 self.op.force = getattr(self.op, "force", False)
5690 if not (self.op.nics or self.op.disks or
5691 self.op.hvparams or self.op.beparams):
5692 raise errors.OpPrereqError("No changes submitted")
5696 for disk_op, disk_dict in self.op.disks:
5697 if disk_op == constants.DDM_REMOVE:
5700 elif disk_op == constants.DDM_ADD:
5703 if not isinstance(disk_op, int):
5704 raise errors.OpPrereqError("Invalid disk index")
5705 if disk_op == constants.DDM_ADD:
5706 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5707 if mode not in constants.DISK_ACCESS_SET:
5708 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5709 size = disk_dict.get('size', None)
5711 raise errors.OpPrereqError("Required disk parameter size missing")
5714 except ValueError, err:
5715 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5717 disk_dict['size'] = size
5719 # modification of disk
5720 if 'size' in disk_dict:
5721 raise errors.OpPrereqError("Disk size change not possible, use"
5724 if disk_addremove > 1:
5725 raise errors.OpPrereqError("Only one disk add or remove operation"
5726 " supported at a time")
5730 for nic_op, nic_dict in self.op.nics:
5731 if nic_op == constants.DDM_REMOVE:
5734 elif nic_op == constants.DDM_ADD:
5737 if not isinstance(nic_op, int):
5738 raise errors.OpPrereqError("Invalid nic index")
5740 # nic_dict should be a dict
5741 nic_ip = nic_dict.get('ip', None)
5742 if nic_ip is not None:
5743 if nic_ip.lower() == constants.VALUE_NONE:
5744 nic_dict['ip'] = None
5746 if not utils.IsValidIP(nic_ip):
5747 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5749 if nic_op == constants.DDM_ADD:
5750 nic_bridge = nic_dict.get('bridge', None)
5751 if nic_bridge is None:
5752 nic_dict['bridge'] = self.cfg.GetDefBridge()
5753 nic_mac = nic_dict.get('mac', None)
5755 nic_dict['mac'] = constants.VALUE_AUTO
5757 if 'mac' in nic_dict:
5758 nic_mac = nic_dict['mac']
5759 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5760 if not utils.IsValidMac(nic_mac):
5761 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5762 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5763 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5764 " modifying an existing nic")
5766 if nic_addremove > 1:
5767 raise errors.OpPrereqError("Only one NIC add or remove operation"
5768 " supported at a time")
5770 def ExpandNames(self):
5771 self._ExpandAndLockInstance()
5772 self.needed_locks[locking.LEVEL_NODE] = []
5773 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5775 def DeclareLocks(self, level):
5776 if level == locking.LEVEL_NODE:
5777 self._LockInstancesNodes()
5779 def BuildHooksEnv(self):
5782 This runs on the master, primary and secondaries.
5786 if constants.BE_MEMORY in self.be_new:
5787 args['memory'] = self.be_new[constants.BE_MEMORY]
5788 if constants.BE_VCPUS in self.be_new:
5789 args['vcpus'] = self.be_new[constants.BE_VCPUS]
5790 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5791 # information at all.
5794 nic_override = dict(self.op.nics)
5795 for idx, nic in enumerate(self.instance.nics):
5796 if idx in nic_override:
5797 this_nic_override = nic_override[idx]
5799 this_nic_override = {}
5800 if 'ip' in this_nic_override:
5801 ip = this_nic_override['ip']
5804 if 'bridge' in this_nic_override:
5805 bridge = this_nic_override['bridge']
5808 if 'mac' in this_nic_override:
5809 mac = this_nic_override['mac']
5812 args['nics'].append((ip, bridge, mac))
5813 if constants.DDM_ADD in nic_override:
5814 ip = nic_override[constants.DDM_ADD].get('ip', None)
5815 bridge = nic_override[constants.DDM_ADD]['bridge']
5816 mac = nic_override[constants.DDM_ADD]['mac']
5817 args['nics'].append((ip, bridge, mac))
5818 elif constants.DDM_REMOVE in nic_override:
5819 del args['nics'][-1]
5821 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5822 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
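# Illustrative helper (hypothetical, not called anywhere): mirrors the
# per-NIC merge performed in the loop above, where overridden fields win
# and everything else is taken from the existing NIC object.
def _example_merge_nic(nic, override):
  ip = override.get('ip', nic.ip)
  bridge = override.get('bridge', nic.bridge)
  mac = override.get('mac', nic.mac)
  return (ip, bridge, mac)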
5825 def CheckPrereq(self):
5826 """Check prerequisites.
5828 This only checks the instance list against the existing names.
5831 force = self.force = self.op.force
5833 # checking the new params on the primary/secondary nodes
5835 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5836 assert self.instance is not None, \
5837 "Cannot retrieve locked instance %s" % self.op.instance_name
5838 pnode = instance.primary_node
5839 nodelist = list(instance.all_nodes)
5841 # hvparams processing
5842 if self.op.hvparams:
5843 i_hvdict = copy.deepcopy(instance.hvparams)
5844 for key, val in self.op.hvparams.iteritems():
5845 if val == constants.VALUE_DEFAULT:
5852 cluster = self.cfg.GetClusterInfo()
5853 utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5854 hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5857 hypervisor.GetHypervisor(
5858 instance.hypervisor).CheckParameterSyntax(hv_new)
5859 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5860 self.hv_new = hv_new # the new actual values
5861 self.hv_inst = i_hvdict # the new dict (without defaults)
5863 self.hv_new = self.hv_inst = {}
5865 # beparams processing
5866 if self.op.beparams:
5867 i_bedict = copy.deepcopy(instance.beparams)
5868 for key, val in self.op.beparams.iteritems():
5869 if val == constants.VALUE_DEFAULT:
5876 cluster = self.cfg.GetClusterInfo()
5877 utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5878 be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5880 self.be_new = be_new # the new actual values
5881 self.be_inst = i_bedict # the new dict (without defaults)
5883 self.be_new = self.be_inst = {}
5887 if constants.BE_MEMORY in self.op.beparams and not self.force:
5888 mem_check_list = [pnode]
5889 if be_new[constants.BE_AUTO_BALANCE]:
5890 # either we changed auto_balance to yes or it was already enabled
5891 mem_check_list.extend(instance.secondary_nodes)
5892 instance_info = self.rpc.call_instance_info(pnode, instance.name,
5893 instance.hypervisor)
5894 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5895 instance.hypervisor)
5896 if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5897 # Assume the primary node is unreachable and go ahead
5898 self.warn.append("Can't get info from primary node %s" % pnode)
5900 if not instance_info.failed and instance_info.data:
5901 current_mem = instance_info.data['memory']
5903 # Assume instance not running
5904 # (there is a slight race condition here, but it's not very probable,
5905 # and we have no other way to check)
5907 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5908 nodeinfo[pnode].data['memory_free'])
5910 raise errors.OpPrereqError("This change will prevent the instance"
5911 " from starting, due to %d MB of memory"
5912 " missing on its primary node" % miss_mem)
5914 if be_new[constants.BE_AUTO_BALANCE]:
5915 for node, nres in nodeinfo.iteritems():
5916 if node not in instance.secondary_nodes:
5918 if nres.failed or not isinstance(nres.data, dict):
5919 self.warn.append("Can't get info from secondary node %s" % node)
5920 elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5921 self.warn.append("Not enough memory to failover instance to"
5922 " secondary node %s" % node)
5925 for nic_op, nic_dict in self.op.nics:
5926 if nic_op == constants.DDM_REMOVE:
5927 if not instance.nics:
5928 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5930 if nic_op != constants.DDM_ADD:
5932 if nic_op < 0 or nic_op >= len(instance.nics):
5933 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5935 (nic_op, len(instance.nics)))
5936 if 'bridge' in nic_dict:
5937 nic_bridge = nic_dict['bridge']
5938 if nic_bridge is None:
5939 raise errors.OpPrereqError('Cannot set the nic bridge to None')
5940 if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5941 msg = ("Bridge '%s' doesn't exist on one of"
5942 " the instance nodes" % nic_bridge)
5944 self.warn.append(msg)
5946 raise errors.OpPrereqError(msg)
5947 if 'mac' in nic_dict:
5948 nic_mac = nic_dict['mac']
5950 raise errors.OpPrereqError('Cannot set the nic mac to None')
5951 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5952 # otherwise generate the mac
5953 nic_dict['mac'] = self.cfg.GenerateMAC()
5955 # or validate/reserve the current one
5956 if self.cfg.IsMacInUse(nic_mac):
5957 raise errors.OpPrereqError("MAC address %s already in use"
5958 " in cluster" % nic_mac)
5961 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5962 raise errors.OpPrereqError("Disk operations not supported for"
5963 " diskless instances")
5964 for disk_op, disk_dict in self.op.disks:
5965 if disk_op == constants.DDM_REMOVE:
5966 if len(instance.disks) == 1:
5967 raise errors.OpPrereqError("Cannot remove the last disk of"
5969 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5970 ins_l = ins_l[pnode]
5971 if ins_l.failed or not isinstance(ins_l.data, list):
5972 raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5973 if instance.name in ins_l.data:
5974 raise errors.OpPrereqError("Instance is running, can't remove"
5977 if (disk_op == constants.DDM_ADD and
5978 len(instance.disks) >= constants.MAX_DISKS):
5979 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5980 " add more" % constants.MAX_DISKS)
5981 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5983 if disk_op < 0 or disk_op >= len(instance.disks):
5984 raise errors.OpPrereqError("Invalid disk index %s, valid values"
5986 (disk_op, len(instance.disks)))
5990 def Exec(self, feedback_fn):
5991 """Modifies an instance.
5993 All parameters take effect only at the next restart of the instance.
5996 # Process here the warnings from CheckPrereq, as we don't have a
5997 # feedback_fn there.
5998 for warn in self.warn:
5999 feedback_fn("WARNING: %s" % warn)
6002 instance = self.instance
6004 for disk_op, disk_dict in self.op.disks:
6005 if disk_op == constants.DDM_REMOVE:
6006 # remove the last disk
6007 device = instance.disks.pop()
6008 device_idx = len(instance.disks)
6009 for node, disk in device.ComputeNodeTree(instance.primary_node):
6010 self.cfg.SetDiskID(disk, node)
6011 msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6013 self.LogWarning("Could not remove disk/%d on node %s: %s,"
6014 " continuing anyway", device_idx, node, msg)
6015 result.append(("disk/%d" % device_idx, "remove"))
6016 elif disk_op == constants.DDM_ADD:
6018 if instance.disk_template == constants.DT_FILE:
6019 file_driver, file_path = instance.disks[0].logical_id
6020 file_path = os.path.dirname(file_path)
6022 file_driver = file_path = None
6023 disk_idx_base = len(instance.disks)
6024 new_disk = _GenerateDiskTemplate(self,
6025 instance.disk_template,
6026 instance.name, instance.primary_node,
6027 instance.secondary_nodes,
6032 instance.disks.append(new_disk)
6033 info = _GetInstanceInfoText(instance)
6035 logging.info("Creating volume %s for instance %s",
6036 new_disk.iv_name, instance.name)
6037 # Note: this needs to be kept in sync with _CreateDisks
6039 for node in instance.all_nodes:
6040 f_create = node == instance.primary_node
6042 _CreateBlockDev(self, node, instance, new_disk,
6043 f_create, info, f_create)
6044 except errors.OpExecError, err:
6045 self.LogWarning("Failed to create volume %s (%s) on"
6047 new_disk.iv_name, new_disk, node, err)
6048 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6049 (new_disk.size, new_disk.mode)))
6051 # change a given disk
6052 instance.disks[disk_op].mode = disk_dict['mode']
6053 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6055 for nic_op, nic_dict in self.op.nics:
6056 if nic_op == constants.DDM_REMOVE:
6057 # remove the last nic
6058 del instance.nics[-1]
6059 result.append(("nic.%d" % len(instance.nics), "remove"))
6060 elif nic_op == constants.DDM_ADD:
6061 # mac and bridge should be set by now
6062 mac = nic_dict['mac']
6063 bridge = nic_dict['bridge']
6064 new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6066 instance.nics.append(new_nic)
6067 result.append(("nic.%d" % (len(instance.nics) - 1),
6068 "add:mac=%s,ip=%s,bridge=%s" %
6069 (new_nic.mac, new_nic.ip, new_nic.bridge)))
6071 # change a given nic
6072 for key in 'mac', 'ip', 'bridge':
6074 setattr(instance.nics[nic_op], key, nic_dict[key])
6075 result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6078 if self.op.hvparams:
6079 instance.hvparams = self.hv_inst
6080 for key, val in self.op.hvparams.iteritems():
6081 result.append(("hv/%s" % key, val))
6084 if self.op.beparams:
6085 instance.beparams = self.be_inst
6086 for key, val in self.op.beparams.iteritems():
6087 result.append(("be/%s" % key, val))
6089 self.cfg.Update(instance)
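# Illustrative sketch (hypothetical values): the kind of (parameter, change)
# pairs accumulated in `result` above and reported back as feedback.
_EXAMPLE_MODIFY_RESULT = [
  ("disk/1", "add:size=1024,mode=rw"),
  ("nic.bridge/0", "xen-br1"),
  ("be/memory", 512),
]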
6094 class LUQueryExports(NoHooksLU):
6095 """Query the exports list
6098 _OP_REQP = ['nodes']
6101 def ExpandNames(self):
6102 self.needed_locks = {}
6103 self.share_locks[locking.LEVEL_NODE] = 1
6104 if not self.op.nodes:
6105 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6107 self.needed_locks[locking.LEVEL_NODE] = \
6108 _GetWantedNodes(self, self.op.nodes)
6110 def CheckPrereq(self):
6111 """Check prerequisites.
6114 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6116 def Exec(self, feedback_fn):
6117 """Compute the list of all the exported system images.
6120 @return: a dictionary with the structure node->(export-list)
6121 where export-list is a list of the instances exported on
6125 rpcresult = self.rpc.call_export_list(self.nodes)
6127 for node in rpcresult:
6128 if rpcresult[node].failed:
6129 result[node] = False
6131 result[node] = rpcresult[node].data
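# Illustrative sketch of the node->export-list mapping described in the
# docstring above (hostnames are hypothetical); False marks a node whose
# export_list RPC call failed.
_EXAMPLE_EXPORTS = {
  "node1.example.com": ["instance1.example.com"],
  "node2.example.com": False,
}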
6136 class LUExportInstance(LogicalUnit):
6137 """Export an instance to an image in the cluster.
6140 HPATH = "instance-export"
6141 HTYPE = constants.HTYPE_INSTANCE
6142 _OP_REQP = ["instance_name", "target_node", "shutdown"]
6145 def ExpandNames(self):
6146 self._ExpandAndLockInstance()
6147 # FIXME: lock only instance primary and destination node
6149 # Sad but true, for now we have to lock all nodes, as we don't know where
6150 # the previous export might be, and in this LU we search for it and
6151 # remove it from its current node. In the future we could fix this by:
6152 # - making a tasklet to search (share-lock all), then create the new one,
6153 # then one to remove it afterwards
6154 # - removing the removal operation altogether
6155 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6157 def DeclareLocks(self, level):
6158 """Last minute lock declaration."""
6159 # All nodes are locked anyway, so nothing to do here.
6161 def BuildHooksEnv(self):
6164 This will run on the master, primary node and target node.
6168 "EXPORT_NODE": self.op.target_node,
6169 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6171 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6172 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6173 self.op.target_node]
6176 def CheckPrereq(self):
6177 """Check prerequisites.
6179 This checks that the instance and node names are valid.
6182 instance_name = self.op.instance_name
6183 self.instance = self.cfg.GetInstanceInfo(instance_name)
6184 assert self.instance is not None, \
6185 "Cannot retrieve locked instance %s" % self.op.instance_name
6186 _CheckNodeOnline(self, self.instance.primary_node)
6188 self.dst_node = self.cfg.GetNodeInfo(
6189 self.cfg.ExpandNodeName(self.op.target_node))
6191 if self.dst_node is None:
6192 # This means the node name is wrong, not that the node is unlocked
6193 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6194 _CheckNodeOnline(self, self.dst_node.name)
6195 _CheckNodeNotDrained(self, self.dst_node.name)
6197 # instance disk type verification
6198 for disk in self.instance.disks:
6199 if disk.dev_type == constants.LD_FILE:
6200 raise errors.OpPrereqError("Export not supported for instances with"
6201 " file-based disks")
6203 def Exec(self, feedback_fn):
6204 """Export an instance to an image in the cluster.
6207 instance = self.instance
6208 dst_node = self.dst_node
6209 src_node = instance.primary_node
6210 if self.op.shutdown:
6211 # shutdown the instance, but not the disks
6212 result = self.rpc.call_instance_shutdown(src_node, instance)
6213 msg = result.RemoteFailMsg()
6215 raise errors.OpExecError("Could not shutdown instance %s on"
6217 (instance.name, src_node, msg))
6219 vgname = self.cfg.GetVGName()
6223 # set the disks ID correctly since call_instance_start needs the
6224 # correct drbd minor to create the symlinks
6225 for disk in instance.disks:
6226 self.cfg.SetDiskID(disk, src_node)
6229 for disk in instance.disks:
6230 # new_dev_name will be a snapshot of an LVM leaf of the device we passed
6231 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6232 if new_dev_name.failed or not new_dev_name.data:
6233 self.LogWarning("Could not snapshot block device %s on node %s",
6234 disk.logical_id[1], src_node)
6235 snap_disks.append(False)
6237 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6238 logical_id=(vgname, new_dev_name.data),
6239 physical_id=(vgname, new_dev_name.data),
6240 iv_name=disk.iv_name)
6241 snap_disks.append(new_dev)
6244 if self.op.shutdown and instance.admin_up:
6245 result = self.rpc.call_instance_start(src_node, instance)
6246 msg = result.RemoteFailMsg()
6248 _ShutdownInstanceDisks(self, instance)
6249 raise errors.OpExecError("Could not start instance: %s" % msg)
6251 # TODO: check for size
6253 cluster_name = self.cfg.GetClusterName()
6254 for idx, dev in enumerate(snap_disks):
6256 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6257 instance, cluster_name, idx)
6258 if result.failed or not result.data:
6259 self.LogWarning("Could not export block device %s from node %s to"
6260 " node %s", dev.logical_id[1], src_node,
6262 msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6264 self.LogWarning("Could not remove snapshot block device %s from node"
6265 " %s: %s", dev.logical_id[1], src_node, msg)
6267 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6268 if result.failed or not result.data:
6269 self.LogWarning("Could not finalize export for instance %s on node %s",
6270 instance.name, dst_node.name)
6272 nodelist = self.cfg.GetNodeList()
6273 nodelist.remove(dst_node.name)
6275 # on one-node clusters nodelist will be empty after the removal
6276 # if we proceed the backup would be removed because OpQueryExports
6277 # substitutes an empty list with the full cluster node list.
6279 exportlist = self.rpc.call_export_list(nodelist)
6280 for node in exportlist:
6281 if exportlist[node].failed:
6283 if instance.name in exportlist[node].data:
6284 if not self.rpc.call_export_remove(node, instance.name):
6285 self.LogWarning("Could not remove older export for instance %s"
6286 " on node %s", instance.name, node)
6289 class LURemoveExport(NoHooksLU):
6290 """Remove exports related to the named instance.
6293 _OP_REQP = ["instance_name"]
6296 def ExpandNames(self):
6297 self.needed_locks = {}
6298 # We need all nodes to be locked in order for RemoveExport to work, but we
6299 # don't need to lock the instance itself, as nothing will happen to it (and
6300 # we can remove exports even for an already-removed instance)
6301 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6303 def CheckPrereq(self):
6304 """Check prerequisites.
6308 def Exec(self, feedback_fn):
6309 """Remove any export.
6312 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6313 # If the instance was not found we'll try with the name that was passed in.
6314 # This will only work if it was an FQDN, though.
6316 if not instance_name:
6318 instance_name = self.op.instance_name
6320 exportlist = self.rpc.call_export_list(self.acquired_locks[
6321 locking.LEVEL_NODE])
6323 for node in exportlist:
6324 if exportlist[node].failed:
6325 self.LogWarning("Failed to query node %s, continuing" % node)
6327 if instance_name in exportlist[node].data:
6329 result = self.rpc.call_export_remove(node, instance_name)
6330 if result.failed or not result.data:
6331 logging.error("Could not remove export for instance %s"
6332 " on node %s", instance_name, node)
6334 if fqdn_warn and not found:
6335 feedback_fn("Export not found. If trying to remove an export belonging"
6336 " to a deleted instance please use its Fully Qualified"
6340 class TagsLU(NoHooksLU):
6343 This is an abstract class which is the parent of all the other tags LUs.
6347 def ExpandNames(self):
6348 self.needed_locks = {}
6349 if self.op.kind == constants.TAG_NODE:
6350 name = self.cfg.ExpandNodeName(self.op.name)
6352 raise errors.OpPrereqError("Invalid node name (%s)" %
6355 self.needed_locks[locking.LEVEL_NODE] = name
6356 elif self.op.kind == constants.TAG_INSTANCE:
6357 name = self.cfg.ExpandInstanceName(self.op.name)
6359 raise errors.OpPrereqError("Invalid instance name (%s)" %
6362 self.needed_locks[locking.LEVEL_INSTANCE] = name
6364 def CheckPrereq(self):
6365 """Check prerequisites.
6368 if self.op.kind == constants.TAG_CLUSTER:
6369 self.target = self.cfg.GetClusterInfo()
6370 elif self.op.kind == constants.TAG_NODE:
6371 self.target = self.cfg.GetNodeInfo(self.op.name)
6372 elif self.op.kind == constants.TAG_INSTANCE:
6373 self.target = self.cfg.GetInstanceInfo(self.op.name)
6375 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6379 class LUGetTags(TagsLU):
6380 """Returns the tags of a given object.
6383 _OP_REQP = ["kind", "name"]
6386 def Exec(self, feedback_fn):
6387 """Returns the tag list.
6390 return list(self.target.GetTags())
6393 class LUSearchTags(NoHooksLU):
6394 """Searches the tags for a given pattern.
6397 _OP_REQP = ["pattern"]
6400 def ExpandNames(self):
6401 self.needed_locks = {}
6403 def CheckPrereq(self):
6404 """Check prerequisites.
6406 This checks the pattern passed for validity by compiling it.
6410 self.re = re.compile(self.op.pattern)
6411 except re.error, err:
6412 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6413 (self.op.pattern, err))
6415 def Exec(self, feedback_fn):
6416 """Returns the tag list.
6420 tgts = [("/cluster", cfg.GetClusterInfo())]
6421 ilist = cfg.GetAllInstancesInfo().values()
6422 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6423 nlist = cfg.GetAllNodesInfo().values()
6424 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6426 for path, target in tgts:
6427 for tag in target.GetTags():
6428 if self.re.search(tag):
6429 results.append((path, tag))
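# Illustrative sketch (hypothetical tags): searching for the pattern "^rack:"
# could yield (path, tag) pairs such as the following.
_EXAMPLE_TAG_MATCHES = [
  ("/cluster", "rack:unknown"),
  ("/nodes/node1.example.com", "rack:r1"),
  ("/instances/instance1.example.com", "rack:r1"),
]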
6433 class LUAddTags(TagsLU):
6434 """Sets a tag on a given object.
6437 _OP_REQP = ["kind", "name", "tags"]
6440 def CheckPrereq(self):
6441 """Check prerequisites.
6443 This checks the type and length of the tag name and value.
6446 TagsLU.CheckPrereq(self)
6447 for tag in self.op.tags:
6448 objects.TaggableObject.ValidateTag(tag)
6450 def Exec(self, feedback_fn):
6455 for tag in self.op.tags:
6456 self.target.AddTag(tag)
6457 except errors.TagError, err:
6458 raise errors.OpExecError("Error while setting tag: %s" % str(err))
6460 self.cfg.Update(self.target)
6461 except errors.ConfigurationError:
6462 raise errors.OpRetryError("There has been a modification to the"
6463 " config file and the operation has been"
6464 " aborted. Please retry.")
6467 class LUDelTags(TagsLU):
6468 """Delete a list of tags from a given object.
6471 _OP_REQP = ["kind", "name", "tags"]
6474 def CheckPrereq(self):
6475 """Check prerequisites.
6477 This checks that we have the given tag.
6480 TagsLU.CheckPrereq(self)
6481 for tag in self.op.tags:
6482 objects.TaggableObject.ValidateTag(tag)
6483 del_tags = frozenset(self.op.tags)
6484 cur_tags = self.target.GetTags()
6485 if not del_tags <= cur_tags:
6486 diff_tags = del_tags - cur_tags
6487 diff_names = ["'%s'" % tag for tag in diff_tags]
6489 raise errors.OpPrereqError("Tag(s) %s not found" %
6490 (",".join(diff_names)))
6492 def Exec(self, feedback_fn):
6493 """Remove the tag from the object.
6496 for tag in self.op.tags:
6497 self.target.RemoveTag(tag)
6499 self.cfg.Update(self.target)
6500 except errors.ConfigurationError:
6501 raise errors.OpRetryError("There has been a modification to the"
6502 " config file and the operation has been"
6503 " aborted. Please retry.")
6506 class LUTestDelay(NoHooksLU):
6507 """Sleep for a specified amount of time.
6509 This LU sleeps on the master and/or nodes for a specified amount of
6513 _OP_REQP = ["duration", "on_master", "on_nodes"]
6516 def ExpandNames(self):
6517 """Expand names and set required locks.
6519 This expands the node list, if any.
6522 self.needed_locks = {}
6523 if self.op.on_nodes:
6524 # _GetWantedNodes can be used here, but is not always appropriate to use
6525 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6527 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6528 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6530 def CheckPrereq(self):
6531 """Check prerequisites.
6535 def Exec(self, feedback_fn):
6536 """Do the actual sleep.
6539 if self.op.on_master:
6540 if not utils.TestDelay(self.op.duration):
6541 raise errors.OpExecError("Error during master delay test")
6542 if self.op.on_nodes:
6543 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6545 raise errors.OpExecError("Complete failure from rpc call")
6546 for node, node_result in result.items():
6548 if not node_result.data:
6549 raise errors.OpExecError("Failure during rpc call to node %s,"
6550 " result: %s" % (node, node_result.data))
6553 class IAllocator(object):
6554 """IAllocator framework.
6556 An IAllocator instance has four sets of attributes:
6557 - cfg that is needed to query the cluster
6558 - input data (all members of the _KEYS class attribute are required)
6559 - four buffer attributes (in|out_data|text), that represent the
6560 input (to the external script) in text and data structure format,
6561 and the output from it, again in two formats
6562 - the result variables from the script (success, info, nodes) for
6567 "mem_size", "disks", "disk_template",
6568 "os", "tags", "nics", "vcpus", "hypervisor",
6574 def __init__(self, lu, mode, name, **kwargs):
6576 # init buffer variables
6577 self.in_text = self.out_text = self.in_data = self.out_data = None
6578 # init all input fields so that pylint is happy
6581 self.mem_size = self.disks = self.disk_template = None
6582 self.os = self.tags = self.nics = self.vcpus = None
6583 self.hypervisor = None
6584 self.relocate_from = None
6586 self.required_nodes = None
6587 # init result fields
6588 self.success = self.info = self.nodes = None
6589 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6590 keyset = self._ALLO_KEYS
6591 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6592 keyset = self._RELO_KEYS
6594 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6595 " IAllocator" % self.mode)
6597 if key not in keyset:
6598 raise errors.ProgrammerError("Invalid input parameter '%s' to"
6599 " IAllocator" % key)
6600 setattr(self, key, kwargs[key])
6602 if key not in kwargs:
6603 raise errors.ProgrammerError("Missing input parameter '%s' to"
6604 " IAllocator" % key)
6605 self._BuildInputData()
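# Illustrative sketch (kept as a comment, since it needs a live LU object):
# how an allocation request could be built, mirroring LUTestAllocator.Exec
# further below. All keyword arguments must appear in _ALLO_KEYS; the
# allocator name and all values are hypothetical.
#
#   ial = IAllocator(lu, constants.IALLOCATOR_MODE_ALLOC,
#                    name="instance1.example.com",
#                    mem_size=512, disks=[{"size": 1024, "mode": "w"}],
#                    disk_template=constants.DT_DRBD8,
#                    os="debootstrap", tags=[], vcpus=1,
#                    nics=[{"mac": "auto", "ip": None, "bridge": "xen-br0"}],
#                    hypervisor=None)
#   ial.Run("my-allocator")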
6607 def _ComputeClusterData(self):
6608 """Compute the generic allocator input data.
6610 This is the data that is independent of the actual operation.
6614 cluster_info = cfg.GetClusterInfo()
6617 "version": constants.IALLOCATOR_VERSION,
6618 "cluster_name": cfg.GetClusterName(),
6619 "cluster_tags": list(cluster_info.GetTags()),
6620 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6621 # we don't have job IDs
6623 iinfo = cfg.GetAllInstancesInfo().values()
6624 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6628 node_list = cfg.GetNodeList()
6630 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6631 hypervisor_name = self.hypervisor
6632 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6633 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6635 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6637 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6638 cluster_info.enabled_hypervisors)
6639 for nname, nresult in node_data.items():
6640 # first fill in static (config-based) values
6641 ninfo = cfg.GetNodeInfo(nname)
6643 "tags": list(ninfo.GetTags()),
6644 "primary_ip": ninfo.primary_ip,
6645 "secondary_ip": ninfo.secondary_ip,
6646 "offline": ninfo.offline,
6647 "drained": ninfo.drained,
6648 "master_candidate": ninfo.master_candidate,
6651 if not ninfo.offline:
6653 if not isinstance(nresult.data, dict):
6654 raise errors.OpExecError("Can't get data for node %s" % nname)
6655 remote_info = nresult.data
6656 for attr in ['memory_total', 'memory_free', 'memory_dom0',
6657 'vg_size', 'vg_free', 'cpu_total']:
6658 if attr not in remote_info:
6659 raise errors.OpExecError("Node '%s' didn't return attribute"
6660 " '%s'" % (nname, attr))
6662 remote_info[attr] = int(remote_info[attr])
6663 except ValueError, err:
6664 raise errors.OpExecError("Node '%s' returned invalid value"
6665 " for '%s': %s" % (nname, attr, err))
6666 # compute memory used by primary instances
6667 i_p_mem = i_p_up_mem = 0
6668 for iinfo, beinfo in i_list:
6669 if iinfo.primary_node == nname:
6670 i_p_mem += beinfo[constants.BE_MEMORY]
6671 if iinfo.name not in node_iinfo[nname].data:
6674 i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6675 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6676 remote_info['memory_free'] -= max(0, i_mem_diff)
6679 i_p_up_mem += beinfo[constants.BE_MEMORY]
6681 # compute memory used by instances
6683 "total_memory": remote_info['memory_total'],
6684 "reserved_memory": remote_info['memory_dom0'],
6685 "free_memory": remote_info['memory_free'],
6686 "total_disk": remote_info['vg_size'],
6687 "free_disk": remote_info['vg_free'],
6688 "total_cpus": remote_info['cpu_total'],
6689 "i_pri_memory": i_p_mem,
6690 "i_pri_up_memory": i_p_up_mem,
6694 node_results[nname] = pnr
6695 data["nodes"] = node_results
6699 for iinfo, beinfo in i_list:
6700 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6701 for n in iinfo.nics]
6703 "tags": list(iinfo.GetTags()),
6704 "admin_up": iinfo.admin_up,
6705 "vcpus": beinfo[constants.BE_VCPUS],
6706 "memory": beinfo[constants.BE_MEMORY],
6708 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6710 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6711 "disk_template": iinfo.disk_template,
6712 "hypervisor": iinfo.hypervisor,
6714 instance_data[iinfo.name] = pir
6716 data["instances"] = instance_data
6720 def _AddNewInstance(self):
6721 """Add new instance data to allocator structure.
6723 This, in combination with _ComputeClusterData, will create the
6724 correct structure needed as input for the allocator.
6726 The checks for the completeness of the opcode must have already been
6732 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6734 if self.disk_template in constants.DTS_NET_MIRROR:
6735 self.required_nodes = 2
6737 self.required_nodes = 1
6741 "disk_template": self.disk_template,
6744 "vcpus": self.vcpus,
6745 "memory": self.mem_size,
6746 "disks": self.disks,
6747 "disk_space_total": disk_space,
6749 "required_nodes": self.required_nodes,
6751 data["request"] = request
6753 def _AddRelocateInstance(self):
6754 """Add relocate instance data to allocator structure.
6756 This, in combination with _ComputeClusterData, will create the
6757 correct structure needed as input for the allocator.
6759 The checks for the completeness of the opcode must have already been
6763 instance = self.lu.cfg.GetInstanceInfo(self.name)
6764 if instance is None:
6765 raise errors.ProgrammerError("Unknown instance '%s' passed to"
6766 " IAllocator" % self.name)
6768 if instance.disk_template not in constants.DTS_NET_MIRROR:
6769 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6771 if len(instance.secondary_nodes) != 1:
6772 raise errors.OpPrereqError("Instance has not exactly one secondary node")
6774 self.required_nodes = 1
6775 disk_sizes = [{'size': disk.size} for disk in instance.disks]
6776 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6781 "disk_space_total": disk_space,
6782 "required_nodes": self.required_nodes,
6783 "relocate_from": self.relocate_from,
6785 self.in_data["request"] = request
6787 def _BuildInputData(self):
6788 """Build input data structures.
6791 self._ComputeClusterData()
6793 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6794 self._AddNewInstance()
6796 self._AddRelocateInstance()
6798 self.in_text = serializer.Dump(self.in_data)
6800 def Run(self, name, validate=True, call_fn=None):
6801 """Run an instance allocator and return the results.
6805 call_fn = self.lu.rpc.call_iallocator_runner
6808 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6811 if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6812 raise errors.OpExecError("Invalid result from master iallocator runner")
6814 rcode, stdout, stderr, fail = result.data
6816 if rcode == constants.IARUN_NOTFOUND:
6817 raise errors.OpExecError("Can't find allocator '%s'" % name)
6818 elif rcode == constants.IARUN_FAILURE:
6819 raise errors.OpExecError("Instance allocator call failed: %s,"
6820 " output: %s" % (fail, stdout+stderr))
6821 self.out_text = stdout
6823 self._ValidateResult()
6825 def _ValidateResult(self):
6826 """Process the allocator results.
6828 This will process and if successful save the result in
6829 self.out_data and the other parameters.
6833 rdict = serializer.Load(self.out_text)
6834 except Exception, err:
6835 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6837 if not isinstance(rdict, dict):
6838 raise errors.OpExecError("Can't parse iallocator results: not a dict")
6840 for key in "success", "info", "nodes":
6841 if key not in rdict:
6842 raise errors.OpExecError("Can't parse iallocator results:"
6843 " missing key '%s'" % key)
6844 setattr(self, key, rdict[key])
6846 if not isinstance(rdict["nodes"], list):
6847 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6849 self.out_data = rdict
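# Illustrative sketch of a reply that passes _ValidateResult above: the keys
# "success", "info" and "nodes" must be present and "nodes" must be a list.
# Node names are hypothetical.
_EXAMPLE_IALLOCATOR_REPLY = {
  "success": True,
  "info": "allocation successful",
  "nodes": ["node1.example.com", "node2.example.com"],
}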
6852 class LUTestAllocator(NoHooksLU):
6853 """Run allocator tests.
6855 This LU runs the allocator tests
6858 _OP_REQP = ["direction", "mode", "name"]
6860 def CheckPrereq(self):
6861 """Check prerequisites.
6863 This checks the opcode parameters depending on the direction and mode of the test.
6866 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6867 for attr in ["name", "mem_size", "disks", "disk_template",
6868 "os", "tags", "nics", "vcpus"]:
6869 if not hasattr(self.op, attr):
6870 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6872 iname = self.cfg.ExpandInstanceName(self.op.name)
6873 if iname is not None:
6874 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6876 if not isinstance(self.op.nics, list):
6877 raise errors.OpPrereqError("Invalid parameter 'nics'")
6878 for row in self.op.nics:
6879 if (not isinstance(row, dict) or
6882 "bridge" not in row):
6883 raise errors.OpPrereqError("Invalid contents of the"
6884 " 'nics' parameter")
6885 if not isinstance(self.op.disks, list):
6886 raise errors.OpPrereqError("Invalid parameter 'disks'")
6887 for row in self.op.disks:
6888 if (not isinstance(row, dict) or
6889 "size" not in row or
6890 not isinstance(row["size"], int) or
6891 "mode" not in row or
6892 row["mode"] not in ['r', 'w']):
6893 raise errors.OpPrereqError("Invalid contents of the"
6894 " 'disks' parameter")
6895 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6896 self.op.hypervisor = self.cfg.GetHypervisorType()
6897 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6898 if not hasattr(self.op, "name"):
6899 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6900 fname = self.cfg.ExpandInstanceName(self.op.name)
6902 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6904 self.op.name = fname
6905 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6907 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6910 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6911 if not hasattr(self.op, "allocator") or self.op.allocator is None:
6912 raise errors.OpPrereqError("Missing allocator name")
6913 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6914 raise errors.OpPrereqError("Wrong allocator test '%s'" %
6917 def Exec(self, feedback_fn):
6918 """Run the allocator test.
6921 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6922 ial = IAllocator(self,
6925 mem_size=self.op.mem_size,
6926 disks=self.op.disks,
6927 disk_template=self.op.disk_template,
6931 vcpus=self.op.vcpus,
6932 hypervisor=self.op.hypervisor,
6935 ial = IAllocator(self,
6938 relocate_from=list(self.relocate_from),
6941 if self.op.direction == constants.IALLOCATOR_DIR_IN:
6942 result = ial.in_text
6944 ial.Run(self.op.allocator, validate=False)
6945 result = ial.out_text