# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201
import re
import time
import logging

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True
  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)
  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
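  # Illustrative sketch (hypothetical, not part of the original module): an
  # LU with a "timeout" opcode parameter could override CheckArguments like
  # this:
  #
  #   def CheckArguments(self):
  #     if not hasattr(self.op, "timeout"):
  #       self.op.timeout = 60
  #     if self.op.timeout <= 0:
  #       raise errors.OpPrereqError("Timeout must be positive")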
  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError
  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """
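  # Illustrative sketch (not in the original module): a typical override,
  # matching the pattern documented in _LockInstancesNodes below:
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()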
  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError
  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError
  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this
    will be handled in the hooks runner. Also note that additional keys
    will be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    "No nodes" should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
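  # Illustrative sketch (hypothetical, not part of the original module): a
  # single-node LU could build its hooks environment like this, mirroring the
  # pattern used by LURemoveNode and LUAddNode below:
  #
  #   def BuildHooksEnv(self):
  #     env = {"OP_TARGET": self.op.node_name}
  #     return env, [self.cfg.GetMasterNode()], [self.op.node_name]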
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None

    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result
  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name
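  # Editor's note: instance LUs typically call this from ExpandNames, e.g.
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #
  # after which self.op.instance_name is canonical and the instance lock is
  # declared.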
  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes,
    or to just lock primary or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we really have been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None
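# Illustrative sketch (hypothetical, not part of Ganeti): a minimal concurrent
# LU tying the contract together. The opcode and its "nodes" parameter are
# made up; _GetWantedNodes is the helper defined right below.
class _LUExampleNoop(NoHooksLU):
  """Example no-op LU acquiring shared locks on a set of nodes."""
  _OP_REQP = ["nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    # expand and declare the node locks, shared since we only read state
    self.needed_locks = {
      locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
      }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    pass  # nothing to verify for a no-op

  def Exec(self, feedback_fn):
    feedback_fn("noop run on %s" %
                ", ".join(self.needed_locks[locking.LEVEL_NODE]))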
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
                                 " non-empty list of nodes whose name is to be"
                                 " expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())

  return wanted
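# Editor's usage sketch (hypothetical values): both helpers are meant to be
# called from inside an LU, e.g. in ExpandNames or CheckPrereq:
#
#   self.wanted = _GetWantedNodes(self, ["node1"])   # expands short names
#   all_insts = _GetWantedInstances(self, [])        # empty list -> all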
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = static.Copy()
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))
def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)
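# Editor's usage sketch (hypothetical parameter): an LU with an optional
# boolean "force" flag would call, e.g. from CheckPrereq,
#
#   _CheckBooleanOpField(self.op, "force")
#
# after which self.op.force is guaranteed to exist and be None or a bool.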
def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks):
  """Builds instance related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_MAC" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  return env
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
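# Editor's usage sketch (hypothetical override value): callers can replace
# individual computed values via the override dict, whose keys are the
# argument names of _BuildInstanceHookEnv, e.g.
#
#   env = _BuildInstanceHookEnvByObject(self, instance,
#                                       override={"memory": 1024})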
def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))
def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
  result.Raise()
  if not result.data:
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise()
    if not result.data:
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master
class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used DRBD minors for this node, in the
        form of minor: (instance, must_exist), which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name
    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node,))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))
    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                    (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True
    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
                        " '%s'" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)
            bad = True
    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
          bad = True
    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
           not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True

    return bad
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False

    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True

    return bad
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True

    return bad
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {}
    # TODO: populate the environment with useful information for verify hooks
    return env, [], all_nodes
  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name
      nresult = all_nvinfo[node].data
      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      if all_nvinfo[node].failed or not isinstance(nresult, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result
      # node_volume
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata
      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
          }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue
    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True
    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hooks results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
1232 """Verify integrity of cluster disks.
1235 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1237 vg_name = self.cfg.GetVGName()
1238 nodes = utils.NiceSort(self.cfg.GetNodeList())
1239 instances = [self.cfg.GetInstanceInfo(name)
1240 for name in self.cfg.GetInstanceList()]
1243 for inst in instances:
1245 if (not inst.admin_up or
1246 inst.disk_template not in constants.DTS_NET_MIRROR):
1248 inst.MapLVsByNode(inst_lvs)
1249 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1250 for node, vol_list in inst_lvs.iteritems():
1251 for vol in vol_list:
1252 nv_dict[(node, vol)] = inst
1257 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1262 lvs = node_lvs[node]
1265 self.LogWarning("Connection to node %s failed: %s" %
1269 if isinstance(lvs, basestring):
1270 logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1271 res_nlvm[node] = lvs
1272 elif not isinstance(lvs, dict):
1273 logging.warning("Connection to node %s failed or invalid data"
1275 res_nodes.append(node)
1278 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1279 inst = nv_dict.pop((node, lv_name), None)
1280 if (not lv_online and inst is not None
1281 and inst.name not in res_instances):
1282 res_instances.append(inst.name)
1284 # any leftover items in nv_dict are missing LVs, let's arrange the
1286 for key, inst in nv_dict.iteritems():
1287 if inst.name not in res_missing:
1288 res_missing[inst.name] = []
1289 res_missing[inst.name].append(key)
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name
  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed",
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)
    finally:
      result = self.rpc.call_node_start_master(master, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckParameters(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")
  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]
  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        if vglist[node].failed:
          # ignoring down node
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate beparams changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)
  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size

    self.cfg.Update(self.cluster)

    # we want to update nodes after the cluster so that if any errors
    # happen, we have recorded and saved the cluster info
    if self.op.candidate_pool_size is not None:
      _AdjustCandidatePool(self)
class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo())
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disks to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if rstats.failed or not rstats.data:
      lu.LogWarning("Can't get any data from node %s", node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.data
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)

  return not cumul_degraded
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.RemoteFailMsg()
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      result = result and (not rstats.payload[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if nr.failed or not nr.data:
        continue
      for os_obj in nr.data:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os
  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
                   if node in node_list]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(valid_nodes, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes
  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
      - the node exists in the configuration
      - it does not have primary or secondary instances
      - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self)
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no",
    "master_candidate",
    "master",
    "offline",
    "drained",
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]
    # begin data gathering

    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.failed and nodeinfo.data:
          nodeinfo = nodeinfo.data
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})
    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering
    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field == "master_candidate":
          val = node.master_candidate
        elif field == "master":
          val = node.name == master_node
        elif field == "offline":
          val = node.offline
        elif field == "drained":
          val = node.drained
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
1982 def CheckPrereq(self):
1983 """Check prerequisites.
1985 This checks that the fields required are valid output fields.
1988 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1990 def Exec(self, feedback_fn):
1991 """Computes the list of nodes and their attributes.
1994 nodenames = self.nodes
1995 volumes = self.rpc.call_node_volumes(nodenames)
1997 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1998 in self.cfg.GetInstanceList()]
2000 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2003 for node in nodenames:
2004 if node not in volumes or volumes[node].failed or not volumes[node].data:
2007 node_vols = volumes[node].data[:]
2008 node_vols.sort(key=lambda vol: vol['dev'])
2010 for vol in node_vols:
2012 for field in self.op.output_fields:
2015 elif field == "phys":
2019 elif field == "name":
2021 elif field == "size":
2022 val = int(float(vol['size']))
2023 elif field == "instance":
2025 if node not in lv_by_node[inst]:
2027 if vol['name'] in lv_by_node[inst][node]:
2033 raise errors.ParameterError(field)
2034 node_output.append(str(val))
2036 output.append(node_output)
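# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original source: the "instance"
# field above is answered by scanning lv_by_node, a mapping of
# instance -> node -> [lv names]. Inverting that mapping once makes each
# per-volume lookup O(1); a minimal version, assuming plain string names:

def _SketchLvOwners(lv_by_node):
  """Invert {instance: {node: [lv, ...]}} into {(node, lv): instance}."""
  owners = {}
  for instance, node_map in lv_by_node.items():
    for node, lvs in node_map.items():
      for lv in lvs:
        owners[(node, lv)] = instance
  return owners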
2041 class LUAddNode(LogicalUnit):
2042 """Logical unit for adding node to the cluster.
2046 HTYPE = constants.HTYPE_NODE
2047 _OP_REQP = ["node_name"]
2049 def BuildHooksEnv(self):
2052 This will run on all nodes before, and on all nodes + the new node after.
2056 "OP_TARGET": self.op.node_name,
2057 "NODE_NAME": self.op.node_name,
2058 "NODE_PIP": self.op.primary_ip,
2059 "NODE_SIP": self.op.secondary_ip,
2061 nodes_0 = self.cfg.GetNodeList()
2062 nodes_1 = nodes_0 + [self.op.node_name, ]
2063 return env, nodes_0, nodes_1
2065 def CheckPrereq(self):
2066 """Check prerequisites.
2069 - the new node is not already in the config
2071 - its parameters (single/dual homed) match the cluster
2073 Any errors are signalled by raising errors.OpPrereqError.
2076 node_name = self.op.node_name
2079 dns_data = utils.HostInfo(node_name)
2081 node = dns_data.name
2082 primary_ip = self.op.primary_ip = dns_data.ip
2083 secondary_ip = getattr(self.op, "secondary_ip", None)
2084 if secondary_ip is None:
2085 secondary_ip = primary_ip
2086 if not utils.IsValidIP(secondary_ip):
2087 raise errors.OpPrereqError("Invalid secondary IP given")
2088 self.op.secondary_ip = secondary_ip
2090 node_list = cfg.GetNodeList()
2091 if not self.op.readd and node in node_list:
2092 raise errors.OpPrereqError("Node %s is already in the configuration" %
2094 elif self.op.readd and node not in node_list:
2095 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2097 for existing_node_name in node_list:
2098 existing_node = cfg.GetNodeInfo(existing_node_name)
2100 if self.op.readd and node == existing_node_name:
2101 if (existing_node.primary_ip != primary_ip or
2102 existing_node.secondary_ip != secondary_ip):
2103 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2104 " address configuration as before")
2107 if (existing_node.primary_ip == primary_ip or
2108 existing_node.secondary_ip == primary_ip or
2109 existing_node.primary_ip == secondary_ip or
2110 existing_node.secondary_ip == secondary_ip):
2111 raise errors.OpPrereqError("New node ip address(es) conflict with"
2112 " existing node %s" % existing_node.name)
2114 # check that the type of the node (single versus dual homed) is the
2115 # same as for the master
2116 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2117 master_singlehomed = myself.secondary_ip == myself.primary_ip
2118 newbie_singlehomed = secondary_ip == primary_ip
2119 if master_singlehomed != newbie_singlehomed:
2120 if master_singlehomed:
2121 raise errors.OpPrereqError("The master has no private ip but the"
2122 " new node has one")
2124 raise errors.OpPrereqError("The master has a private ip but the"
2125 " new node doesn't have one")
2127 # checks reachability
2128 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2129 raise errors.OpPrereqError("Node not reachable by ping")
2131 if not newbie_singlehomed:
2132 # check reachability from my secondary ip to newbie's secondary ip
2133 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2134 source=myself.secondary_ip):
2135 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2136 " based ping to noded port")
2138 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2139 mc_now, _ = self.cfg.GetMasterCandidateStats()
2140 master_candidate = mc_now < cp_size
2142 self.new_node = objects.Node(name=node,
2143 primary_ip=primary_ip,
2144 secondary_ip=secondary_ip,
2145 master_candidate=master_candidate,
2146 offline=False, drained=False)
2148 def Exec(self, feedback_fn):
2149 """Adds the new node to the cluster.
2152 new_node = self.new_node
2153 node = new_node.name
2155 # check connectivity
2156 result = self.rpc.call_version([node])[node]
2159 if constants.PROTOCOL_VERSION == result.data:
2160 logging.info("Communication to node %s fine, sw version %s matches",
2163 raise errors.OpExecError("Version mismatch master version %s,"
2164 " node version %s" %
2165 (constants.PROTOCOL_VERSION, result.data))
2167 raise errors.OpExecError("Cannot get version from the new node")
2170 logging.info("Copy ssh key to node %s", node)
2171 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2173 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2174 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2180 keyarray.append(f.read())
2184 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2186 keyarray[3], keyarray[4], keyarray[5])
2188 msg = result.RemoteFailMsg()
2190 raise errors.OpExecError("Cannot transfer ssh keys to the"
2191 " new node: %s" % msg)
2193 # Add node to our /etc/hosts, and add key to known_hosts
2194 utils.AddHostToEtcHosts(new_node.name)
2196 if new_node.secondary_ip != new_node.primary_ip:
2197 result = self.rpc.call_node_has_ip_address(new_node.name,
2198 new_node.secondary_ip)
2199 if result.failed or not result.data:
2200 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2201 " you gave (%s). Please fix and re-run this"
2202 " command." % new_node.secondary_ip)
2204 node_verify_list = [self.cfg.GetMasterNode()]
2205 node_verify_param = {
2207 # TODO: do a node-net-test as well?
2210 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2211 self.cfg.GetClusterName())
2212 for verifier in node_verify_list:
2213 if result[verifier].failed or not result[verifier].data:
2214 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2215 " for remote verification" % verifier)
2216 if result[verifier].data['nodelist']:
2217 for failed in result[verifier].data['nodelist']:
2218 feedback_fn("ssh/hostname verification failed %s -> %s" %
2219 (verifier, result[verifier].data['nodelist'][failed]))
2220 raise errors.OpExecError("ssh/hostname verification failed.")
2222 # Distribute updated /etc/hosts and known_hosts to all nodes,
2223 # including the node just added
2224 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2225 dist_nodes = self.cfg.GetNodeList()
2226 if not self.op.readd:
2227 dist_nodes.append(node)
2228 if myself.name in dist_nodes:
2229 dist_nodes.remove(myself.name)
2231 logging.debug("Copying hosts and known_hosts to all nodes")
2232 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2233 result = self.rpc.call_upload_file(dist_nodes, fname)
2234 for to_node, to_result in result.iteritems():
2235 if to_result.failed or not to_result.data:
2236 logging.error("Copy of file %s to node %s failed", fname, to_node)
2239 enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2240 if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2241 to_copy.append(constants.VNC_PASSWORD_FILE)
2243 for fname in to_copy:
2244 result = self.rpc.call_upload_file([node], fname)
2245 if result[node].failed or not result[node].data:
2246 logging.error("Could not copy file %s to node %s", fname, node)
2249 self.context.ReaddNode(new_node)
2251 self.context.AddNode(new_node)
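# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original source: the single/dual
# homing comparison in CheckPrereq above reduces to two booleans. A
# standalone version, assuming IP addresses as plain strings:

def _SketchHomingCompatible(master_primary_ip, master_secondary_ip,
                            new_primary_ip, new_secondary_ip):
  """Return True if a new node's homing mode matches the master's.

  A node is single-homed when its secondary IP equals its primary IP;
  mixing single- and dual-homed nodes in one cluster is rejected.

  """
  master_singlehomed = master_secondary_ip == master_primary_ip
  newbie_singlehomed = new_secondary_ip == new_primary_ip
  return master_singlehomed == newbie_singlehomed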
2254 class LUSetNodeParams(LogicalUnit):
2255 """Modifies the parameters of a node.
2258 HPATH = "node-modify"
2259 HTYPE = constants.HTYPE_NODE
2260 _OP_REQP = ["node_name"]
2263 def CheckArguments(self):
2264 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2265 if node_name is None:
2266 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2267 self.op.node_name = node_name
2268 _CheckBooleanOpField(self.op, 'master_candidate')
2269 _CheckBooleanOpField(self.op, 'offline')
2270 _CheckBooleanOpField(self.op, 'drained')
2271 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2272 if all_mods.count(None) == 3:
2273 raise errors.OpPrereqError("Please pass at least one modification")
2274 if all_mods.count(True) > 1:
2275 raise errors.OpPrereqError("Can't set the node into more than one"
2276 " state at the same time")
2278 def ExpandNames(self):
2279 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2281 def BuildHooksEnv(self):
2284 This runs on the master node.
2288 "OP_TARGET": self.op.node_name,
2289 "MASTER_CANDIDATE": str(self.op.master_candidate),
2290 "OFFLINE": str(self.op.offline),
2291 "DRAINED": str(self.op.drained),
2293 nl = [self.cfg.GetMasterNode(),
2297 def CheckPrereq(self):
2298 """Check prerequisites.
2300 This only checks the instance list against the existing names.
2303 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2305 if ((self.op.master_candidate == False or self.op.offline == True or
2306 self.op.drained == True) and node.master_candidate):
2307 # we will demote the node from master_candidate
2308 if self.op.node_name == self.cfg.GetMasterNode():
2309 raise errors.OpPrereqError("The master node has to be a"
2310 " master candidate, online and not drained")
2311 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2312 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2313 if num_candidates <= cp_size:
2314 msg = ("Not enough master candidates (desired"
2315 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2317 self.LogWarning(msg)
2319 raise errors.OpPrereqError(msg)
2321 if (self.op.master_candidate == True and
2322 ((node.offline and not self.op.offline == False) or
2323 (node.drained and not self.op.drained == False))):
2324 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2325 " to master_candidate")
2329 def Exec(self, feedback_fn):
2338 if self.op.offline is not None:
2339 node.offline = self.op.offline
2340 result.append(("offline", str(self.op.offline)))
2341 if self.op.offline == True:
2342 if node.master_candidate:
2343 node.master_candidate = False
2345 result.append(("master_candidate", "auto-demotion due to offline"))
2347 node.drained = False
2348 result.append(("drained", "clear drained status due to offline"))
2350 if self.op.master_candidate is not None:
2351 node.master_candidate = self.op.master_candidate
2353 result.append(("master_candidate", str(self.op.master_candidate)))
2354 if self.op.master_candidate == False:
2355 rrc = self.rpc.call_node_demote_from_mc(node.name)
2356 msg = rrc.RemoteFailMsg()
2358 self.LogWarning("Node failed to demote itself: %s" % msg)
2360 if self.op.drained is not None:
2361 node.drained = self.op.drained
2362 result.append(("drained", str(self.op.drained)))
2363 if self.op.drained == True:
2364 if node.master_candidate:
2365 node.master_candidate = False
2367 result.append(("master_candidate", "auto-demotion due to drain"))
2369 node.offline = False
2370 result.append(("offline", "clear offline status due to drain"))
2372 # this will trigger configuration file update, if needed
2373 self.cfg.Update(node)
2374 # this will trigger job queue propagation or cleanup
2376 self.context.ReaddNode(node)
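# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original source: LUSetNodeParams
# treats each flag as tri-state (None meaning "leave unchanged"), so
# CheckArguments counts None and True values instead of testing truth.
# The same validation as a standalone function:

def _SketchValidateNodeFlags(offline, master_candidate, drained):
  """Reject no-op or contradictory tri-state flag combinations."""
  mods = [offline, master_candidate, drained]
  if mods.count(None) == len(mods):
    raise ValueError("Please pass at least one modification")
  if mods.count(True) > 1:
    raise ValueError("Can't set the node into more than one state"
                     " at the same time")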
2381 class LUQueryClusterInfo(NoHooksLU):
2382 """Query cluster configuration.
2388 def ExpandNames(self):
2389 self.needed_locks = {}
2391 def CheckPrereq(self):
2392 """No prerequsites needed for this LU.
2397 def Exec(self, feedback_fn):
2398 """Return cluster config.
2401 cluster = self.cfg.GetClusterInfo()
2403 "software_version": constants.RELEASE_VERSION,
2404 "protocol_version": constants.PROTOCOL_VERSION,
2405 "config_version": constants.CONFIG_VERSION,
2406 "os_api_version": constants.OS_API_VERSION,
2407 "export_version": constants.EXPORT_VERSION,
2408 "architecture": (platform.architecture()[0], platform.machine()),
2409 "name": cluster.cluster_name,
2410 "master": cluster.master_node,
2411 "default_hypervisor": cluster.default_hypervisor,
2412 "enabled_hypervisors": cluster.enabled_hypervisors,
2413 "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2414 for hypervisor in cluster.enabled_hypervisors]),
2415 "beparams": cluster.beparams,
2416 "candidate_pool_size": cluster.candidate_pool_size,
2422 class LUQueryConfigValues(NoHooksLU):
2423 """Return configuration values.
2428 _FIELDS_DYNAMIC = utils.FieldSet()
2429 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2431 def ExpandNames(self):
2432 self.needed_locks = {}
2434 _CheckOutputFields(static=self._FIELDS_STATIC,
2435 dynamic=self._FIELDS_DYNAMIC,
2436 selected=self.op.output_fields)
2438 def CheckPrereq(self):
2439 """No prerequisites.
2444 def Exec(self, feedback_fn):
2445 """Dump a representation of the cluster config to the standard output.
2449 for field in self.op.output_fields:
2450 if field == "cluster_name":
2451 entry = self.cfg.GetClusterName()
2452 elif field == "master_node":
2453 entry = self.cfg.GetMasterNode()
2454 elif field == "drain_flag":
2455 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2457 raise errors.ParameterError(field)
2458 values.append(entry)
2462 class LUActivateInstanceDisks(NoHooksLU):
2463 """Bring up an instance's disks.
2466 _OP_REQP = ["instance_name"]
2469 def ExpandNames(self):
2470 self._ExpandAndLockInstance()
2471 self.needed_locks[locking.LEVEL_NODE] = []
2472 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2474 def DeclareLocks(self, level):
2475 if level == locking.LEVEL_NODE:
2476 self._LockInstancesNodes()
2478 def CheckPrereq(self):
2479 """Check prerequisites.
2481 This checks that the instance is in the cluster.
2484 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2485 assert self.instance is not None, \
2486 "Cannot retrieve locked instance %s" % self.op.instance_name
2487 _CheckNodeOnline(self, self.instance.primary_node)
2489 def Exec(self, feedback_fn):
2490 """Activate the disks.
2493 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2495 raise errors.OpExecError("Cannot activate block devices")
2500 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2501 """Prepare the block devices for an instance.
2503 This sets up the block devices on all nodes.
2505 @type lu: L{LogicalUnit}
2506 @param lu: the logical unit on whose behalf we execute
2507 @type instance: L{objects.Instance}
2508 @param instance: the instance for whose disks we assemble
2509 @type ignore_secondaries: boolean
2510 @param ignore_secondaries: if true, errors on secondary nodes
2511 won't result in an error return from the function
2512 @return: False if the operation failed, otherwise a list of
2513 (host, instance_visible_name, node_visible_name)
2514 with the mapping from node devices to instance devices
2519 iname = instance.name
2520 # With the two-pass mechanism we try to reduce the window of
2521 # opportunity for the race condition of switching DRBD to primary
2522 # before the handshake has occurred, but we do not eliminate it
2524 # The proper fix would be to wait (with some limits) until the
2525 # connection has been made and drbd transitions from WFConnection
2526 # into any other network-connected state (Connected, SyncTarget,
2529 # 1st pass, assemble on all nodes in secondary mode
2530 for inst_disk in instance.disks:
2531 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2532 lu.cfg.SetDiskID(node_disk, node)
2533 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2534 msg = result.RemoteFailMsg()
2536 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2537 " (is_primary=False, pass=1): %s",
2538 inst_disk.iv_name, node, msg)
2539 if not ignore_secondaries:
2542 # FIXME: race condition on drbd migration to primary
2544 # 2nd pass, do only the primary node
2545 for inst_disk in instance.disks:
2546 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2547 if node != instance.primary_node:
2549 lu.cfg.SetDiskID(node_disk, node)
2550 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2551 msg = result.RemoteFailMsg()
2553 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2554 " (is_primary=True, pass=2): %s",
2555 inst_disk.iv_name, node, msg)
2557 device_info.append((instance.primary_node, inst_disk.iv_name,
2560 # leave the disks configured for the primary node
2561 # this is a workaround that would be fixed better by
2562 # improving the logical/physical id handling
2563 for disk in instance.disks:
2564 lu.cfg.SetDiskID(disk, instance.primary_node)
2566 return disks_ok, device_info
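# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original source: the two-pass scheme
# in _AssembleInstanceDisks first assembles every disk on all nodes with
# is_primary=False and only then re-assembles on the primary node with
# is_primary=True. A version that just computes the resulting call order,
# assuming disks given as (name, [nodes]) pairs:

def _SketchAssemblyOrder(disks, primary_node):
  """Return the (node, disk, is_primary) sequence of assemble calls."""
  calls = []
  # 1st pass: every node, secondary mode
  for name, nodes in disks:
    for node in nodes:
      calls.append((node, name, False))
  # 2nd pass: the primary node only, primary mode
  for name, nodes in disks:
    if primary_node in nodes:
      calls.append((primary_node, name, True))
  return calls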
2569 def _StartInstanceDisks(lu, instance, force):
2570 """Start the disks of an instance.
2573 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2574 ignore_secondaries=force)
2576 _ShutdownInstanceDisks(lu, instance)
2577 if force is not None and not force:
2578 lu.proc.LogWarning("", hint="If the message above refers to a"
2580 " you can retry the operation using '--force'.")
2581 raise errors.OpExecError("Disk consistency error")
2584 class LUDeactivateInstanceDisks(NoHooksLU):
2585 """Shutdown an instance's disks.
2588 _OP_REQP = ["instance_name"]
2591 def ExpandNames(self):
2592 self._ExpandAndLockInstance()
2593 self.needed_locks[locking.LEVEL_NODE] = []
2594 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2596 def DeclareLocks(self, level):
2597 if level == locking.LEVEL_NODE:
2598 self._LockInstancesNodes()
2600 def CheckPrereq(self):
2601 """Check prerequisites.
2603 This checks that the instance is in the cluster.
2606 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2607 assert self.instance is not None, \
2608 "Cannot retrieve locked instance %s" % self.op.instance_name
2610 def Exec(self, feedback_fn):
2611 """Deactivate the disks
2614 instance = self.instance
2615 _SafeShutdownInstanceDisks(self, instance)
2618 def _SafeShutdownInstanceDisks(lu, instance):
2619 """Shutdown block devices of an instance.
2621 This function checks if an instance is running, before calling
2622 _ShutdownInstanceDisks.
2625 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2626 [instance.hypervisor])
2627 ins_l = ins_l[instance.primary_node]
2628 if ins_l.failed or not isinstance(ins_l.data, list):
2629 raise errors.OpExecError("Can't contact node '%s'" %
2630 instance.primary_node)
2632 if instance.name in ins_l.data:
2633 raise errors.OpExecError("Instance is running, can't shutdown"
2636 _ShutdownInstanceDisks(lu, instance)
2639 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2640 """Shutdown block devices of an instance.
2642 This does the shutdown on all nodes of the instance.
2644 If the ignore_primary is false, errors on the primary node are
2649 for disk in instance.disks:
2650 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2651 lu.cfg.SetDiskID(top_disk, node)
2652 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2653 msg = result.RemoteFailMsg()
2655 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2656 disk.iv_name, node, msg)
2657 if not ignore_primary or node != instance.primary_node:
2662 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2663 """Checks if a node has enough free memory.
2665 This function checks if a given node has the needed amount of free
2666 memory. In case the node has less memory or we cannot get the
2667 information from the node, this function raises an OpPrereqError
2670 @type lu: C{LogicalUnit}
2671 @param lu: a logical unit from which we get configuration data
2673 @param node: the node to check
2674 @type reason: C{str}
2675 @param reason: string to use in the error message
2676 @type requested: C{int}
2677 @param requested: the amount of memory in MiB to check for
2678 @type hypervisor_name: C{str}
2679 @param hypervisor_name: the hypervisor to ask for memory stats
2680 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2681 we cannot check the node
2684 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2685 nodeinfo[node].Raise()
2686 free_mem = nodeinfo[node].data.get('memory_free')
2687 if not isinstance(free_mem, int):
2688 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2689 " was '%s'" % (node, free_mem))
2690 if requested > free_mem:
2691 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2692 " needed %s MiB, available %s MiB" %
2693 (node, reason, requested, free_mem))
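# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original source: stripped of the
# RPC plumbing, _CheckNodeFreeMemory is a guard over a single integer. A
# pure version, taking free_mem as whatever the node info call returned:

def _SketchCheckFreeMemory(node, reason, requested, free_mem):
  """Raise ValueError unless free_mem covers the requested MiB."""
  if not isinstance(free_mem, int):
    raise ValueError("Can't compute free memory on node %s, result"
                     " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise ValueError("Not enough memory on node %s for %s:"
                     " needed %s MiB, available %s MiB" %
                     (node, reason, requested, free_mem))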
2696 class LUStartupInstance(LogicalUnit):
2697 """Starts an instance.
2700 HPATH = "instance-start"
2701 HTYPE = constants.HTYPE_INSTANCE
2702 _OP_REQP = ["instance_name", "force"]
2705 def ExpandNames(self):
2706 self._ExpandAndLockInstance()
2708 def BuildHooksEnv(self):
2711 This runs on master, primary and secondary nodes of the instance.
2715 "FORCE": self.op.force,
2717 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2718 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2721 def CheckPrereq(self):
2722 """Check prerequisites.
2724 This checks that the instance is in the cluster.
2727 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2728 assert self.instance is not None, \
2729 "Cannot retrieve locked instance %s" % self.op.instance_name
2731 _CheckNodeOnline(self, instance.primary_node)
2733 bep = self.cfg.GetClusterInfo().FillBE(instance)
2734 # check bridge existence
2735 _CheckInstanceBridgesExist(self, instance)
2737 _CheckNodeFreeMemory(self, instance.primary_node,
2738 "starting instance %s" % instance.name,
2739 bep[constants.BE_MEMORY], instance.hypervisor)
2741 def Exec(self, feedback_fn):
2742 """Start the instance.
2745 instance = self.instance
2746 force = self.op.force
2748 self.cfg.MarkInstanceUp(instance.name)
2750 node_current = instance.primary_node
2752 _StartInstanceDisks(self, instance, force)
2754 result = self.rpc.call_instance_start(node_current, instance)
2755 msg = result.RemoteFailMsg()
2757 _ShutdownInstanceDisks(self, instance)
2758 raise errors.OpExecError("Could not start instance: %s" % msg)
2761 class LURebootInstance(LogicalUnit):
2762 """Reboot an instance.
2765 HPATH = "instance-reboot"
2766 HTYPE = constants.HTYPE_INSTANCE
2767 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2770 def ExpandNames(self):
2771 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2772 constants.INSTANCE_REBOOT_HARD,
2773 constants.INSTANCE_REBOOT_FULL]:
2774 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2775 (constants.INSTANCE_REBOOT_SOFT,
2776 constants.INSTANCE_REBOOT_HARD,
2777 constants.INSTANCE_REBOOT_FULL))
2778 self._ExpandAndLockInstance()
2780 def BuildHooksEnv(self):
2783 This runs on master, primary and secondary nodes of the instance.
2787 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2788 "REBOOT_TYPE": self.op.reboot_type,
2790 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2791 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2794 def CheckPrereq(self):
2795 """Check prerequisites.
2797 This checks that the instance is in the cluster.
2800 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2801 assert self.instance is not None, \
2802 "Cannot retrieve locked instance %s" % self.op.instance_name
2804 _CheckNodeOnline(self, instance.primary_node)
2806 # check bridge existence
2807 _CheckInstanceBridgesExist(self, instance)
2809 def Exec(self, feedback_fn):
2810 """Reboot the instance.
2813 instance = self.instance
2814 ignore_secondaries = self.op.ignore_secondaries
2815 reboot_type = self.op.reboot_type
2817 node_current = instance.primary_node
2819 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2820 constants.INSTANCE_REBOOT_HARD]:
2821 for disk in instance.disks:
2822 self.cfg.SetDiskID(disk, node_current)
2823 result = self.rpc.call_instance_reboot(node_current, instance,
2825 msg = result.RemoteFailMsg()
2827 raise errors.OpExecError("Could not reboot instance: %s" % msg)
2829 result = self.rpc.call_instance_shutdown(node_current, instance)
2830 msg = result.RemoteFailMsg()
2832 raise errors.OpExecError("Could not shutdown instance for"
2833 " full reboot: %s" % msg)
2834 _ShutdownInstanceDisks(self, instance)
2835 _StartInstanceDisks(self, instance, ignore_secondaries)
2836 result = self.rpc.call_instance_start(node_current, instance)
2837 msg = result.RemoteFailMsg()
2839 _ShutdownInstanceDisks(self, instance)
2840 raise errors.OpExecError("Could not start instance for"
2841 " full reboot: %s" % msg)
2843 self.cfg.MarkInstanceUp(instance.name)
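# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original source: the reboot Exec
# above has two strategies. Soft and hard reboots are one hypervisor call;
# a full reboot is a shutdown, a disk cycle and a fresh start. A data-only
# sketch of that dispatch (the step labels are hypothetical, not RPC names):

def _SketchRebootPlan(reboot_type):
  """Return the ordered step labels for a given reboot type."""
  if reboot_type in ("soft", "hard"):
    return ["instance_reboot"]
  elif reboot_type == "full":
    return ["instance_shutdown", "shutdown_disks",
            "start_disks", "instance_start"]
  else:
    raise ValueError("reboot type not in [soft, hard, full]")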
2846 class LUShutdownInstance(LogicalUnit):
2847 """Shutdown an instance.
2850 HPATH = "instance-stop"
2851 HTYPE = constants.HTYPE_INSTANCE
2852 _OP_REQP = ["instance_name"]
2855 def ExpandNames(self):
2856 self._ExpandAndLockInstance()
2858 def BuildHooksEnv(self):
2861 This runs on master, primary and secondary nodes of the instance.
2864 env = _BuildInstanceHookEnvByObject(self, self.instance)
2865 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2868 def CheckPrereq(self):
2869 """Check prerequisites.
2871 This checks that the instance is in the cluster.
2874 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2875 assert self.instance is not None, \
2876 "Cannot retrieve locked instance %s" % self.op.instance_name
2877 _CheckNodeOnline(self, self.instance.primary_node)
2879 def Exec(self, feedback_fn):
2880 """Shutdown the instance.
2883 instance = self.instance
2884 node_current = instance.primary_node
2885 self.cfg.MarkInstanceDown(instance.name)
2886 result = self.rpc.call_instance_shutdown(node_current, instance)
2887 msg = result.RemoteFailMsg()
2889 self.proc.LogWarning("Could not shut down instance: %s" % msg)
2891 _ShutdownInstanceDisks(self, instance)
2894 class LUReinstallInstance(LogicalUnit):
2895 """Reinstall an instance.
2898 HPATH = "instance-reinstall"
2899 HTYPE = constants.HTYPE_INSTANCE
2900 _OP_REQP = ["instance_name"]
2903 def ExpandNames(self):
2904 self._ExpandAndLockInstance()
2906 def BuildHooksEnv(self):
2909 This runs on master, primary and secondary nodes of the instance.
2912 env = _BuildInstanceHookEnvByObject(self, self.instance)
2913 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2916 def CheckPrereq(self):
2917 """Check prerequisites.
2919 This checks that the instance is in the cluster and is not running.
2922 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2923 assert instance is not None, \
2924 "Cannot retrieve locked instance %s" % self.op.instance_name
2925 _CheckNodeOnline(self, instance.primary_node)
2927 if instance.disk_template == constants.DT_DISKLESS:
2928 raise errors.OpPrereqError("Instance '%s' has no disks" %
2929 self.op.instance_name)
2930 if instance.admin_up:
2931 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2932 self.op.instance_name)
2933 remote_info = self.rpc.call_instance_info(instance.primary_node,
2935 instance.hypervisor)
2936 if remote_info.failed or remote_info.data:
2937 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2938 (self.op.instance_name,
2939 instance.primary_node))
2941 self.op.os_type = getattr(self.op, "os_type", None)
2942 if self.op.os_type is not None:
2944 pnode = self.cfg.GetNodeInfo(
2945 self.cfg.ExpandNodeName(instance.primary_node))
2947 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2949 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2951 if not isinstance(result.data, objects.OS):
2952 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2953 " primary node" % self.op.os_type)
2955 self.instance = instance
2957 def Exec(self, feedback_fn):
2958 """Reinstall the instance.
2961 inst = self.instance
2963 if self.op.os_type is not None:
2964 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2965 inst.os = self.op.os_type
2966 self.cfg.Update(inst)
2968 _StartInstanceDisks(self, inst, None)
2970 feedback_fn("Running the instance OS create scripts...")
2971 result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2972 msg = result.RemoteFailMsg()
2974 raise errors.OpExecError("Could not install OS for instance %s"
2976 (inst.name, inst.primary_node, msg))
2978 _ShutdownInstanceDisks(self, inst)
2981 class LURenameInstance(LogicalUnit):
2982 """Rename an instance.
2985 HPATH = "instance-rename"
2986 HTYPE = constants.HTYPE_INSTANCE
2987 _OP_REQP = ["instance_name", "new_name"]
2989 def BuildHooksEnv(self):
2992 This runs on master, primary and secondary nodes of the instance.
2995 env = _BuildInstanceHookEnvByObject(self, self.instance)
2996 env["INSTANCE_NEW_NAME"] = self.op.new_name
2997 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3000 def CheckPrereq(self):
3001 """Check prerequisites.
3003 This checks that the instance is in the cluster and is not running.
3006 instance = self.cfg.GetInstanceInfo(
3007 self.cfg.ExpandInstanceName(self.op.instance_name))
3008 if instance is None:
3009 raise errors.OpPrereqError("Instance '%s' not known" %
3010 self.op.instance_name)
3011 _CheckNodeOnline(self, instance.primary_node)
3013 if instance.admin_up:
3014 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3015 self.op.instance_name)
3016 remote_info = self.rpc.call_instance_info(instance.primary_node,
3018 instance.hypervisor)
3020 if remote_info.data:
3021 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3022 (self.op.instance_name,
3023 instance.primary_node))
3024 self.instance = instance
3026 # new name verification
3027 name_info = utils.HostInfo(self.op.new_name)
3029 self.op.new_name = new_name = name_info.name
3030 instance_list = self.cfg.GetInstanceList()
3031 if new_name in instance_list:
3032 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3035 if not getattr(self.op, "ignore_ip", False):
3036 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3037 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3038 (name_info.ip, new_name))
3041 def Exec(self, feedback_fn):
3042 """Reinstall the instance.
3045 inst = self.instance
3046 old_name = inst.name
3048 if inst.disk_template == constants.DT_FILE:
3049 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3051 self.cfg.RenameInstance(inst.name, self.op.new_name)
3052 # Change the instance lock. This is definitely safe while we hold the BGL
3053 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3054 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3056 # re-read the instance from the configuration after rename
3057 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3059 if inst.disk_template == constants.DT_FILE:
3060 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3061 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3062 old_file_storage_dir,
3063 new_file_storage_dir)
3066 raise errors.OpExecError("Could not connect to node '%s' to rename"
3067 " directory '%s' to '%s' (but the instance"
3068 " has been renamed in Ganeti)" % (
3069 inst.primary_node, old_file_storage_dir,
3070 new_file_storage_dir))
3072 if not result.data[0]:
3073 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3074 " (but the instance has been renamed in"
3075 " Ganeti)" % (old_file_storage_dir,
3076 new_file_storage_dir))
3078 _StartInstanceDisks(self, inst, None)
3080 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3082 msg = result.RemoteFailMsg()
3084 msg = ("Could not run OS rename script for instance %s on node %s"
3085 " (but the instance has been renamed in Ganeti): %s" %
3086 (inst.name, inst.primary_node, msg))
3087 self.proc.LogWarning(msg)
3089 _ShutdownInstanceDisks(self, inst)
3092 class LURemoveInstance(LogicalUnit):
3093 """Remove an instance.
3096 HPATH = "instance-remove"
3097 HTYPE = constants.HTYPE_INSTANCE
3098 _OP_REQP = ["instance_name", "ignore_failures"]
3101 def ExpandNames(self):
3102 self._ExpandAndLockInstance()
3103 self.needed_locks[locking.LEVEL_NODE] = []
3104 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3106 def DeclareLocks(self, level):
3107 if level == locking.LEVEL_NODE:
3108 self._LockInstancesNodes()
3110 def BuildHooksEnv(self):
3113 This runs on master, primary and secondary nodes of the instance.
3116 env = _BuildInstanceHookEnvByObject(self, self.instance)
3117 nl = [self.cfg.GetMasterNode()]
3120 def CheckPrereq(self):
3121 """Check prerequisites.
3123 This checks that the instance is in the cluster.
3126 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3127 assert self.instance is not None, \
3128 "Cannot retrieve locked instance %s" % self.op.instance_name
3130 def Exec(self, feedback_fn):
3131 """Remove the instance.
3134 instance = self.instance
3135 logging.info("Shutting down instance %s on node %s",
3136 instance.name, instance.primary_node)
3138 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3139 msg = result.RemoteFailMsg()
3141 if self.op.ignore_failures:
3142 feedback_fn("Warning: can't shut down instance: %s" % msg)
3144 raise errors.OpExecError("Could not shut down instance %s on"
3146 (instance.name, instance.primary_node, msg))
3148 logging.info("Removing block devices for instance %s", instance.name)
3150 if not _RemoveDisks(self, instance):
3151 if self.op.ignore_failures:
3152 feedback_fn("Warning: can't remove instance's disks")
3154 raise errors.OpExecError("Can't remove instance's disks")
3156 logging.info("Removing instance %s out of cluster config", instance.name)
3158 self.cfg.RemoveInstance(instance.name)
3159 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
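# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original source: instance removal
# above is an ordered teardown in which each failure is either fatal or
# downgraded to a warning, depending on ignore_failures. The bare pattern,
# assuming steps as (description, callable) pairs that return an error
# message or None:

def _SketchTeardown(steps, ignore_failures, warn_fn):
  """Run teardown steps, warning or raising on failures."""
  for description, step_fn in steps:
    msg = step_fn()
    if msg:
      if ignore_failures:
        warn_fn("Warning: %s: %s" % (description, msg))
      else:
        raise RuntimeError("%s: %s" % (description, msg))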
3162 class LUQueryInstances(NoHooksLU):
3163 """Logical unit for querying instances.
3166 _OP_REQP = ["output_fields", "names", "use_locking"]
3168 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3170 "disk_template", "ip", "mac", "bridge",
3171 "sda_size", "sdb_size", "vcpus", "tags",
3172 "network_port", "beparams",
3173 r"(disk)\.(size)/([0-9]+)",
3174 r"(disk)\.(sizes)", "disk_usage",
3175 r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3176 r"(nic)\.(macs|ips|bridges)",
3177 r"(disk|nic)\.(count)",
3178 "serial_no", "hypervisor", "hvparams",] +
3180 for name in constants.HVS_PARAMETERS] +
3182 for name in constants.BES_PARAMETERS])
3183 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3186 def ExpandNames(self):
3187 _CheckOutputFields(static=self._FIELDS_STATIC,
3188 dynamic=self._FIELDS_DYNAMIC,
3189 selected=self.op.output_fields)
3191 self.needed_locks = {}
3192 self.share_locks[locking.LEVEL_INSTANCE] = 1
3193 self.share_locks[locking.LEVEL_NODE] = 1
3196 self.wanted = _GetWantedInstances(self, self.op.names)
3198 self.wanted = locking.ALL_SET
3200 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3201 self.do_locking = self.do_node_query and self.op.use_locking
3203 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3204 self.needed_locks[locking.LEVEL_NODE] = []
3205 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3207 def DeclareLocks(self, level):
3208 if level == locking.LEVEL_NODE and self.do_locking:
3209 self._LockInstancesNodes()
3211 def CheckPrereq(self):
3212 """Check prerequisites.
3217 def Exec(self, feedback_fn):
3218 """Computes the list of nodes and their attributes.
3221 all_info = self.cfg.GetAllInstancesInfo()
3222 if self.wanted == locking.ALL_SET:
3223 # caller didn't specify instance names, so ordering is not important
3225 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3227 instance_names = all_info.keys()
3228 instance_names = utils.NiceSort(instance_names)
3230 # caller did specify names, so we must keep the ordering
3232 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3234 tgt_set = all_info.keys()
3235 missing = set(self.wanted).difference(tgt_set)
3237 raise errors.OpExecError("Some instances were removed before"
3238 " retrieving their data: %s" % missing)
3239 instance_names = self.wanted
3241 instance_list = [all_info[iname] for iname in instance_names]
3243 # begin data gathering
3245 nodes = frozenset([inst.primary_node for inst in instance_list])
3246 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3250 if self.do_node_query:
3252 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3254 result = node_data[name]
3256 # offline nodes will be in both lists
3257 off_nodes.append(name)
3259 bad_nodes.append(name)
3262 live_data.update(result.data)
3263 # else no instance is alive
3265 live_data = dict([(name, {}) for name in instance_names])
3267 # end data gathering
3272 for instance in instance_list:
3274 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3275 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3276 for field in self.op.output_fields:
3277 st_match = self._FIELDS_STATIC.Matches(field)
3282 elif field == "pnode":
3283 val = instance.primary_node
3284 elif field == "snodes":
3285 val = list(instance.secondary_nodes)
3286 elif field == "admin_state":
3287 val = instance.admin_up
3288 elif field == "oper_state":
3289 if instance.primary_node in bad_nodes:
3292 val = bool(live_data.get(instance.name))
3293 elif field == "status":
3294 if instance.primary_node in off_nodes:
3295 val = "ERROR_nodeoffline"
3296 elif instance.primary_node in bad_nodes:
3297 val = "ERROR_nodedown"
3299 running = bool(live_data.get(instance.name))
3301 if instance.admin_up:
3306 if instance.admin_up:
3310 elif field == "oper_ram":
3311 if instance.primary_node in bad_nodes:
3313 elif instance.name in live_data:
3314 val = live_data[instance.name].get("memory", "?")
3317 elif field == "disk_template":
3318 val = instance.disk_template
3320 val = instance.nics[0].ip
3321 elif field == "bridge":
3322 val = instance.nics[0].bridge
3323 elif field == "mac":
3324 val = instance.nics[0].mac
3325 elif field == "sda_size" or field == "sdb_size":
3326 idx = ord(field[2]) - ord('a')
3328 val = instance.FindDisk(idx).size
3329 except errors.OpPrereqError:
3331 elif field == "disk_usage": # total disk usage per node
3332 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3333 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3334 elif field == "tags":
3335 val = list(instance.GetTags())
3336 elif field == "serial_no":
3337 val = instance.serial_no
3338 elif field == "network_port":
3339 val = instance.network_port
3340 elif field == "hypervisor":
3341 val = instance.hypervisor
3342 elif field == "hvparams":
3344 elif (field.startswith(HVPREFIX) and
3345 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3346 val = i_hv.get(field[len(HVPREFIX):], None)
3347 elif field == "beparams":
3349 elif (field.startswith(BEPREFIX) and
3350 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3351 val = i_be.get(field[len(BEPREFIX):], None)
3352 elif st_match and st_match.groups():
3353 # matches a variable list
3354 st_groups = st_match.groups()
3355 if st_groups and st_groups[0] == "disk":
3356 if st_groups[1] == "count":
3357 val = len(instance.disks)
3358 elif st_groups[1] == "sizes":
3359 val = [disk.size for disk in instance.disks]
3360 elif st_groups[1] == "size":
3362 val = instance.FindDisk(st_groups[2]).size
3363 except errors.OpPrereqError:
3366 assert False, "Unhandled disk parameter"
3367 elif st_groups[0] == "nic":
3368 if st_groups[1] == "count":
3369 val = len(instance.nics)
3370 elif st_groups[1] == "macs":
3371 val = [nic.mac for nic in instance.nics]
3372 elif st_groups[1] == "ips":
3373 val = [nic.ip for nic in instance.nics]
3374 elif st_groups[1] == "bridges":
3375 val = [nic.bridge for nic in instance.nics]
3378 nic_idx = int(st_groups[2])
3379 if nic_idx >= len(instance.nics):
3382 if st_groups[1] == "mac":
3383 val = instance.nics[nic_idx].mac
3384 elif st_groups[1] == "ip":
3385 val = instance.nics[nic_idx].ip
3386 elif st_groups[1] == "bridge":
3387 val = instance.nics[nic_idx].bridge
3389 assert False, "Unhandled NIC parameter"
3391 assert False, "Unhandled variable parameter"
3393 raise errors.ParameterError(field)
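# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original source: parameterized
# fields such as "disk.size/0" or "nic.mac/1" are matched by the regular
# expressions in _FIELDS_STATIC, and the captured groups drive the
# dispatch above. A minimal matcher for just that family of fields:

def _SketchParseItemField(field):
  """Split e.g. "nic.mac/1" into ("nic", "mac", 1), or return None."""
  import re  # local import to keep the sketch self-contained
  m = re.match(r"^(disk|nic)\.(\w+)(?:/([0-9]+))?$", field)
  if not m:
    return None
  kind, attr, idx = m.groups()
  if idx is not None:
    idx = int(idx)
  return (kind, attr, idx)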
3400 class LUFailoverInstance(LogicalUnit):
3401 """Failover an instance.
3404 HPATH = "instance-failover"
3405 HTYPE = constants.HTYPE_INSTANCE
3406 _OP_REQP = ["instance_name", "ignore_consistency"]
3409 def ExpandNames(self):
3410 self._ExpandAndLockInstance()
3411 self.needed_locks[locking.LEVEL_NODE] = []
3412 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3414 def DeclareLocks(self, level):
3415 if level == locking.LEVEL_NODE:
3416 self._LockInstancesNodes()
3418 def BuildHooksEnv(self):
3421 This runs on master, primary and secondary nodes of the instance.
3425 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3427 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3428 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3431 def CheckPrereq(self):
3432 """Check prerequisites.
3434 This checks that the instance is in the cluster.
3437 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3438 assert self.instance is not None, \
3439 "Cannot retrieve locked instance %s" % self.op.instance_name
3441 bep = self.cfg.GetClusterInfo().FillBE(instance)
3442 if instance.disk_template not in constants.DTS_NET_MIRROR:
3443 raise errors.OpPrereqError("Instance's disk layout is not"
3444 " network mirrored, cannot failover.")
3446 secondary_nodes = instance.secondary_nodes
3447 if not secondary_nodes:
3448 raise errors.ProgrammerError("no secondary node but using "
3449 "a mirrored disk template")
3451 target_node = secondary_nodes[0]
3452 _CheckNodeOnline(self, target_node)
3453 _CheckNodeNotDrained(self, target_node)
3454 # check memory requirements on the secondary node
3455 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3456 instance.name, bep[constants.BE_MEMORY],
3457 instance.hypervisor)
3459 # check bridge existence
3460 brlist = [nic.bridge for nic in instance.nics]
3461 result = self.rpc.call_bridges_exist(target_node, brlist)
3464 raise errors.OpPrereqError("One or more target bridges %s do not"
3465 " exist on destination node '%s'" %
3466 (brlist, target_node))
3468 def Exec(self, feedback_fn):
3469 """Failover an instance.
3471 The failover is done by shutting it down on its present node and
3472 starting it on the secondary.
3475 instance = self.instance
3477 source_node = instance.primary_node
3478 target_node = instance.secondary_nodes[0]
3480 feedback_fn("* checking disk consistency between source and target")
3481 for dev in instance.disks:
3482 # for drbd, these are drbd over lvm
3483 if not _CheckDiskConsistency(self, dev, target_node, False):
3484 if instance.admin_up and not self.op.ignore_consistency:
3485 raise errors.OpExecError("Disk %s is degraded on target node,"
3486 " aborting failover." % dev.iv_name)
3488 feedback_fn("* shutting down instance on source node")
3489 logging.info("Shutting down instance %s on node %s",
3490 instance.name, source_node)
3492 result = self.rpc.call_instance_shutdown(source_node, instance)
3493 msg = result.RemoteFailMsg()
3495 if self.op.ignore_consistency:
3496 self.proc.LogWarning("Could not shut down instance %s on node %s."
3497 " Proceeding anyway. Please make sure node"
3498 " %s is down. Error details: %s",
3499 instance.name, source_node, source_node, msg)
3501 raise errors.OpExecError("Could not shutdown instance %s on"
3503 (instance.name, source_node, msg))
3505 feedback_fn("* deactivating the instance's disks on source node")
3506 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3507 raise errors.OpExecError("Can't shut down the instance's disks.")
3509 instance.primary_node = target_node
3510 # distribute new instance config to the other nodes
3511 self.cfg.Update(instance)
3513 # Only start the instance if it's marked as up
3514 if instance.admin_up:
3515 feedback_fn("* activating the instance's disks on target node")
3516 logging.info("Starting instance %s on node %s",
3517 instance.name, target_node)
3519 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3520 ignore_secondaries=True)
3522 _ShutdownInstanceDisks(self, instance)
3523 raise errors.OpExecError("Can't activate the instance's disks")
3525 feedback_fn("* starting the instance on the target node")
3526 result = self.rpc.call_instance_start(target_node, instance)
3527 msg = result.RemoteFailMsg()
3529 _ShutdownInstanceDisks(self, instance)
3530 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3531 (instance.name, target_node, msg))
3534 class LUMigrateInstance(LogicalUnit):
3535 """Migrate an instance.
3537 This is migration without shutting down the instance, as opposed to
3538 failover, which requires a shutdown.
3541 HPATH = "instance-migrate"
3542 HTYPE = constants.HTYPE_INSTANCE
3543 _OP_REQP = ["instance_name", "live", "cleanup"]
3547 def ExpandNames(self):
3548 self._ExpandAndLockInstance()
3549 self.needed_locks[locking.LEVEL_NODE] = []
3550 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3552 def DeclareLocks(self, level):
3553 if level == locking.LEVEL_NODE:
3554 self._LockInstancesNodes()
3556 def BuildHooksEnv(self):
3559 This runs on master, primary and secondary nodes of the instance.
3562 env = _BuildInstanceHookEnvByObject(self, self.instance)
3563 env["MIGRATE_LIVE"] = self.op.live
3564 env["MIGRATE_CLEANUP"] = self.op.cleanup
3565 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3568 def CheckPrereq(self):
3569 """Check prerequisites.
3571 This checks that the instance is in the cluster.
3574 instance = self.cfg.GetInstanceInfo(
3575 self.cfg.ExpandInstanceName(self.op.instance_name))
3576 if instance is None:
3577 raise errors.OpPrereqError("Instance '%s' not known" %
3578 self.op.instance_name)
3580 if instance.disk_template != constants.DT_DRBD8:
3581 raise errors.OpPrereqError("Instance's disk layout is not"
3582 " drbd8, cannot migrate.")
3584 secondary_nodes = instance.secondary_nodes
3585 if not secondary_nodes:
3586 raise errors.ConfigurationError("No secondary node but using"
3587 " drbd8 disk template")
3589 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3591 target_node = secondary_nodes[0]
3592 # check memory requirements on the secondary node
3593 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3594 instance.name, i_be[constants.BE_MEMORY],
3595 instance.hypervisor)
3597 # check bridge existence
3598 brlist = [nic.bridge for nic in instance.nics]
3599 result = self.rpc.call_bridges_exist(target_node, brlist)
3600 if result.failed or not result.data:
3601 raise errors.OpPrereqError("One or more target bridges %s do not"
3602 " exist on destination node '%s'" %
3603 (brlist, target_node))
3605 if not self.op.cleanup:
3606 _CheckNodeNotDrained(self, target_node)
3607 result = self.rpc.call_instance_migratable(instance.primary_node,
3609 msg = result.RemoteFailMsg()
3611 raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3614 self.instance = instance
3616 def _WaitUntilSync(self):
3617 """Poll with custom rpc for disk sync.
3619 This uses our own step-based rpc call.
3622 self.feedback_fn("* wait until resync is done")
3626 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3628 self.instance.disks)
3630 for node, nres in result.items():
3631 msg = nres.RemoteFailMsg()
3633 raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3635 node_done, node_percent = nres.payload
3636 all_done = all_done and node_done
3637 if node_percent is not None:
3638 min_percent = min(min_percent, node_percent)
3640 if min_percent < 100:
3641 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3644 def _EnsureSecondary(self, node):
3645 """Demote a node to secondary.
3648 self.feedback_fn("* switching node %s to secondary mode" % node)
3650 for dev in self.instance.disks:
3651 self.cfg.SetDiskID(dev, node)
3653 result = self.rpc.call_blockdev_close(node, self.instance.name,
3654 self.instance.disks)
3655 msg = result.RemoteFailMsg()
3657 raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3658 " error %s" % (node, msg))
3660 def _GoStandalone(self):
3661 """Disconnect from the network.
3664 self.feedback_fn("* changing into standalone mode")
3665 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3666 self.instance.disks)
3667 for node, nres in result.items():
3668 msg = nres.RemoteFailMsg()
3670 raise errors.OpExecError("Cannot disconnect disks node %s,"
3671 " error %s" % (node, msg))
3673 def _GoReconnect(self, multimaster):
3674 """Reconnect to the network.
3680 msg = "single-master"
3681 self.feedback_fn("* changing disks into %s mode" % msg)
3682 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3683 self.instance.disks,
3684 self.instance.name, multimaster)
3685 for node, nres in result.items():
3686 msg = nres.RemoteFailMsg()
3688 raise errors.OpExecError("Cannot change disks config on node %s,"
3689 " error: %s" % (node, msg))
3691 def _ExecCleanup(self):
3692 """Try to cleanup after a failed migration.
3694 The cleanup is done by:
3695 - check that the instance is running only on one node
3696 (and update the config if needed)
3697 - change disks on its secondary node to secondary
3698 - wait until disks are fully synchronized
3699 - disconnect from the network
3700 - change disks into single-master mode
3701 - wait again until disks are fully synchronized
3704 instance = self.instance
3705 target_node = self.target_node
3706 source_node = self.source_node
3708 # check running on only one node
3709 self.feedback_fn("* checking where the instance actually runs"
3710 " (if this hangs, the hypervisor might be in"
3712 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3713 for node, result in ins_l.items():
3715 if not isinstance(result.data, list):
3716 raise errors.OpExecError("Can't contact node '%s'" % node)
3718 runningon_source = instance.name in ins_l[source_node].data
3719 runningon_target = instance.name in ins_l[target_node].data
3721 if runningon_source and runningon_target:
3722 raise errors.OpExecError("Instance seems to be running on two nodes,"
3723 " or the hypervisor is confused. You will have"
3724 " to ensure manually that it runs only on one"
3725 " and restart this operation.")
3727 if not (runningon_source or runningon_target):
3728 raise errors.OpExecError("Instance does not seem to be running at all."
3729 " In this case, it's safer to repair by"
3730 " running 'gnt-instance stop' to ensure disk"
3731 " shutdown, and then restarting it.")
3733 if runningon_target:
3734 # the migration has actually succeeded, we need to update the config
3735 self.feedback_fn("* instance running on secondary node (%s),"
3736 " updating config" % target_node)
3737 instance.primary_node = target_node
3738 self.cfg.Update(instance)
3739 demoted_node = source_node
3741 self.feedback_fn("* instance confirmed to be running on its"
3742 " primary node (%s)" % source_node)
3743 demoted_node = target_node
3745 self._EnsureSecondary(demoted_node)
3747 self._WaitUntilSync()
3748 except errors.OpExecError:
3749 # we ignore here errors, since if the device is standalone, it
3750 # won't be able to sync
3752 self._GoStandalone()
3753 self._GoReconnect(False)
3754 self._WaitUntilSync()
3756 self.feedback_fn("* done")
3758 def _RevertDiskStatus(self):
3759 """Try to revert the disk status after a failed migration.
3762 target_node = self.target_node
3764 self._EnsureSecondary(target_node)
3765 self._GoStandalone()
3766 self._GoReconnect(False)
3767 self._WaitUntilSync()
3768 except errors.OpExecError, err:
3769 self.LogWarning("Migration failed and I can't reconnect the"
3770 " drives: error '%s'\n"
3771 "Please look and recover the instance status" %
3774 def _AbortMigration(self):
3775 """Call the hypervisor code to abort a started migration.
3778 instance = self.instance
3779 target_node = self.target_node
3780 migration_info = self.migration_info
3782 abort_result = self.rpc.call_finalize_migration(target_node,
3786 abort_msg = abort_result.RemoteFailMsg()
3788 logging.error("Aborting migration failed on target node %s: %s",
3789 target_node, abort_msg)
3790 # Don't raise an exception here, as we still have to try to revert the
3791 # disk status, even if this step failed.
3793 def _ExecMigration(self):
3794 """Migrate an instance.
3796 The migrate is done by:
3797 - change the disks into dual-master mode
3798 - wait until disks are fully synchronized again
3799 - migrate the instance
3800 - change disks on the new secondary node (the old primary) to secondary
3801 - wait until disks are fully synchronized
3802 - change disks into single-master mode
3805 instance = self.instance
3806 target_node = self.target_node
3807 source_node = self.source_node
3809 self.feedback_fn("* checking disk consistency between source and target")
3810 for dev in instance.disks:
3811 if not _CheckDiskConsistency(self, dev, target_node, False):
3812 raise errors.OpExecError("Disk %s is degraded or not fully"
3813 " synchronized on target node,"
3814 " aborting migrate." % dev.iv_name)
3816 # First get the migration information from the remote node
3817 result = self.rpc.call_migration_info(source_node, instance)
3818 msg = result.RemoteFailMsg()
3820 log_err = ("Failed fetching source migration information from %s: %s" %
3822 logging.error(log_err)
3823 raise errors.OpExecError(log_err)
3825 self.migration_info = migration_info = result.payload
3827 # Then switch the disks to master/master mode
3828 self._EnsureSecondary(target_node)
3829 self._GoStandalone()
3830 self._GoReconnect(True)
3831 self._WaitUntilSync()
3833 self.feedback_fn("* preparing %s to accept the instance" % target_node)
3834 result = self.rpc.call_accept_instance(target_node,
3837 self.nodes_ip[target_node])
3839 msg = result.RemoteFailMsg()
3841 logging.error("Instance pre-migration failed, trying to revert"
3842 " disk status: %s", msg)
3843 self._AbortMigration()
3844 self._RevertDiskStatus()
3845 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3846 (instance.name, msg))
3848 self.feedback_fn("* migrating instance to %s" % target_node)
3850 result = self.rpc.call_instance_migrate(source_node, instance,
3851 self.nodes_ip[target_node],
3853 msg = result.RemoteFailMsg()
3855 logging.error("Instance migration failed, trying to revert"
3856 " disk status: %s", msg)
3857 self._AbortMigration()
3858 self._RevertDiskStatus()
3859 raise errors.OpExecError("Could not migrate instance %s: %s" %
3860 (instance.name, msg))
3863 instance.primary_node = target_node
3864 # distribute new instance config to the other nodes
3865 self.cfg.Update(instance)
3867 result = self.rpc.call_finalize_migration(target_node,
3871 msg = result.RemoteFailMsg()
3873 logging.error("Instance migration succeeded, but finalization failed:"
3875 raise errors.OpExecError("Could not finalize instance migration: %s" %
3878 self._EnsureSecondary(source_node)
3879 self._WaitUntilSync()
3880 self._GoStandalone()
3881 self._GoReconnect(False)
3882 self._WaitUntilSync()
3884 self.feedback_fn("* done")
3886 def Exec(self, feedback_fn):
3887 """Perform the migration.
3890 self.feedback_fn = feedback_fn
3892 self.source_node = self.instance.primary_node
3893 self.target_node = self.instance.secondary_nodes[0]
3894 self.all_nodes = [self.source_node, self.target_node]
3896 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3897 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3900 return self._ExecCleanup()
3902 return self._ExecMigration()
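# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original source: live migration
# above is a fixed choreography over the DRBD pair. Flattening
# _ExecMigration into an ordered list of phases makes that explicit; the
# labels below are descriptive only, not RPC or method names:

_SKETCH_MIGRATION_PHASES = [
  "check disk consistency on the target",
  "fetch migration info from the source",
  "demote the target to secondary",
  "disconnect both sides (standalone)",
  "reconnect in dual-master mode and wait for resync",
  "prepare the target to accept the instance",
  "migrate the instance",
  "update primary_node in the configuration",
  "finalize the migration on the target",
  "demote the old primary to secondary and wait for resync",
  "reconnect in single-master mode and wait for resync",
]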
3905 def _CreateBlockDev(lu, node, instance, device, force_create,
3907 """Create a tree of block devices on a given node.
3909 If this device type has to be created on secondaries, create it and
3912 If not, just recurse to children keeping the same 'force' value.
3914 @param lu: the lu on whose behalf we execute
3915 @param node: the node on which to create the device
3916 @type instance: L{objects.Instance}
3917 @param instance: the instance which owns the device
3918 @type device: L{objects.Disk}
3919 @param device: the device to create
3920 @type force_create: boolean
3921 @param force_create: whether to force creation of this device; this
3922 will be changed to True whenever we find a device which has the
3923 CreateOnSecondary() attribute
3924 @param info: the extra 'metadata' we should attach to the device
3925 (this will be represented as an LVM tag)
3926 @type force_open: boolean
3927 @param force_open: this parameter will be passed to the
3928 L{backend.BlockdevCreate} function where it specifies
3929 whether we run on primary or not, and it affects both
3930 the child assembly and the device's own Open() execution
3933 if device.CreateOnSecondary():
3937 for child in device.children:
3938 _CreateBlockDev(lu, node, instance, child, force_create,
3941 if not force_create:
3944 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
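# Illustrative sketch (not part of the original sources) of how force_create
# propagates through _CreateBlockDev: whenever a device in the tree reports
# CreateOnSecondary() == True, creation is forced for that device and the
# raised value is passed down to its whole subtree; otherwise the caller's
# value is kept and, while it stays False, only the children are visited:
#
#   _CreateBlockDev(lu, node, instance, disk, False, info, force_open)
#     -> disk.CreateOnSecondary()? yes -> force_create = True
#     -> recurse into disk.children with the (possibly raised) value
#     -> _CreateSingleBlockDev(...) only if force_create is True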
3947 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3948 """Create a single block device on a given node.
  This will not recurse over children of the device, so they must be
  created in advance.
3953 @param lu: the lu on whose behalf we execute
3954 @param node: the node on which to create the device
3955 @type instance: L{objects.Instance}
3956 @param instance: the instance which owns the device
3957 @type device: L{objects.Disk}
3958 @param device: the device to create
3959 @param info: the extra 'metadata' we should attach to the device
3960 (this will be represented as a LVM tag)
3961 @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
3968 lu.cfg.SetDiskID(device, node)
3969 result = lu.rpc.call_blockdev_create(node, device, device.size,
3970 instance.name, force_open, info)
3971 msg = result.RemoteFailMsg()
3973 raise errors.OpExecError("Can't create block device %s on"
3974 " node %s for instance %s: %s" %
3975 (device, node, instance.name, msg))
3976 if device.physical_id is None:
3977 device.physical_id = result.payload
3980 def _GenerateUniqueNames(lu, exts):
3981 """Generate a suitable LV name.
3983 This will generate a logical volume name for the given instance.
3988 new_id = lu.cfg.GenerateUniqueID()
3989 results.append("%s%s" % (new_id, val))
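# Example (illustrative; the <uuidN> placeholders stand for generated IDs):
#
#   _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
#   -> ["<uuid1>.disk0_data", "<uuid2>.disk0_meta"]
#
# i.e. a fresh unique ID is generated for each requested extension.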
3993 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3995 """Generate a drbd8 device complete with its children.
3998 port = lu.cfg.AllocatePort()
3999 vgname = lu.cfg.GetVGName()
4000 shared_secret = lu.cfg.GenerateDRBDSecret()
4001 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4002 logical_id=(vgname, names[0]))
4003 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4004 logical_id=(vgname, names[1]))
4005 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4006 logical_id=(primary, secondary, port,
4009 children=[dev_data, dev_meta],
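# Illustrative sketch (not part of the original sources) of the device tree
# built above for one DRBD8 branch; the second LV holds the 128 MB of DRBD
# metadata:
#
#   LD_DRBD8  size=<size>, logical_id=(primary, secondary, port,
#                                      minors..., shared_secret)
#     +- LD_LV  "<names[0]>"  size=<size>  (data)
#     +- LD_LV  "<names[1]>"  size=128     (metadata)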
4014 def _GenerateDiskTemplate(lu, template_name,
4015 instance_name, primary_node,
4016 secondary_nodes, disk_info,
4017 file_storage_dir, file_driver,
4019 """Generate the entire disk layout for a given template type.
4022 #TODO: compute space requirements
4024 vgname = lu.cfg.GetVGName()
4025 disk_count = len(disk_info)
4027 if template_name == constants.DT_DISKLESS:
4029 elif template_name == constants.DT_PLAIN:
4030 if len(secondary_nodes) != 0:
4031 raise errors.ProgrammerError("Wrong template configuration")
4033 names = _GenerateUniqueNames(lu, [".disk%d" % i
4034 for i in range(disk_count)])
4035 for idx, disk in enumerate(disk_info):
4036 disk_index = idx + base_index
4037 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4038 logical_id=(vgname, names[idx]),
4039 iv_name="disk/%d" % disk_index,
4041 disks.append(disk_dev)
4042 elif template_name == constants.DT_DRBD8:
4043 if len(secondary_nodes) != 1:
4044 raise errors.ProgrammerError("Wrong template configuration")
4045 remote_node = secondary_nodes[0]
4046 minors = lu.cfg.AllocateDRBDMinor(
4047 [primary_node, remote_node] * len(disk_info), instance_name)
4050 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4051 for i in range(disk_count)]):
4052 names.append(lv_prefix + "_data")
4053 names.append(lv_prefix + "_meta")
4054 for idx, disk in enumerate(disk_info):
4055 disk_index = idx + base_index
4056 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4057 disk["size"], names[idx*2:idx*2+2],
4058 "disk/%d" % disk_index,
4059 minors[idx*2], minors[idx*2+1])
4060 disk_dev.mode = disk["mode"]
4061 disks.append(disk_dev)
4062 elif template_name == constants.DT_FILE:
4063 if len(secondary_nodes) != 0:
4064 raise errors.ProgrammerError("Wrong template configuration")
4066 for idx, disk in enumerate(disk_info):
4067 disk_index = idx + base_index
4068 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4069 iv_name="disk/%d" % disk_index,
4070 logical_id=(file_driver,
4071 "%s/disk%d" % (file_storage_dir,
4074 disks.append(disk_dev)
4076 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
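# Example call (illustrative; values assumed): two plain LVM disks for an
# instance, with DT_PLAIN requiring an empty secondary list; the trailing 0
# would be the base_index used for the disk/N iv_names:
#
#   disks = _GenerateDiskTemplate(self, constants.DT_PLAIN,
#                                 "inst1.example.com", pnode_name, [],
#                                 [{"size": 1024, "mode": "rw"},
#                                  {"size": 2048, "mode": "rw"}],
#                                 None, None, 0)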
4080 def _GetInstanceInfoText(instance):
4081 """Compute that text that should be added to the disk's metadata.
4084 return "originstname+%s" % instance.name
4087 def _CreateDisks(lu, instance):
4088 """Create all disks for an instance.
4090 This abstracts away some work from AddInstance.
4092 @type lu: L{LogicalUnit}
4093 @param lu: the logical unit on whose behalf we execute
4094 @type instance: L{objects.Instance}
4095 @param instance: the instance whose disks we should create
4097 @return: the success of the creation
4100 info = _GetInstanceInfoText(instance)
4101 pnode = instance.primary_node
4103 if instance.disk_template == constants.DT_FILE:
4104 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4105 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4107 if result.failed or not result.data:
4108 raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4110 if not result.data[0]:
4111 raise errors.OpExecError("Failed to create directory '%s'" %
4114 # Note: this needs to be kept in sync with adding of disks in
4115 # LUSetInstanceParams
4116 for device in instance.disks:
4117 logging.info("Creating volume %s for instance %s",
4118 device.iv_name, instance.name)
4120 for node in instance.all_nodes:
4121 f_create = node == pnode
4122 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4125 def _RemoveDisks(lu, instance):
4126 """Remove all disks for an instance.
4128 This abstracts away some work from `AddInstance()` and
4129 `RemoveInstance()`. Note that in case some of the devices couldn't
4130 be removed, the removal will continue with the other ones (compare
4131 with `_CreateDisks()`).
4133 @type lu: L{LogicalUnit}
4134 @param lu: the logical unit on whose behalf we execute
4135 @type instance: L{objects.Instance}
4136 @param instance: the instance whose disks we should remove
4138 @return: the success of the removal
4141 logging.info("Removing block devices for instance %s", instance.name)
4144 for device in instance.disks:
4145 for node, disk in device.ComputeNodeTree(instance.primary_node):
4146 lu.cfg.SetDiskID(disk, node)
4147 msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4149 lu.LogWarning("Could not remove block device %s on node %s,"
4150 " continuing anyway: %s", device.iv_name, node, msg)
4153 if instance.disk_template == constants.DT_FILE:
4154 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4155 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4157 if result.failed or not result.data:
4158 logging.error("Could not remove directory '%s'", file_storage_dir)
4164 def _ComputeDiskSize(disk_template, disks):
4165 """Compute disk size requirements in the volume group
  # Required free disk space as a function of disk template and disk sizes
4170 constants.DT_DISKLESS: None,
4171 constants.DT_PLAIN: sum(d["size"] for d in disks),
4172 # 128 MB are added for drbd metadata for each disk
4173 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4174 constants.DT_FILE: None,
4177 if disk_template not in req_size_dict:
4178 raise errors.ProgrammerError("Disk template '%s' size requirement"
4179 " is unknown" % disk_template)
4181 return req_size_dict[disk_template]
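# Worked example (illustrative): for disks of 1024 MB and 2048 MB,
#   DT_PLAIN    -> 1024 + 2048                 = 3072 MB of free VG space
#   DT_DRBD8    -> (1024 + 128) + (2048 + 128) = 3328 MB (DRBD metadata)
#   DT_DISKLESS / DT_FILE -> None (no volume group space required)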
4184 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4185 """Hypervisor parameter validation.
  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.
4190 @type lu: L{LogicalUnit}
4191 @param lu: the logical unit for which we check
4192 @type nodenames: list
4193 @param nodenames: the list of nodes on which we should check
4194 @type hvname: string
4195 @param hvname: the name of the hypervisor we should use
4196 @type hvparams: dict
4197 @param hvparams: the parameters which we need to check
4198 @raise errors.OpPrereqError: if the parameters are not valid
4201 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4204 for node in nodenames:
4208 msg = info.RemoteFailMsg()
4210 raise errors.OpPrereqError("Hypervisor parameter validation failed:"
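# Typical use (illustrative): validate the hypervisor parameters on every
# node the instance will touch, exactly as the create/modify LUs below do:
#
#   _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
#
# A validation failure on any node raises OpPrereqError.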
4214 class LUCreateInstance(LogicalUnit):
4215 """Create an instance.
4218 HPATH = "instance-add"
4219 HTYPE = constants.HTYPE_INSTANCE
4220 _OP_REQP = ["instance_name", "disks", "disk_template",
4222 "wait_for_sync", "ip_check", "nics",
4223 "hvparams", "beparams"]
4226 def _ExpandNode(self, node):
4227 """Expands and checks one node name.
4230 node_full = self.cfg.ExpandNodeName(node)
4231 if node_full is None:
4232 raise errors.OpPrereqError("Unknown node %s" % node)
4235 def ExpandNames(self):
4236 """ExpandNames for CreateInstance.
4238 Figure out the right locks for instance creation.
4241 self.needed_locks = {}
    # set optional parameters to None if they don't exist
4244 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4245 if not hasattr(self.op, attr):
4246 setattr(self.op, attr, None)
    # cheap checks, mostly verifying that valid constants were given
4250 # verify creation mode
4251 if self.op.mode not in (constants.INSTANCE_CREATE,
4252 constants.INSTANCE_IMPORT):
4253 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4256 # disk template and mirror node verification
4257 if self.op.disk_template not in constants.DISK_TEMPLATES:
4258 raise errors.OpPrereqError("Invalid disk template name")
4260 if self.op.hypervisor is None:
4261 self.op.hypervisor = self.cfg.GetHypervisorType()
4263 cluster = self.cfg.GetClusterInfo()
4264 enabled_hvs = cluster.enabled_hypervisors
4265 if self.op.hypervisor not in enabled_hvs:
4266 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4267 " cluster (%s)" % (self.op.hypervisor,
4268 ",".join(enabled_hvs)))
4270 # check hypervisor parameter syntax (locally)
4271 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4272 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4274 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4275 hv_type.CheckParameterSyntax(filled_hvp)
4277 # fill and remember the beparams dict
4278 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4279 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4282 #### instance parameters check
4284 # instance name verification
4285 hostname1 = utils.HostInfo(self.op.instance_name)
4286 self.op.instance_name = instance_name = hostname1.name
4288 # this is just a preventive check, but someone might still add this
4289 # instance in the meantime, and creation will fail at lock-add time
4290 if instance_name in self.cfg.GetInstanceList():
4291 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4294 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4298 for nic in self.op.nics:
4299 # ip validity checks
4300 ip = nic.get("ip", None)
4301 if ip is None or ip.lower() == "none":
4303 elif ip.lower() == constants.VALUE_AUTO:
4304 nic_ip = hostname1.ip
4306 if not utils.IsValidIP(ip):
4307 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4308 " like a valid IP" % ip)
4311 # MAC address verification
4312 mac = nic.get("mac", constants.VALUE_AUTO)
4313 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4314 if not utils.IsValidMac(mac.lower()):
4315 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4317 # bridge verification
4318 bridge = nic.get("bridge", None)
4320 bridge = self.cfg.GetDefBridge()
4321 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4323 # disk checks/pre-build
4325 for disk in self.op.disks:
4326 mode = disk.get("mode", constants.DISK_RDWR)
4327 if mode not in constants.DISK_ACCESS_SET:
4328 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4330 size = disk.get("size", None)
4332 raise errors.OpPrereqError("Missing disk size")
4336 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4337 self.disks.append({"size": size, "mode": mode})
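    # Illustrative opcode examples (values assumed) for the parsing above:
    #   self.op.nics  = [{"ip": "auto", "mac": "auto"}]
    #     -> one NIC using the instance's resolved IP, a MAC generated in
    #        CheckPrereq and the cluster's default bridge
    #   self.op.disks = [{"size": 1024}, {"size": 2048, "mode": "rw"}]
    #     -> two read-write disks (sizes in MiB; "rw" is the default mode)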
4339 # used in CheckPrereq for ip ping check
4340 self.check_ip = hostname1.ip
4342 # file storage checks
4343 if (self.op.file_driver and
4344 not self.op.file_driver in constants.FILE_DRIVER):
4345 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4346 self.op.file_driver)
4348 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4349 raise errors.OpPrereqError("File storage directory path not absolute")
4351 ### Node/iallocator related checks
4352 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4353 raise errors.OpPrereqError("One and only one of iallocator and primary"
4354 " node must be given")
4356 if self.op.iallocator:
4357 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4359 self.op.pnode = self._ExpandNode(self.op.pnode)
4360 nodelist = [self.op.pnode]
4361 if self.op.snode is not None:
4362 self.op.snode = self._ExpandNode(self.op.snode)
4363 nodelist.append(self.op.snode)
4364 self.needed_locks[locking.LEVEL_NODE] = nodelist
4366 # in case of import lock the source node too
4367 if self.op.mode == constants.INSTANCE_IMPORT:
4368 src_node = getattr(self.op, "src_node", None)
4369 src_path = getattr(self.op, "src_path", None)
4371 if src_path is None:
4372 self.op.src_path = src_path = self.op.instance_name
4374 if src_node is None:
4375 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4376 self.op.src_node = None
4377 if os.path.isabs(src_path):
4378 raise errors.OpPrereqError("Importing an instance from an absolute"
4379 " path requires a source node option.")
4381 self.op.src_node = src_node = self._ExpandNode(src_node)
4382 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4383 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4384 if not os.path.isabs(src_path):
4385 self.op.src_path = src_path = \
4386 os.path.join(constants.EXPORT_DIR, src_path)
4388 else: # INSTANCE_CREATE
4389 if getattr(self.op, "os_type", None) is None:
4390 raise errors.OpPrereqError("No guest OS specified")
4392 def _RunAllocator(self):
4393 """Run the allocator based on input opcode.
4396 nics = [n.ToDict() for n in self.nics]
4397 ial = IAllocator(self,
4398 mode=constants.IALLOCATOR_MODE_ALLOC,
4399 name=self.op.instance_name,
4400 disk_template=self.op.disk_template,
4403 vcpus=self.be_full[constants.BE_VCPUS],
4404 mem_size=self.be_full[constants.BE_MEMORY],
4407 hypervisor=self.op.hypervisor,
4410 ial.Run(self.op.iallocator)
4413 raise errors.OpPrereqError("Can't compute nodes using"
4414 " iallocator '%s': %s" % (self.op.iallocator,
4416 if len(ial.nodes) != ial.required_nodes:
4417 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4418 " of nodes (%s), required %s" %
4419 (self.op.iallocator, len(ial.nodes),
4420 ial.required_nodes))
4421 self.op.pnode = ial.nodes[0]
4422 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4423 self.op.instance_name, self.op.iallocator,
4424 ", ".join(ial.nodes))
4425 if ial.required_nodes == 2:
4426 self.op.snode = ial.nodes[1]
4428 def BuildHooksEnv(self):
4431 This runs on master, primary and secondary nodes of the instance.
4435 "ADD_MODE": self.op.mode,
4437 if self.op.mode == constants.INSTANCE_IMPORT:
4438 env["SRC_NODE"] = self.op.src_node
4439 env["SRC_PATH"] = self.op.src_path
4440 env["SRC_IMAGES"] = self.src_images
4442 env.update(_BuildInstanceHookEnv(
4443 name=self.op.instance_name,
4444 primary_node=self.op.pnode,
4445 secondary_nodes=self.secondaries,
4446 status=self.op.start,
4447 os_type=self.op.os_type,
4448 memory=self.be_full[constants.BE_MEMORY],
4449 vcpus=self.be_full[constants.BE_VCPUS],
4450 nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4451 disk_template=self.op.disk_template,
4452 disks=[(d["size"], d["mode"]) for d in self.disks],
4455 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4460 def CheckPrereq(self):
4461 """Check prerequisites.
4464 if (not self.cfg.GetVGName() and
4465 self.op.disk_template not in constants.DTS_NOT_LVM):
4466 raise errors.OpPrereqError("Cluster does not support lvm-based"
4469 if self.op.mode == constants.INSTANCE_IMPORT:
4470 src_node = self.op.src_node
4471 src_path = self.op.src_path
4473 if src_node is None:
4474 exp_list = self.rpc.call_export_list(
4475 self.acquired_locks[locking.LEVEL_NODE])
4477 for node in exp_list:
4478 if not exp_list[node].failed and src_path in exp_list[node].data:
4480 self.op.src_node = src_node = node
4481 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4485 raise errors.OpPrereqError("No export found for relative path %s" %
4488 _CheckNodeOnline(self, src_node)
4489 result = self.rpc.call_export_info(src_node, src_path)
4492 raise errors.OpPrereqError("No export found in dir %s" % src_path)
4494 export_info = result.data
4495 if not export_info.has_section(constants.INISECT_EXP):
4496 raise errors.ProgrammerError("Corrupted export config")
4498 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4499 if (int(ei_version) != constants.EXPORT_VERSION):
4500 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4501 (ei_version, constants.EXPORT_VERSION))
4503 # Check that the new instance doesn't have less disks than the export
4504 instance_disks = len(self.disks)
4505 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4506 if instance_disks < export_disks:
4507 raise errors.OpPrereqError("Not enough disks to import."
4508 " (instance: %d, export: %d)" %
4509 (instance_disks, export_disks))
4511 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4513 for idx in range(export_disks):
4514 option = 'disk%d_dump' % idx
4515 if export_info.has_option(constants.INISECT_INS, option):
4516 # FIXME: are the old os-es, disk sizes, etc. useful?
4517 export_name = export_info.get(constants.INISECT_INS, option)
4518 image = os.path.join(src_path, export_name)
4519 disk_images.append(image)
4521 disk_images.append(False)
4523 self.src_images = disk_images
4525 old_name = export_info.get(constants.INISECT_INS, 'name')
4526 # FIXME: int() here could throw a ValueError on broken exports
4527 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4528 if self.op.instance_name == old_name:
4529 for idx, nic in enumerate(self.nics):
4530 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4531 nic_mac_ini = 'nic%d_mac' % idx
4532 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4534 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4535 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4536 if self.op.start and not self.op.ip_check:
4537 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4538 " adding an instance in start mode")
4540 if self.op.ip_check:
4541 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4542 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4543 (self.check_ip, self.op.instance_name))
4545 #### mac address generation
    # By generating the MAC address here, both the allocator and the hooks
    # get the real, final MAC address rather than the 'auto' or 'generate'
    # value. There is a race condition between generation and instance
    # object creation, which means that we know the MAC is valid now, but
    # we cannot be sure it still will be when we actually add the instance.
    # If things go bad, adding the instance will abort because of a
    # duplicate MAC, and the creation job will fail.
4553 for nic in self.nics:
4554 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4555 nic.mac = self.cfg.GenerateMAC()
4559 if self.op.iallocator is not None:
4560 self._RunAllocator()
4562 #### node related checks
4564 # check primary node
4565 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4566 assert self.pnode is not None, \
4567 "Cannot retrieve locked node %s" % self.op.pnode
4569 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4572 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4575 self.secondaries = []
4577 # mirror node verification
4578 if self.op.disk_template in constants.DTS_NET_MIRROR:
4579 if self.op.snode is None:
4580 raise errors.OpPrereqError("The networked disk templates need"
4582 if self.op.snode == pnode.name:
4583 raise errors.OpPrereqError("The secondary node cannot be"
4584 " the primary node.")
4585 _CheckNodeOnline(self, self.op.snode)
4586 _CheckNodeNotDrained(self, self.op.snode)
4587 self.secondaries.append(self.op.snode)
4589 nodenames = [pnode.name] + self.secondaries
4591 req_size = _ComputeDiskSize(self.op.disk_template,
4594 # Check lv size requirements
4595 if req_size is not None:
4596 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4598 for node in nodenames:
4599 info = nodeinfo[node]
4603 raise errors.OpPrereqError("Cannot get current information"
4604 " from node '%s'" % node)
4605 vg_free = info.get('vg_free', None)
4606 if not isinstance(vg_free, int):
4607 raise errors.OpPrereqError("Can't compute free disk space on"
4609 if req_size > info['vg_free']:
4610 raise errors.OpPrereqError("Not enough disk space on target node %s."
4611 " %d MB available, %d MB required" %
4612 (node, info['vg_free'], req_size))
4614 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4617 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4619 if not isinstance(result.data, objects.OS):
4620 raise errors.OpPrereqError("OS '%s' not in supported os list for"
4621 " primary node" % self.op.os_type)
4623 # bridge check on primary node
4624 bridges = [n.bridge for n in self.nics]
4625 result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4628 raise errors.OpPrereqError("One of the target bridges '%s' does not"
4629 " exist on destination node '%s'" %
4630 (",".join(bridges), pnode.name))
4632 # memory check on primary node
4634 _CheckNodeFreeMemory(self, self.pnode.name,
4635 "creating instance %s" % self.op.instance_name,
4636 self.be_full[constants.BE_MEMORY],
4639 def Exec(self, feedback_fn):
4640 """Create and add the instance to the cluster.
4643 instance = self.op.instance_name
4644 pnode_name = self.pnode.name
4646 ht_kind = self.op.hypervisor
4647 if ht_kind in constants.HTS_REQ_PORT:
4648 network_port = self.cfg.AllocatePort()
4652 ##if self.op.vnc_bind_address is None:
4653 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4655 # this is needed because os.path.join does not accept None arguments
4656 if self.op.file_storage_dir is None:
4657 string_file_storage_dir = ""
4659 string_file_storage_dir = self.op.file_storage_dir
4661 # build the full file storage dir path
4662 file_storage_dir = os.path.normpath(os.path.join(
4663 self.cfg.GetFileStorageDir(),
4664 string_file_storage_dir, instance))
4667 disks = _GenerateDiskTemplate(self,
4668 self.op.disk_template,
4669 instance, pnode_name,
4673 self.op.file_driver,
4676 iobj = objects.Instance(name=instance, os=self.op.os_type,
4677 primary_node=pnode_name,
4678 nics=self.nics, disks=disks,
4679 disk_template=self.op.disk_template,
4681 network_port=network_port,
4682 beparams=self.op.beparams,
4683 hvparams=self.op.hvparams,
4684 hypervisor=self.op.hypervisor,
4687 feedback_fn("* creating instance disks...")
4689 _CreateDisks(self, iobj)
4690 except errors.OpExecError:
4691 self.LogWarning("Device creation failed, reverting...")
4693 _RemoveDisks(self, iobj)
4695 self.cfg.ReleaseDRBDMinors(instance)
4698 feedback_fn("adding instance %s to cluster config" % instance)
4700 self.cfg.AddInstance(iobj)
4701 # Declare that we don't want to remove the instance lock anymore, as we've
4702 # added the instance to the config
4703 del self.remove_locks[locking.LEVEL_INSTANCE]
4704 # Unlock all the nodes
4705 if self.op.mode == constants.INSTANCE_IMPORT:
4706 nodes_keep = [self.op.src_node]
4707 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4708 if node != self.op.src_node]
4709 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4710 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4712 self.context.glm.release(locking.LEVEL_NODE)
4713 del self.acquired_locks[locking.LEVEL_NODE]
4715 if self.op.wait_for_sync:
4716 disk_abort = not _WaitForSync(self, iobj)
4717 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4718 # make sure the disks are not degraded (still sync-ing is ok)
4720 feedback_fn("* checking mirrors status")
4721 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4726 _RemoveDisks(self, iobj)
4727 self.cfg.RemoveInstance(iobj.name)
4728 # Make sure the instance lock gets removed
4729 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4730 raise errors.OpExecError("There are some degraded disks for"
4733 feedback_fn("creating os for instance %s on node %s" %
4734 (instance, pnode_name))
4736 if iobj.disk_template != constants.DT_DISKLESS:
4737 if self.op.mode == constants.INSTANCE_CREATE:
4738 feedback_fn("* running the instance OS create scripts...")
4739 result = self.rpc.call_instance_os_add(pnode_name, iobj)
4740 msg = result.RemoteFailMsg()
4742 raise errors.OpExecError("Could not add os for instance %s"
4744 (instance, pnode_name, msg))
4746 elif self.op.mode == constants.INSTANCE_IMPORT:
4747 feedback_fn("* running the instance OS import scripts...")
4748 src_node = self.op.src_node
4749 src_images = self.src_images
4750 cluster_name = self.cfg.GetClusterName()
4751 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4752 src_node, src_images,
4754 import_result.Raise()
4755 for idx, result in enumerate(import_result.data):
4757 self.LogWarning("Could not import the image %s for instance"
4758 " %s, disk %d, on node %s" %
4759 (src_images[idx], instance, idx, pnode_name))
4761 # also checked in the prereq part
4762 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4766 iobj.admin_up = True
4767 self.cfg.Update(iobj)
4768 logging.info("Starting instance %s on node %s", instance, pnode_name)
4769 feedback_fn("* starting instance...")
4770 result = self.rpc.call_instance_start(pnode_name, iobj)
4771 msg = result.RemoteFailMsg()
4773 raise errors.OpExecError("Could not start instance: %s" % msg)
4776 class LUConnectConsole(NoHooksLU):
4777 """Connect to an instance's console.
4779 This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the console.
4784 _OP_REQP = ["instance_name"]
4787 def ExpandNames(self):
4788 self._ExpandAndLockInstance()
4790 def CheckPrereq(self):
4791 """Check prerequisites.
4793 This checks that the instance is in the cluster.
4796 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4797 assert self.instance is not None, \
4798 "Cannot retrieve locked instance %s" % self.op.instance_name
4799 _CheckNodeOnline(self, self.instance.primary_node)
4801 def Exec(self, feedback_fn):
4802 """Connect to the console of an instance
4805 instance = self.instance
4806 node = instance.primary_node
4808 node_insts = self.rpc.call_instance_list([node],
4809 [instance.hypervisor])[node]
4812 if instance.name not in node_insts.data:
4813 raise errors.OpExecError("Instance %s is not running." % instance.name)
4815 logging.debug("Connecting to console of %s on %s", instance.name, node)
4817 hyper = hypervisor.GetHypervisor(instance.hypervisor)
4818 cluster = self.cfg.GetClusterInfo()
4819 # beparams and hvparams are passed separately, to avoid editing the
4820 # instance and then saving the defaults in the instance itself.
4821 hvparams = cluster.FillHV(instance)
4822 beparams = cluster.FillBE(instance)
4823 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4826 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4829 class LUReplaceDisks(LogicalUnit):
4830 """Replace the disks of an instance.
4833 HPATH = "mirrors-replace"
4834 HTYPE = constants.HTYPE_INSTANCE
4835 _OP_REQP = ["instance_name", "mode", "disks"]
4838 def CheckArguments(self):
4839 if not hasattr(self.op, "remote_node"):
4840 self.op.remote_node = None
4841 if not hasattr(self.op, "iallocator"):
4842 self.op.iallocator = None
4844 # check for valid parameter combination
4845 cnt = [self.op.remote_node, self.op.iallocator].count(None)
4846 if self.op.mode == constants.REPLACE_DISK_CHG:
4848 raise errors.OpPrereqError("When changing the secondary either an"
4849 " iallocator script must be used or the"
4852 raise errors.OpPrereqError("Give either the iallocator or the new"
4853 " secondary, not both")
4854 else: # not replacing the secondary
4856 raise errors.OpPrereqError("The iallocator and new node options can"
4857 " be used only when changing the"
4860 def ExpandNames(self):
4861 self._ExpandAndLockInstance()
4863 if self.op.iallocator is not None:
4864 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4865 elif self.op.remote_node is not None:
4866 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4867 if remote_node is None:
4868 raise errors.OpPrereqError("Node '%s' not known" %
4869 self.op.remote_node)
4870 self.op.remote_node = remote_node
4871 # Warning: do not remove the locking of the new secondary here
4872 # unless DRBD8.AddChildren is changed to work in parallel;
4873 # currently it doesn't since parallel invocations of
4874 # FindUnusedMinor will conflict
4875 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4876 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4878 self.needed_locks[locking.LEVEL_NODE] = []
4879 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4881 def DeclareLocks(self, level):
4882 # If we're not already locking all nodes in the set we have to declare the
4883 # instance's primary/secondary nodes.
4884 if (level == locking.LEVEL_NODE and
4885 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4886 self._LockInstancesNodes()
4888 def _RunAllocator(self):
4889 """Compute a new secondary node using an IAllocator.
4892 ial = IAllocator(self,
4893 mode=constants.IALLOCATOR_MODE_RELOC,
4894 name=self.op.instance_name,
4895 relocate_from=[self.sec_node])
4897 ial.Run(self.op.iallocator)
4900 raise errors.OpPrereqError("Can't compute nodes using"
4901 " iallocator '%s': %s" % (self.op.iallocator,
4903 if len(ial.nodes) != ial.required_nodes:
4904 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4905 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
4907 self.op.remote_node = ial.nodes[0]
4908 self.LogInfo("Selected new secondary for the instance: %s",
4909 self.op.remote_node)
4911 def BuildHooksEnv(self):
4914 This runs on the master, the primary and all the secondaries.
4918 "MODE": self.op.mode,
4919 "NEW_SECONDARY": self.op.remote_node,
4920 "OLD_SECONDARY": self.instance.secondary_nodes[0],
4922 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4924 self.cfg.GetMasterNode(),
4925 self.instance.primary_node,
4927 if self.op.remote_node is not None:
4928 nl.append(self.op.remote_node)
4931 def CheckPrereq(self):
4932 """Check prerequisites.
4934 This checks that the instance is in the cluster.
4937 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4938 assert instance is not None, \
4939 "Cannot retrieve locked instance %s" % self.op.instance_name
4940 self.instance = instance
4942 if instance.disk_template != constants.DT_DRBD8:
4943 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4946 if len(instance.secondary_nodes) != 1:
4947 raise errors.OpPrereqError("The instance has a strange layout,"
4948 " expected one secondary but found %d" %
4949 len(instance.secondary_nodes))
4951 self.sec_node = instance.secondary_nodes[0]
4953 if self.op.iallocator is not None:
4954 self._RunAllocator()
4956 remote_node = self.op.remote_node
4957 if remote_node is not None:
4958 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4959 assert self.remote_node_info is not None, \
4960 "Cannot retrieve locked node %s" % remote_node
4962 self.remote_node_info = None
4963 if remote_node == instance.primary_node:
4964 raise errors.OpPrereqError("The specified node is the primary node of"
4966 elif remote_node == self.sec_node:
4967 raise errors.OpPrereqError("The specified node is already the"
4968 " secondary node of the instance.")
4970 if self.op.mode == constants.REPLACE_DISK_PRI:
4971 n1 = self.tgt_node = instance.primary_node
4972 n2 = self.oth_node = self.sec_node
4973 elif self.op.mode == constants.REPLACE_DISK_SEC:
4974 n1 = self.tgt_node = self.sec_node
4975 n2 = self.oth_node = instance.primary_node
4976 elif self.op.mode == constants.REPLACE_DISK_CHG:
4977 n1 = self.new_node = remote_node
4978 n2 = self.oth_node = instance.primary_node
4979 self.tgt_node = self.sec_node
4980 _CheckNodeNotDrained(self, remote_node)
4982 raise errors.ProgrammerError("Unhandled disk replace mode")
4984 _CheckNodeOnline(self, n1)
4985 _CheckNodeOnline(self, n2)
4987 if not self.op.disks:
4988 self.op.disks = range(len(instance.disks))
4990 for disk_idx in self.op.disks:
4991 instance.FindDisk(disk_idx)
4993 def _ExecD8DiskOnly(self, feedback_fn):
4994 """Replace a disk on the primary or secondary for dbrd8.
4996 The algorithm for replace is quite complicated:
4998 1. for each disk to be replaced:
5000 1. create new LVs on the target node with unique names
5001 1. detach old LVs from the drbd device
5002 1. rename old LVs to name_replaced.<time_t>
5003 1. rename new LVs to old LVs
5004 1. attach the new LVs (with the old names now) to the drbd device
5006 1. wait for sync across all devices
5008 1. for each modified disk:
    1. remove old LVs (which have the name name_replaced.<time_t>)
5012 Failures are not very well handled.
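    # Illustrative sketch (not part of the original sources) of the rename
    # dance in steps 3-4 for one disk, assuming a volume group "xenvg":
    #
    #   old data LV:  xenvg/<uuid>.disk0_data
    #     -> renamed to xenvg/<uuid>.disk0_data_replaced-<time_t>
    #   new data LV:  xenvg/<new_uuid>.disk0_data
    #     -> renamed to xenvg/<uuid>.disk0_data  (the old name)
    #
    # The DRBD device thus re-attaches to unchanged names, and the stale
    # volumes survive under the _replaced suffix until step 6 removes them.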
5016 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5017 instance = self.instance
5019 vgname = self.cfg.GetVGName()
5022 tgt_node = self.tgt_node
5023 oth_node = self.oth_node
5025 # Step: check device activation
5026 self.proc.LogStep(1, steps_total, "check device existence")
5027 info("checking volume groups")
5028 my_vg = cfg.GetVGName()
5029 results = self.rpc.call_vg_list([oth_node, tgt_node])
5031 raise errors.OpExecError("Can't list volume groups on the nodes")
5032 for node in oth_node, tgt_node:
5034 if res.failed or not res.data or my_vg not in res.data:
5035 raise errors.OpExecError("Volume group '%s' not found on %s" %
5037 for idx, dev in enumerate(instance.disks):
5038 if idx not in self.op.disks:
5040 for node in tgt_node, oth_node:
5041 info("checking disk/%d on %s" % (idx, node))
5042 cfg.SetDiskID(dev, node)
5043 result = self.rpc.call_blockdev_find(node, dev)
5044 msg = result.RemoteFailMsg()
5045 if not msg and not result.payload:
5046 msg = "disk not found"
5048 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5051 # Step: check other node consistency
5052 self.proc.LogStep(2, steps_total, "check peer consistency")
5053 for idx, dev in enumerate(instance.disks):
5054 if idx not in self.op.disks:
5056 info("checking disk/%d consistency on %s" % (idx, oth_node))
5057 if not _CheckDiskConsistency(self, dev, oth_node,
5058 oth_node==instance.primary_node):
5059 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5060 " to replace disks on this node (%s)" %
5061 (oth_node, tgt_node))
5063 # Step: create new storage
5064 self.proc.LogStep(3, steps_total, "allocate new storage")
5065 for idx, dev in enumerate(instance.disks):
5066 if idx not in self.op.disks:
5069 cfg.SetDiskID(dev, tgt_node)
5070 lv_names = [".disk%d_%s" % (idx, suf)
5071 for suf in ["data", "meta"]]
5072 names = _GenerateUniqueNames(self, lv_names)
5073 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5074 logical_id=(vgname, names[0]))
5075 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5076 logical_id=(vgname, names[1]))
5077 new_lvs = [lv_data, lv_meta]
5078 old_lvs = dev.children
5079 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5080 info("creating new local storage on %s for %s" %
5081 (tgt_node, dev.iv_name))
5082 # we pass force_create=True to force the LVM creation
5083 for new_lv in new_lvs:
5084 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5085 _GetInstanceInfoText(instance), False)
5087 # Step: for each lv, detach+rename*2+attach
5088 self.proc.LogStep(4, steps_total, "change drbd configuration")
5089 for dev, old_lvs, new_lvs in iv_names.itervalues():
5090 info("detaching %s drbd from local storage" % dev.iv_name)
5091 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5094 raise errors.OpExecError("Can't detach drbd from local storage on node"
5095 " %s for device %s" % (tgt_node, dev.iv_name))
5097 #cfg.Update(instance)
5099 # ok, we created the new LVs, so now we know we have the needed
5100 # storage; as such, we proceed on the target node to rename
5101 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5102 # using the assumption that logical_id == physical_id (which in
5103 # turn is the unique_id on that node)
5105 # FIXME(iustin): use a better name for the replaced LVs
5106 temp_suffix = int(time.time())
5107 ren_fn = lambda d, suff: (d.physical_id[0],
5108 d.physical_id[1] + "_replaced-%s" % suff)
5109 # build the rename list based on what LVs exist on the node
5111 for to_ren in old_lvs:
5112 result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5113 if not result.RemoteFailMsg() and result.payload:
5115 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5117 info("renaming the old LVs on the target node")
5118 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5121 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5122 # now we rename the new LVs to the old LVs
5123 info("renaming the new LVs on the target node")
5124 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5125 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5128 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5130 for old, new in zip(old_lvs, new_lvs):
5131 new.logical_id = old.logical_id
5132 cfg.SetDiskID(new, tgt_node)
5134 for disk in old_lvs:
5135 disk.logical_id = ren_fn(disk, temp_suffix)
5136 cfg.SetDiskID(disk, tgt_node)
5138 # now that the new lvs have the old name, we can add them to the device
5139 info("adding new mirror component on %s" % tgt_node)
5140 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5141 if result.failed or not result.data:
5142 for new_lv in new_lvs:
5143 msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5145 warning("Can't rollback device %s: %s", dev, msg,
5146 hint="cleanup manually the unused logical volumes")
5147 raise errors.OpExecError("Can't add local storage to drbd")
5149 dev.children = new_lvs
5150 cfg.Update(instance)
5152 # Step: wait for sync
    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
5157 self.proc.LogStep(5, steps_total, "sync devices")
5158 _WaitForSync(self, instance, unlock=True)
5160 # so check manually all the devices
5161 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5162 cfg.SetDiskID(dev, instance.primary_node)
5163 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5164 msg = result.RemoteFailMsg()
5165 if not msg and not result.payload:
5166 msg = "disk not found"
5168 raise errors.OpExecError("Can't find DRBD device %s: %s" %
5170 if result.payload[5]:
5171 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5173 # Step: remove old storage
5174 self.proc.LogStep(6, steps_total, "removing old storage")
5175 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5176 info("remove logical volumes for %s" % name)
5178 cfg.SetDiskID(lv, tgt_node)
5179 msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5181 warning("Can't remove old LV: %s" % msg,
5182 hint="manually remove unused LVs")
5185 def _ExecD8Secondary(self, feedback_fn):
5186 """Replace the secondary node for drbd8.
5188 The algorithm for replace is quite complicated:
5189 - for all disks of the instance:
5190 - create new LVs on the new node with same names
5191 - shutdown the drbd device on the old secondary
5192 - disconnect the drbd network on the primary
5193 - create the drbd device on the new secondary
5194 - network attach the drbd on the primary, using an artifice:
5195 the drbd code for Attach() will connect to the network if it
      finds a device which is connected to the good local disks but
      not network enabled
5198 - wait for sync across all devices
5199 - remove all disks from the old secondary
5201 Failures are not very well handled.
5205 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5206 instance = self.instance
5210 old_node = self.tgt_node
5211 new_node = self.new_node
5212 pri_node = instance.primary_node
5214 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5215 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5216 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5219 # Step: check device activation
5220 self.proc.LogStep(1, steps_total, "check device existence")
5221 info("checking volume groups")
5222 my_vg = cfg.GetVGName()
5223 results = self.rpc.call_vg_list([pri_node, new_node])
5224 for node in pri_node, new_node:
5226 if res.failed or not res.data or my_vg not in res.data:
5227 raise errors.OpExecError("Volume group '%s' not found on %s" %
5229 for idx, dev in enumerate(instance.disks):
5230 if idx not in self.op.disks:
5232 info("checking disk/%d on %s" % (idx, pri_node))
5233 cfg.SetDiskID(dev, pri_node)
5234 result = self.rpc.call_blockdev_find(pri_node, dev)
5235 msg = result.RemoteFailMsg()
5236 if not msg and not result.payload:
5237 msg = "disk not found"
5239 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5240 (idx, pri_node, msg))
5242 # Step: check other node consistency
5243 self.proc.LogStep(2, steps_total, "check peer consistency")
5244 for idx, dev in enumerate(instance.disks):
5245 if idx not in self.op.disks:
5247 info("checking disk/%d consistency on %s" % (idx, pri_node))
5248 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5249 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5250 " unsafe to replace the secondary" %
5253 # Step: create new storage
5254 self.proc.LogStep(3, steps_total, "allocate new storage")
5255 for idx, dev in enumerate(instance.disks):
5256 info("adding new local storage on %s for disk/%d" %
5258 # we pass force_create=True to force LVM creation
5259 for new_lv in dev.children:
5260 _CreateBlockDev(self, new_node, instance, new_lv, True,
5261 _GetInstanceInfoText(instance), False)
    # Step 4: DRBD minors and drbd setup changes
5264 # after this, we must manually remove the drbd minors on both the
5265 # error and the success paths
5266 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
    logging.debug("Allocated minors %s", minors)
5269 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5270 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5272 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5273 # create new devices on new_node; note that we create two IDs:
5274 # one without port, so the drbd will be activated without
5275 # networking information on the new node at this stage, and one
      # with network, for the later activation in step 4
5277 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5278 if pri_node == o_node1:
5283 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5284 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
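      # Illustrative example (values made up): with an old logical_id of
      #   (pri_node, old_node, 11000, 0, 1, secret)
      # p_minor == 0 and a freshly allocated new_minor == 2 give
      #   new_alone_id = (pri_node, new_node, None,  0, 2, secret)
      #   new_net_id   = (pri_node, new_node, 11000, 0, 2, secret)
      # i.e. the port is withheld until the later network attach.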
5286 iv_names[idx] = (dev, dev.children, new_net_id)
5287 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5289 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5290 logical_id=new_alone_id,
5291 children=dev.children)
5293 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5294 _GetInstanceInfoText(instance), False)
5295 except errors.BlockDeviceError:
5296 self.cfg.ReleaseDRBDMinors(instance.name)
5299 for idx, dev in enumerate(instance.disks):
5300 # we have new devices, shutdown the drbd on the old secondary
5301 info("shutting down drbd for disk/%d on old node" % idx)
5302 cfg.SetDiskID(dev, old_node)
5303 msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5305 warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5307 hint="Please cleanup this device manually as soon as possible")
5309 info("detaching primary drbds from the network (=> standalone)")
5310 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5311 instance.disks)[pri_node]
5313 msg = result.RemoteFailMsg()
5315 # detaches didn't succeed (unlikely)
5316 self.cfg.ReleaseDRBDMinors(instance.name)
5317 raise errors.OpExecError("Can't detach the disks from the network on"
5318 " old node: %s" % (msg,))
5320 # if we managed to detach at least one, we update all the disks of
5321 # the instance to point to the new secondary
5322 info("updating instance configuration")
5323 for dev, _, new_logical_id in iv_names.itervalues():
5324 dev.logical_id = new_logical_id
5325 cfg.SetDiskID(dev, pri_node)
5326 cfg.Update(instance)
5328 # and now perform the drbd attach
5329 info("attaching primary drbds to new secondary (standalone => connected)")
5330 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5331 instance.disks, instance.name,
5333 for to_node, to_result in result.items():
5334 msg = to_result.RemoteFailMsg()
5336 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5337 hint="please do a gnt-instance info to see the"
5340 # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
5343 self.proc.LogStep(5, steps_total, "sync devices")
5344 _WaitForSync(self, instance, unlock=True)
5346 # so check manually all the devices
5347 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5348 cfg.SetDiskID(dev, pri_node)
5349 result = self.rpc.call_blockdev_find(pri_node, dev)
5350 msg = result.RemoteFailMsg()
5351 if not msg and not result.payload:
5352 msg = "disk not found"
5354 raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5356 if result.payload[5]:
5357 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5359 self.proc.LogStep(6, steps_total, "removing old storage")
5360 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5361 info("remove logical volumes for disk/%d" % idx)
5363 cfg.SetDiskID(lv, old_node)
5364 msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5366 warning("Can't remove LV on old secondary: %s", msg,
5367 hint="Cleanup stale volumes by hand")
5369 def Exec(self, feedback_fn):
5370 """Execute disk replacement.
5372 This dispatches the disk replacement to the appropriate handler.
5375 instance = self.instance
5377 # Activate the instance disks if we're replacing them on a down instance
5378 if not instance.admin_up:
5379 _StartInstanceDisks(self, instance, True)
5381 if self.op.mode == constants.REPLACE_DISK_CHG:
5382 fn = self._ExecD8Secondary
5384 fn = self._ExecD8DiskOnly
5386 ret = fn(feedback_fn)
5388 # Deactivate the instance disks if we're replacing them on a down instance
5389 if not instance.admin_up:
5390 _SafeShutdownInstanceDisks(self, instance)
5395 class LUGrowDisk(LogicalUnit):
5396 """Grow a disk of an instance.
5400 HTYPE = constants.HTYPE_INSTANCE
5401 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5404 def ExpandNames(self):
5405 self._ExpandAndLockInstance()
5406 self.needed_locks[locking.LEVEL_NODE] = []
5407 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5409 def DeclareLocks(self, level):
5410 if level == locking.LEVEL_NODE:
5411 self._LockInstancesNodes()
5413 def BuildHooksEnv(self):
5416 This runs on the master, the primary and all the secondaries.
5420 "DISK": self.op.disk,
5421 "AMOUNT": self.op.amount,
5423 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5425 self.cfg.GetMasterNode(),
5426 self.instance.primary_node,
5430 def CheckPrereq(self):
5431 """Check prerequisites.
5433 This checks that the instance is in the cluster.
5436 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5437 assert instance is not None, \
5438 "Cannot retrieve locked instance %s" % self.op.instance_name
5439 nodenames = list(instance.all_nodes)
5440 for node in nodenames:
5441 _CheckNodeOnline(self, node)
5444 self.instance = instance
5446 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5447 raise errors.OpPrereqError("Instance's disk layout does not support"
5450 self.disk = instance.FindDisk(self.op.disk)
5452 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5453 instance.hypervisor)
5454 for node in nodenames:
5455 info = nodeinfo[node]
5456 if info.failed or not info.data:
5457 raise errors.OpPrereqError("Cannot get current information"
5458 " from node '%s'" % node)
5459 vg_free = info.data.get('vg_free', None)
5460 if not isinstance(vg_free, int):
5461 raise errors.OpPrereqError("Can't compute free disk space on"
5463 if self.op.amount > vg_free:
5464 raise errors.OpPrereqError("Not enough disk space on target node %s:"
5465 " %d MiB available, %d MiB required" %
5466 (node, vg_free, self.op.amount))
5468 def Exec(self, feedback_fn):
5469 """Execute disk grow.
5472 instance = self.instance
5474 for node in instance.all_nodes:
5475 self.cfg.SetDiskID(disk, node)
5476 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5477 msg = result.RemoteFailMsg()
5479 raise errors.OpExecError("Grow request failed to node %s: %s" %
5481 disk.RecordGrow(self.op.amount)
5482 self.cfg.Update(instance)
5483 if self.op.wait_for_sync:
5484 disk_abort = not _WaitForSync(self, instance)
5486 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5487 " status.\nPlease check the instance.")
5490 class LUQueryInstanceData(NoHooksLU):
5491 """Query runtime instance data.
5494 _OP_REQP = ["instances", "static"]
5497 def ExpandNames(self):
5498 self.needed_locks = {}
5499 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5501 if not isinstance(self.op.instances, list):
5502 raise errors.OpPrereqError("Invalid argument type 'instances'")
5504 if self.op.instances:
5505 self.wanted_names = []
5506 for name in self.op.instances:
5507 full_name = self.cfg.ExpandInstanceName(name)
5508 if full_name is None:
5509 raise errors.OpPrereqError("Instance '%s' not known" % name)
5510 self.wanted_names.append(full_name)
5511 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5513 self.wanted_names = None
5514 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5516 self.needed_locks[locking.LEVEL_NODE] = []
5517 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5519 def DeclareLocks(self, level):
5520 if level == locking.LEVEL_NODE:
5521 self._LockInstancesNodes()
5523 def CheckPrereq(self):
5524 """Check prerequisites.
5526 This only checks the optional instance list against the existing names.
5529 if self.wanted_names is None:
5530 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5532 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5533 in self.wanted_names]
5536 def _ComputeDiskStatus(self, instance, snode, dev):
5537 """Compute block device status.
5540 static = self.op.static
5542 self.cfg.SetDiskID(dev, instance.primary_node)
5543 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5544 if dev_pstatus.offline:
5547 msg = dev_pstatus.RemoteFailMsg()
5549 raise errors.OpExecError("Can't compute disk status for %s: %s" %
5550 (instance.name, msg))
5551 dev_pstatus = dev_pstatus.payload
5555 if dev.dev_type in constants.LDS_DRBD:
5556 # we change the snode then (otherwise we use the one passed in)
5557 if dev.logical_id[0] == instance.primary_node:
5558 snode = dev.logical_id[1]
5560 snode = dev.logical_id[0]
5562 if snode and not static:
5563 self.cfg.SetDiskID(dev, snode)
5564 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5565 if dev_sstatus.offline:
5568 msg = dev_sstatus.RemoteFailMsg()
5570 raise errors.OpExecError("Can't compute disk status for %s: %s" %
5571 (instance.name, msg))
5572 dev_sstatus = dev_sstatus.payload
5577 dev_children = [self._ComputeDiskStatus(instance, snode, child)
5578 for child in dev.children]
5583 "iv_name": dev.iv_name,
5584 "dev_type": dev.dev_type,
5585 "logical_id": dev.logical_id,
5586 "physical_id": dev.physical_id,
5587 "pstatus": dev_pstatus,
5588 "sstatus": dev_sstatus,
5589 "children": dev_children,
5595 def Exec(self, feedback_fn):
5596 """Gather and return data"""
5599 cluster = self.cfg.GetClusterInfo()
5601 for instance in self.wanted_instances:
5602 if not self.op.static:
5603 remote_info = self.rpc.call_instance_info(instance.primary_node,
5605 instance.hypervisor)
5607 remote_info = remote_info.data
5608 if remote_info and "state" in remote_info:
5611 remote_state = "down"
5614 if instance.admin_up:
5617 config_state = "down"
5619 disks = [self._ComputeDiskStatus(instance, None, device)
5620 for device in instance.disks]
5623 "name": instance.name,
5624 "config_state": config_state,
5625 "run_state": remote_state,
5626 "pnode": instance.primary_node,
5627 "snodes": instance.secondary_nodes,
5629 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5631 "hypervisor": instance.hypervisor,
5632 "network_port": instance.network_port,
5633 "hv_instance": instance.hvparams,
5634 "hv_actual": cluster.FillHV(instance),
5635 "be_instance": instance.beparams,
5636 "be_actual": cluster.FillBE(instance),
5639 result[instance.name] = idict
5644 class LUSetInstanceParams(LogicalUnit):
5645 """Modifies an instances's parameters.
5648 HPATH = "instance-modify"
5649 HTYPE = constants.HTYPE_INSTANCE
5650 _OP_REQP = ["instance_name"]
5653 def CheckArguments(self):
5654 if not hasattr(self.op, 'nics'):
5656 if not hasattr(self.op, 'disks'):
5658 if not hasattr(self.op, 'beparams'):
5659 self.op.beparams = {}
5660 if not hasattr(self.op, 'hvparams'):
5661 self.op.hvparams = {}
5662 self.op.force = getattr(self.op, "force", False)
5663 if not (self.op.nics or self.op.disks or
5664 self.op.hvparams or self.op.beparams):
5665 raise errors.OpPrereqError("No changes submitted")
5669 for disk_op, disk_dict in self.op.disks:
5670 if disk_op == constants.DDM_REMOVE:
5673 elif disk_op == constants.DDM_ADD:
5676 if not isinstance(disk_op, int):
5677 raise errors.OpPrereqError("Invalid disk index")
5678 if disk_op == constants.DDM_ADD:
5679 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5680 if mode not in constants.DISK_ACCESS_SET:
5681 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5682 size = disk_dict.get('size', None)
5684 raise errors.OpPrereqError("Required disk parameter size missing")
5687 except ValueError, err:
5688 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5690 disk_dict['size'] = size
5692 # modification of disk
5693 if 'size' in disk_dict:
5694 raise errors.OpPrereqError("Disk size change not possible, use"
5697 if disk_addremove > 1:
5698 raise errors.OpPrereqError("Only one disk add or remove operation"
5699 " supported at a time")

    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == constants.VALUE_NONE:
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)

      if nic_op == constants.DDM_ADD:
        nic_bridge = nic_dict.get('bridge', None)
        if nic_bridge is None:
          nic_dict['bridge'] = self.cfg.GetDefBridge()
        nic_mac = nic_dict.get('mac', None)
        if nic_mac is None:
          nic_dict['mac'] = constants.VALUE_AUTO

      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                     " modifying an existing nic")
    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.
    if self.op.nics:
      args['nics'] = []
      nic_override = dict(self.op.nics)
      for idx, nic in enumerate(self.instance.nics):
        if idx in nic_override:
          this_nic_override = nic_override[idx]
        else:
          this_nic_override = {}
        if 'ip' in this_nic_override:
          ip = this_nic_override['ip']
        else:
          ip = nic.ip
        if 'bridge' in this_nic_override:
          bridge = this_nic_override['bridge']
        else:
          bridge = nic.bridge
        if 'mac' in this_nic_override:
          mac = this_nic_override['mac']
        else:
          mac = nic.mac
        args['nics'].append((ip, bridge, mac))
      if constants.DDM_ADD in nic_override:
        ip = nic_override[constants.DDM_ADD].get('ip', None)
        bridge = nic_override[constants.DDM_ADD]['bridge']
        mac = nic_override[constants.DDM_ADD]['mac']
        args['nics'].append((ip, bridge, mac))
      elif constants.DDM_REMOVE in nic_override:
        del args['nics'][-1]

    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl
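
  # Each element appended to args['nics'] above is an (ip, bridge, mac)
  # tuple; an illustrative value would be
  # ('192.0.2.10', 'xen-br0', 'aa:00:00:11:22:33').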

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    force = self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was set before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if not instance_info.failed and instance_info.data:
          current_mem = instance_info.data['memory']
        else:
          # Assume the instance is not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode].data['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.iteritems():
          if node not in instance.secondary_nodes:
            continue
          if nres.failed or not isinstance(nres.data, dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)))
      if 'bridge' in nic_dict:
        nic_bridge = nic_dict['bridge']
        if nic_bridge is None:
          raise errors.OpPrereqError('Cannot set the nic bridge to None')
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
          msg = ("Bridge '%s' doesn't exist on one of"
                 " the instance nodes" % nic_bridge)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac is None:
          raise errors.OpPrereqError('Cannot set the nic mac to None')
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC()
        else:
          # or validate/reserve the current one
          if self.cfg.IsMacInUse(nic_mac):
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        if ins_l.failed or not isinstance(ins_l.data, list):
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
        if instance.name in ins_l.data:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks")

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)))

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
          if msg:
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
                            " continuing anyway", device_idx, node, msg)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # mac and bridge should be set, by now
        mac = nic_dict['mac']
        bridge = nic_dict['bridge']
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
                              bridge=bridge)
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,bridge=%s" %
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
      else:
        # change a given nic
        for key in 'mac', 'ip', 'bridge':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result
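
  # The value returned by Exec above is a list of (parameter, new value)
  # pairs; an illustrative result could look like:
  #   [("disk/1", "add:size=1024,mode=w"), ("nic.ip/0", "192.0.2.10"),
  #    ("be/memory", 512)]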


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].failed:
        result[node] = False
      else:
        result[node] = rpcresult[node].data

    return result
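
# Example return value of LUQueryExports.Exec (names illustrative): nodes
# whose RPC failed map to False, the others to their export list, e.g.
#   {"node1.example.com": ["inst1.example.com"], "node2.example.com": False}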


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is a wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      result = self.rpc.call_instance_shutdown(src_node, instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, src_node, msg))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    try:
      for disk in instance.disks:
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
        if new_dev_name.failed or not new_dev_name.data:
          self.LogWarning("Could not snapshot block device %s on node %s",
                          disk.logical_id[1], src_node)
          snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name.data),
                                 physical_id=(vgname, new_dev_name.data),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.admin_up:
        result = self.rpc.call_instance_start(src_node, instance)
        msg = result.RemoteFailMsg()
        if msg:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance: %s" % msg)

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        if result.failed or not result.data:
          self.LogWarning("Could not export block device %s from node %s to"
                          " node %s", dev.logical_id[1], src_node,
                          dst_node.name)
        msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
        if msg:
          self.LogWarning("Could not remove snapshot block device %s from node"
                          " %s: %s", dev.logical_id[1], src_node, msg)

    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    if result.failed or not result.data:
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].failed:
          continue
        if instance.name in exportlist[node].data:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        continue
      if instance_name in exportlist[node].data:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        if result.failed or not result.data:
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))

    return results
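
# Example result of LUSearchTags.Exec for the pattern "^web" (illustrative):
#   [("/cluster", "webfarm"), ("/instances/inst1.example.com", "web")]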


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has several sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
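
  # Illustrative construction in allocate mode (all values hypothetical);
  # every member of _ALLO_KEYS must be passed as a keyword argument:
  #   ial = IAllocator(lu, mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="inst1.example.com", mem_size=512, vcpus=1,
  #                    disks=[{'size': 1024, 'mode': 'w'}], nics=[],
  #                    disk_template=constants.DT_DRBD8,
  #                    os="debian-etch", tags=[], hypervisor=None)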

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor_name)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not ninfo.offline:
        nresult.Raise()
        if not isinstance(nresult.data, dict):
          raise errors.OpExecError("Can't get data for node %s" % nname)
        remote_info = nresult.data
        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          try:
            remote_info[attr] = int(remote_info[attr])
          except ValueError, err:
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" % (nname, attr, err))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].data:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)
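
  # Illustrative "request" sub-structure produced by _AddNewInstance for a
  # two-node (DRBD) allocation; all values hypothetical:
  #   {"type": "allocate", "name": "inst1.example.com",
  #    "disk_template": "drbd", "memory": 512, "vcpus": 1,
  #    "disks": [{"size": 1024, "mode": "w"}], "nics": [],
  #    "os": "debian-etch", "tags": [], "disk_space_total": 2176,
  #    "required_nodes": 2}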

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise()

    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result.data

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " not a list")
    self.out_data = rdict
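
  # Example of a well-formed reply accepted by _ValidateResult above (as
  # JSON text in self.out_text; values illustrative):
  #   {"success": true, "info": "allocation successful",
  #    "nodes": ["node2.example.com", "node3.example.com"]}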


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result