4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
49 class LogicalUnit(object):
50 """Logical Unit base class.
52 Subclasses must follow these rules:
53 - implement ExpandNames
54 - implement CheckPrereq
56 - implement BuildHooksEnv
57 - redefine HPATH and HTYPE
58 - optionally redefine their run requirements:
59 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
61 Note that all commands require root permissions.
69 def __init__(self, processor, op, context, rpc):
70 """Constructor for LogicalUnit.
72 This needs to be overriden in derived classes in order to check op
78 self.cfg = context.cfg
79 self.context = context
81 # Dicts used to declare locking needs to mcpu
82 self.needed_locks = None
83 self.acquired_locks = {}
84 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86 self.remove_locks = {}
87 # Used to force good behavior when calling helper functions
88 self.recalculate_locks = {}
91 self.LogWarning = processor.LogWarning
92 self.LogInfo = processor.LogInfo
94 for attr_name in self._OP_REQP:
95 attr_val = getattr(op, attr_name, None)
97 raise errors.OpPrereqError("Required parameter '%s' missing" %
102 """Returns the SshRunner object
106 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
109 ssh = property(fget=__GetSSH)
111 def CheckArguments(self):
112 """Check syntactic validity for the opcode arguments.
114 This method is for doing a simple syntactic check and ensure
115 validity of opcode parameters, without any cluster-related
116 checks. While the same can be accomplished in ExpandNames and/or
117 CheckPrereq, doing these separate is better because:
119 - ExpandNames is left as as purely a lock-related function
120 - CheckPrereq is run after we have aquired locks (and possible
123 The function is allowed to change the self.op attribute so that
124 later methods can no longer worry about missing parameters.
129 def ExpandNames(self):
130 """Expand names for this LU.
132 This method is called before starting to execute the opcode, and it should
133 update all the parameters of the opcode to their canonical form (e.g. a
134 short node name must be fully expanded after this method has successfully
135 completed). This way locking, hooks, logging, ecc. can work correctly.
137 LUs which implement this method must also populate the self.needed_locks
138 member, as a dict with lock levels as keys, and a list of needed lock names
141 - use an empty dict if you don't need any lock
142 - if you don't need any lock at a particular level omit that level
143 - don't put anything for the BGL level
144 - if you want all locks at a level use locking.ALL_SET as a value
146 If you need to share locks (rather than acquire them exclusively) at one
147 level you can modify self.share_locks, setting a true value (usually 1) for
148 that level. By default locks are not shared.
152 # Acquire all nodes and one instance
153 self.needed_locks = {
154 locking.LEVEL_NODE: locking.ALL_SET,
155 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
157 # Acquire just two nodes
158 self.needed_locks = {
159 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
162 self.needed_locks = {} # No, you can't leave it to the default value None
165 # The implementation of this method is mandatory only if the new LU is
166 # concurrent, so that old LUs don't need to be changed all at the same
169 self.needed_locks = {} # Exclusive LUs don't need locks.
171 raise NotImplementedError
173 def DeclareLocks(self, level):
174 """Declare LU locking needs for a level
176 While most LUs can just declare their locking needs at ExpandNames time,
177 sometimes there's the need to calculate some locks after having acquired
178 the ones before. This function is called just before acquiring locks at a
179 particular level, but after acquiring the ones at lower levels, and permits
180 such calculations. It can be used to modify self.needed_locks, and by
181 default it does nothing.
183 This function is only called if you have something already set in
184 self.needed_locks for the level.
186 @param level: Locking level which is going to be locked
187 @type level: member of ganeti.locking.LEVELS
191 def CheckPrereq(self):
192 """Check prerequisites for this LU.
194 This method should check that the prerequisites for the execution
195 of this LU are fulfilled. It can do internode communication, but
196 it should be idempotent - no cluster or system changes are
199 The method should raise errors.OpPrereqError in case something is
200 not fulfilled. Its return value is ignored.
202 This method should also update all the parameters of the opcode to
203 their canonical form if it hasn't been done by ExpandNames before.
206 raise NotImplementedError
208 def Exec(self, feedback_fn):
211 This method should implement the actual work. It should raise
212 errors.OpExecError for failures that are somewhat dealt with in
216 raise NotImplementedError
218 def BuildHooksEnv(self):
219 """Build hooks environment for this LU.
221 This method should return a three-node tuple consisting of: a dict
222 containing the environment that will be used for running the
223 specific hook for this LU, a list of node names on which the hook
224 should run before the execution, and a list of node names on which
225 the hook should run after the execution.
227 The keys of the dict must not have 'GANETI_' prefixed as this will
228 be handled in the hooks runner. Also note additional keys will be
229 added by the hooks runner. If the LU doesn't define any
230 environment, an empty dict (and not None) should be returned.
232 No nodes should be returned as an empty list (and not None).
234 Note that if the HPATH for a LU class is None, this function will
238 raise NotImplementedError
240 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241 """Notify the LU about the results of its hooks.
243 This method is called every time a hooks phase is executed, and notifies
244 the Logical Unit about the hooks' result. The LU can then use it to alter
245 its result based on the hooks. By default the method does nothing and the
246 previous result is passed back unchanged but any LU can define it if it
247 wants to use the local cluster hook-scripts somehow.
249 @param phase: one of L{constants.HOOKS_PHASE_POST} or
250 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251 @param hook_results: the results of the multi-node hooks rpc call
252 @param feedback_fn: function used send feedback back to the caller
253 @param lu_result: the previous Exec result this LU had, or None
255 @return: the new Exec result, based on the previous result
261 def _ExpandAndLockInstance(self):
262 """Helper function to expand and lock an instance.
264 Many LUs that work on an instance take its name in self.op.instance_name
265 and need to expand it and then declare the expanded name for locking. This
266 function does it, and then updates self.op.instance_name to the expanded
267 name. It also initializes needed_locks as a dict, if this hasn't been done
271 if self.needed_locks is None:
272 self.needed_locks = {}
274 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275 "_ExpandAndLockInstance called with instance-level locks set"
276 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277 if expanded_name is None:
278 raise errors.OpPrereqError("Instance '%s' not known" %
279 self.op.instance_name)
280 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281 self.op.instance_name = expanded_name
283 def _LockInstancesNodes(self, primary_only=False):
284 """Helper function to declare instances' nodes for locking.
286 This function should be called after locking one or more instances to lock
287 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288 with all primary or secondary nodes for instances already locked and
289 present in self.needed_locks[locking.LEVEL_INSTANCE].
291 It should be called from DeclareLocks, and for safety only works if
292 self.recalculate_locks[locking.LEVEL_NODE] is set.
294 In the future it may grow parameters to just lock some instance's nodes, or
295 to just lock primaries or secondary nodes, if needed.
297 If should be called in DeclareLocks in a way similar to::
299 if level == locking.LEVEL_NODE:
300 self._LockInstancesNodes()
302 @type primary_only: boolean
303 @param primary_only: only lock primary nodes of locked instances
306 assert locking.LEVEL_NODE in self.recalculate_locks, \
307 "_LockInstancesNodes helper function called with no nodes to recalculate"
309 # TODO: check if we're really been called with the instance locks held
311 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312 # future we might want to have different behaviors depending on the value
313 # of self.recalculate_locks[locking.LEVEL_NODE]
315 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316 instance = self.context.cfg.GetInstanceInfo(instance_name)
317 wanted_nodes.append(instance.primary_node)
319 wanted_nodes.extend(instance.secondary_nodes)
321 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
326 del self.recalculate_locks[locking.LEVEL_NODE]
329 class NoHooksLU(LogicalUnit):
330 """Simple LU which runs no hooks.
332 This LU is intended as a parent for other LogicalUnits which will
333 run no hooks, in order to reduce duplicate code.
340 def _GetWantedNodes(lu, nodes):
341 """Returns list of checked and expanded node names.
343 @type lu: L{LogicalUnit}
344 @param lu: the logical unit on whose behalf we execute
346 @param nodes: list of node names or None for all nodes
348 @return: the list of nodes, sorted
349 @raise errors.OpProgrammerError: if the nodes parameter is wrong type
352 if not isinstance(nodes, list):
353 raise errors.OpPrereqError("Invalid argument type 'nodes'")
356 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357 " non-empty list of nodes whose name is to be expanded.")
361 node = lu.cfg.ExpandNodeName(name)
363 raise errors.OpPrereqError("No such node name '%s'" % name)
366 return utils.NiceSort(wanted)
369 def _GetWantedInstances(lu, instances):
370 """Returns list of checked and expanded instance names.
372 @type lu: L{LogicalUnit}
373 @param lu: the logical unit on whose behalf we execute
374 @type instances: list
375 @param instances: list of instance names or None for all instances
377 @return: the list of instances, sorted
378 @raise errors.OpPrereqError: if the instances parameter is wrong type
379 @raise errors.OpPrereqError: if any of the passed instances is not found
382 if not isinstance(instances, list):
383 raise errors.OpPrereqError("Invalid argument type 'instances'")
388 for name in instances:
389 instance = lu.cfg.ExpandInstanceName(name)
391 raise errors.OpPrereqError("No such instance name '%s'" % name)
392 wanted.append(instance)
395 wanted = lu.cfg.GetInstanceList()
396 return utils.NiceSort(wanted)
399 def _CheckOutputFields(static, dynamic, selected):
400 """Checks whether all selected fields are valid.
402 @type static: L{utils.FieldSet}
403 @param static: static fields set
404 @type dynamic: L{utils.FieldSet}
405 @param dynamic: dynamic fields set
412 delta = f.NonMatching(selected)
414 raise errors.OpPrereqError("Unknown output fields selected: %s"
418 def _CheckBooleanOpField(op, name):
419 """Validates boolean opcode parameters.
421 This will ensure that an opcode parameter is either a boolean value,
422 or None (but that it always exists).
425 val = getattr(op, name, None)
426 if not (val is None or isinstance(val, bool)):
427 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
429 setattr(op, name, val)
432 def _CheckNodeOnline(lu, node):
433 """Ensure that a given node is online.
435 @param lu: the LU on behalf of which we make the check
436 @param node: the node to check
437 @raise errors.OpPrereqError: if the nodes is offline
440 if lu.cfg.GetNodeInfo(node).offline:
441 raise errors.OpPrereqError("Can't use offline node %s" % node)
444 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
445 memory, vcpus, nics):
446 """Builds instance related env variables for hooks
448 This builds the hook environment from individual variables.
451 @param name: the name of the instance
452 @type primary_node: string
453 @param primary_node: the name of the instance's primary node
454 @type secondary_nodes: list
455 @param secondary_nodes: list of secondary nodes as strings
456 @type os_type: string
457 @param os_type: the name of the instance's OS
459 @param status: the desired status of the instances
461 @param memory: the memory size of the instance
463 @param vcpus: the count of VCPUs the instance has
465 @param nics: list of tuples (ip, bridge, mac) representing
466 the NICs the instance has
468 @return: the hook environment for this instance
473 "INSTANCE_NAME": name,
474 "INSTANCE_PRIMARY": primary_node,
475 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
476 "INSTANCE_OS_TYPE": os_type,
477 "INSTANCE_STATUS": status,
478 "INSTANCE_MEMORY": memory,
479 "INSTANCE_VCPUS": vcpus,
483 nic_count = len(nics)
484 for idx, (ip, bridge, mac) in enumerate(nics):
487 env["INSTANCE_NIC%d_IP" % idx] = ip
488 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
489 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
493 env["INSTANCE_NIC_COUNT"] = nic_count
498 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
499 """Builds instance related env variables for hooks from an object.
501 @type lu: L{LogicalUnit}
502 @param lu: the logical unit on whose behalf we execute
503 @type instance: L{objects.Instance}
504 @param instance: the instance for which we should build the
507 @param override: dictionary with key/values that will override
510 @return: the hook environment dictionary
513 bep = lu.cfg.GetClusterInfo().FillBE(instance)
515 'name': instance.name,
516 'primary_node': instance.primary_node,
517 'secondary_nodes': instance.secondary_nodes,
518 'os_type': instance.os,
519 'status': instance.os,
520 'memory': bep[constants.BE_MEMORY],
521 'vcpus': bep[constants.BE_VCPUS],
522 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
525 args.update(override)
526 return _BuildInstanceHookEnv(**args)
529 def _AdjustCandidatePool(lu):
530 """Adjust the candidate pool after node operations.
533 mod_list = lu.cfg.MaintainCandidatePool()
535 lu.LogInfo("Promoted nodes to master candidate role: %s",
536 ", ".join(node.name for node in mod_list))
537 for name in mod_list:
538 lu.context.ReaddNode(name)
539 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
541 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
545 def _CheckInstanceBridgesExist(lu, instance):
546 """Check that the brigdes needed by an instance exist.
549 # check bridges existance
550 brlist = [nic.bridge for nic in instance.nics]
551 result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
554 raise errors.OpPrereqError("One or more target bridges %s does not"
555 " exist on destination node '%s'" %
556 (brlist, instance.primary_node))
559 class LUDestroyCluster(NoHooksLU):
560 """Logical unit for destroying the cluster.
565 def CheckPrereq(self):
566 """Check prerequisites.
568 This checks whether the cluster is empty.
570 Any errors are signalled by raising errors.OpPrereqError.
573 master = self.cfg.GetMasterNode()
575 nodelist = self.cfg.GetNodeList()
576 if len(nodelist) != 1 or nodelist[0] != master:
577 raise errors.OpPrereqError("There are still %d node(s) in"
578 " this cluster." % (len(nodelist) - 1))
579 instancelist = self.cfg.GetInstanceList()
581 raise errors.OpPrereqError("There are still %d instance(s) in"
582 " this cluster." % len(instancelist))
584 def Exec(self, feedback_fn):
585 """Destroys the cluster.
588 master = self.cfg.GetMasterNode()
589 result = self.rpc.call_node_stop_master(master, False)
592 raise errors.OpExecError("Could not disable the master role")
593 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
594 utils.CreateBackup(priv_key)
595 utils.CreateBackup(pub_key)
599 class LUVerifyCluster(LogicalUnit):
600 """Verifies the cluster status.
603 HPATH = "cluster-verify"
604 HTYPE = constants.HTYPE_CLUSTER
605 _OP_REQP = ["skip_checks"]
608 def ExpandNames(self):
609 self.needed_locks = {
610 locking.LEVEL_NODE: locking.ALL_SET,
611 locking.LEVEL_INSTANCE: locking.ALL_SET,
613 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
615 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
616 node_result, feedback_fn, master_files):
617 """Run multiple tests against a node.
621 - compares ganeti version
622 - checks vg existance and size > 20G
623 - checks config file checksum
624 - checks ssh to other nodes
626 @type nodeinfo: L{objects.Node}
627 @param nodeinfo: the node to check
628 @param file_list: required list of files
629 @param local_cksum: dictionary of local files and their checksums
630 @param node_result: the results from the node
631 @param feedback_fn: function used to accumulate results
632 @param master_files: list of files that only masters should have
637 # main result, node_result should be a non-empty dict
638 if not node_result or not isinstance(node_result, dict):
639 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
642 # compares ganeti version
643 local_version = constants.PROTOCOL_VERSION
644 remote_version = node_result.get('version', None)
645 if not remote_version:
646 feedback_fn(" - ERROR: connection to %s failed" % (node))
649 if local_version != remote_version:
650 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
651 (local_version, node, remote_version))
654 # checks vg existance and size > 20G
657 vglist = node_result.get(constants.NV_VGLIST, None)
659 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
663 vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
664 constants.MIN_VG_SIZE)
666 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
669 # checks config file checksum
671 remote_cksum = node_result.get(constants.NV_FILELIST, None)
672 if not isinstance(remote_cksum, dict):
674 feedback_fn(" - ERROR: node hasn't returned file checksum data")
676 for file_name in file_list:
677 node_is_mc = nodeinfo.master_candidate
678 must_have_file = file_name not in master_files
679 if file_name not in remote_cksum:
680 if node_is_mc or must_have_file:
682 feedback_fn(" - ERROR: file '%s' missing" % file_name)
683 elif remote_cksum[file_name] != local_cksum[file_name]:
684 if node_is_mc or must_have_file:
686 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
688 # not candidate and this is not a must-have file
690 feedback_fn(" - ERROR: non master-candidate has old/wrong file"
693 # all good, except non-master/non-must have combination
694 if not node_is_mc and not must_have_file:
695 feedback_fn(" - ERROR: file '%s' should not exist on non master"
696 " candidates" % file_name)
700 if constants.NV_NODELIST not in node_result:
702 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
704 if node_result[constants.NV_NODELIST]:
706 for node in node_result[constants.NV_NODELIST]:
707 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
708 (node, node_result[constants.NV_NODELIST][node]))
710 if constants.NV_NODENETTEST not in node_result:
712 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
714 if node_result[constants.NV_NODENETTEST]:
716 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
718 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
719 (node, node_result[constants.NV_NODENETTEST][node]))
721 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
722 if isinstance(hyp_result, dict):
723 for hv_name, hv_result in hyp_result.iteritems():
724 if hv_result is not None:
725 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
726 (hv_name, hv_result))
729 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
730 node_instance, feedback_fn, n_offline):
731 """Verify an instance.
733 This function checks to see if the required block devices are
734 available on the instance's node.
739 node_current = instanceconfig.primary_node
742 instanceconfig.MapLVsByNode(node_vol_should)
744 for node in node_vol_should:
745 if node in n_offline:
746 # ignore missing volumes on offline nodes
748 for volume in node_vol_should[node]:
749 if node not in node_vol_is or volume not in node_vol_is[node]:
750 feedback_fn(" - ERROR: volume %s missing on node %s" %
754 if not instanceconfig.status == 'down':
755 if ((node_current not in node_instance or
756 not instance in node_instance[node_current]) and
757 node_current not in n_offline):
758 feedback_fn(" - ERROR: instance %s not running on node %s" %
759 (instance, node_current))
762 for node in node_instance:
763 if (not node == node_current):
764 if instance in node_instance[node]:
765 feedback_fn(" - ERROR: instance %s should not run on node %s" %
771 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
772 """Verify if there are any unknown volumes in the cluster.
774 The .os, .swap and backup volumes are ignored. All other volumes are
780 for node in node_vol_is:
781 for volume in node_vol_is[node]:
782 if node not in node_vol_should or volume not in node_vol_should[node]:
783 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
788 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
789 """Verify the list of running instances.
791 This checks what instances are running but unknown to the cluster.
795 for node in node_instance:
796 for runninginstance in node_instance[node]:
797 if runninginstance not in instancelist:
798 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
799 (runninginstance, node))
803 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
804 """Verify N+1 Memory Resilience.
806 Check that if one single node dies we can still start all the instances it
812 for node, nodeinfo in node_info.iteritems():
813 # This code checks that every node which is now listed as secondary has
814 # enough memory to host all instances it is supposed to should a single
815 # other node in the cluster fail.
816 # FIXME: not ready for failover to an arbitrary node
817 # FIXME: does not support file-backed instances
818 # WARNING: we currently take into account down instances as well as up
819 # ones, considering that even if they're down someone might want to start
820 # them even in the event of a node failure.
821 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
823 for instance in instances:
824 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
825 if bep[constants.BE_AUTO_BALANCE]:
826 needed_mem += bep[constants.BE_MEMORY]
827 if nodeinfo['mfree'] < needed_mem:
828 feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
829 " failovers should node %s fail" % (node, prinode))
833 def CheckPrereq(self):
834 """Check prerequisites.
836 Transform the list of checks we're going to skip into a set and check that
837 all its members are valid.
840 self.skip_set = frozenset(self.op.skip_checks)
841 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
842 raise errors.OpPrereqError("Invalid checks to be skipped specified")
844 def BuildHooksEnv(self):
847 Cluster-Verify hooks just rone in the post phase and their failure makes
848 the output be logged in the verify output and the verification to fail.
851 all_nodes = self.cfg.GetNodeList()
852 # TODO: populate the environment with useful information for verify hooks
854 return env, [], all_nodes
856 def Exec(self, feedback_fn):
857 """Verify integrity of cluster, performing various test on nodes.
861 feedback_fn("* Verifying global settings")
862 for msg in self.cfg.VerifyConfig():
863 feedback_fn(" - ERROR: %s" % msg)
865 vg_name = self.cfg.GetVGName()
866 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
867 nodelist = utils.NiceSort(self.cfg.GetNodeList())
868 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
869 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
870 i_non_redundant = [] # Non redundant instances
871 i_non_a_balanced = [] # Non auto-balanced instances
872 n_offline = [] # List of offline nodes
878 # FIXME: verify OS list
880 master_files = [constants.CLUSTER_CONF_FILE]
882 file_names = ssconf.SimpleStore().GetFileList()
883 file_names.append(constants.SSL_CERT_FILE)
884 file_names.extend(master_files)
886 local_checksums = utils.FingerprintFiles(file_names)
888 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
889 node_verify_param = {
890 constants.NV_FILELIST: file_names,
891 constants.NV_NODELIST: nodelist,
892 constants.NV_HYPERVISOR: hypervisors,
893 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
894 node.secondary_ip) for node in nodeinfo],
895 constants.NV_LVLIST: vg_name,
896 constants.NV_INSTANCELIST: hypervisors,
897 constants.NV_VGLIST: None,
898 constants.NV_VERSION: None,
899 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
901 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
902 self.cfg.GetClusterName())
904 cluster = self.cfg.GetClusterInfo()
905 master_node = self.cfg.GetMasterNode()
906 for node_i in nodeinfo:
908 nresult = all_nvinfo[node].data
911 feedback_fn("* Skipping offline node %s" % (node,))
912 n_offline.append(node)
915 if node == master_node:
917 elif node_i.master_candidate:
918 ntype = "master candidate"
921 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
923 if all_nvinfo[node].failed or not isinstance(nresult, dict):
924 feedback_fn(" - ERROR: connection to %s failed" % (node,))
928 result = self._VerifyNode(node_i, file_names, local_checksums,
929 nresult, feedback_fn, master_files)
932 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
933 if isinstance(lvdata, basestring):
934 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
935 (node, lvdata.encode('string_escape')))
937 node_volume[node] = {}
938 elif not isinstance(lvdata, dict):
939 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
943 node_volume[node] = lvdata
946 idata = nresult.get(constants.NV_INSTANCELIST, None)
947 if not isinstance(idata, list):
948 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
953 node_instance[node] = idata
956 nodeinfo = nresult.get(constants.NV_HVINFO, None)
957 if not isinstance(nodeinfo, dict):
958 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
964 "mfree": int(nodeinfo['memory_free']),
965 "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
968 # dictionary holding all instances this node is secondary for,
969 # grouped by their primary node. Each key is a cluster node, and each
970 # value is a list of instances which have the key as primary and the
971 # current node as secondary. this is handy to calculate N+1 memory
972 # availability if you can only failover from a primary to its
974 "sinst-by-pnode": {},
977 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
983 for instance in instancelist:
984 feedback_fn("* Verifying instance %s" % instance)
985 inst_config = self.cfg.GetInstanceInfo(instance)
986 result = self._VerifyInstance(instance, inst_config, node_volume,
987 node_instance, feedback_fn, n_offline)
989 inst_nodes_offline = []
991 inst_config.MapLVsByNode(node_vol_should)
993 instance_cfg[instance] = inst_config
995 pnode = inst_config.primary_node
996 if pnode in node_info:
997 node_info[pnode]['pinst'].append(instance)
998 elif pnode not in n_offline:
999 feedback_fn(" - ERROR: instance %s, connection to primary node"
1000 " %s failed" % (instance, pnode))
1003 if pnode in n_offline:
1004 inst_nodes_offline.append(pnode)
1006 # If the instance is non-redundant we cannot survive losing its primary
1007 # node, so we are not N+1 compliant. On the other hand we have no disk
1008 # templates with more than one secondary so that situation is not well
1010 # FIXME: does not support file-backed instances
1011 if len(inst_config.secondary_nodes) == 0:
1012 i_non_redundant.append(instance)
1013 elif len(inst_config.secondary_nodes) > 1:
1014 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1017 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1018 i_non_a_balanced.append(instance)
1020 for snode in inst_config.secondary_nodes:
1021 if snode in node_info:
1022 node_info[snode]['sinst'].append(instance)
1023 if pnode not in node_info[snode]['sinst-by-pnode']:
1024 node_info[snode]['sinst-by-pnode'][pnode] = []
1025 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1026 elif snode not in n_offline:
1027 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1028 " %s failed" % (instance, snode))
1030 if snode in n_offline:
1031 inst_nodes_offline.append(snode)
1033 if inst_nodes_offline:
1034 # warn that the instance lives on offline nodes, and set bad=True
1035 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1036 ", ".join(inst_nodes_offline))
1039 feedback_fn("* Verifying orphan volumes")
1040 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1044 feedback_fn("* Verifying remaining instances")
1045 result = self._VerifyOrphanInstances(instancelist, node_instance,
1049 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1050 feedback_fn("* Verifying N+1 Memory redundancy")
1051 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1054 feedback_fn("* Other Notes")
1056 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1057 % len(i_non_redundant))
1059 if i_non_a_balanced:
1060 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1061 % len(i_non_a_balanced))
1064 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1068 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1069 """Analize the post-hooks' result
1071 This method analyses the hook result, handles it, and sends some
1072 nicely-formatted feedback back to the user.
1074 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1075 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1076 @param hooks_results: the results of the multi-node hooks rpc call
1077 @param feedback_fn: function used send feedback back to the caller
1078 @param lu_result: previous Exec result
1079 @return: the new Exec result, based on the previous result
1083 # We only really run POST phase hooks, and are only interested in
1085 if phase == constants.HOOKS_PHASE_POST:
1086 # Used to change hooks' output to proper indentation
1087 indent_re = re.compile('^', re.M)
1088 feedback_fn("* Hooks Results")
1089 if not hooks_results:
1090 feedback_fn(" - ERROR: general communication failure")
1093 for node_name in hooks_results:
1094 show_node_header = True
1095 res = hooks_results[node_name]
1096 if res.failed or res.data is False or not isinstance(res.data, list):
1098 # no need to warn or set fail return value
1100 feedback_fn(" Communication failure in hooks execution")
1103 for script, hkr, output in res.data:
1104 if hkr == constants.HKR_FAIL:
1105 # The node header is only shown once, if there are
1106 # failing hooks on that node
1107 if show_node_header:
1108 feedback_fn(" Node %s:" % node_name)
1109 show_node_header = False
1110 feedback_fn(" ERROR: Script %s failed, output:" % script)
1111 output = indent_re.sub(' ', output)
1112 feedback_fn("%s" % output)
1118 class LUVerifyDisks(NoHooksLU):
1119 """Verifies the cluster disks status.
1125 def ExpandNames(self):
1126 self.needed_locks = {
1127 locking.LEVEL_NODE: locking.ALL_SET,
1128 locking.LEVEL_INSTANCE: locking.ALL_SET,
1130 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1132 def CheckPrereq(self):
1133 """Check prerequisites.
1135 This has no prerequisites.
1140 def Exec(self, feedback_fn):
1141 """Verify integrity of cluster disks.
1144 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1146 vg_name = self.cfg.GetVGName()
1147 nodes = utils.NiceSort(self.cfg.GetNodeList())
1148 instances = [self.cfg.GetInstanceInfo(name)
1149 for name in self.cfg.GetInstanceList()]
1152 for inst in instances:
1154 if (inst.status != "up" or
1155 inst.disk_template not in constants.DTS_NET_MIRROR):
1157 inst.MapLVsByNode(inst_lvs)
1158 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1159 for node, vol_list in inst_lvs.iteritems():
1160 for vol in vol_list:
1161 nv_dict[(node, vol)] = inst
1166 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1171 lvs = node_lvs[node]
1174 self.LogWarning("Connection to node %s failed: %s" %
1178 if isinstance(lvs, basestring):
1179 logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1180 res_nlvm[node] = lvs
1181 elif not isinstance(lvs, dict):
1182 logging.warning("Connection to node %s failed or invalid data"
1184 res_nodes.append(node)
1187 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1188 inst = nv_dict.pop((node, lv_name), None)
1189 if (not lv_online and inst is not None
1190 and inst.name not in res_instances):
1191 res_instances.append(inst.name)
1193 # any leftover items in nv_dict are missing LVs, let's arrange the
1195 for key, inst in nv_dict.iteritems():
1196 if inst.name not in res_missing:
1197 res_missing[inst.name] = []
1198 res_missing[inst.name].append(key)
1203 class LURenameCluster(LogicalUnit):
1204 """Rename the cluster.
1207 HPATH = "cluster-rename"
1208 HTYPE = constants.HTYPE_CLUSTER
1211 def BuildHooksEnv(self):
1216 "OP_TARGET": self.cfg.GetClusterName(),
1217 "NEW_NAME": self.op.name,
1219 mn = self.cfg.GetMasterNode()
1220 return env, [mn], [mn]
1222 def CheckPrereq(self):
1223 """Verify that the passed name is a valid one.
1226 hostname = utils.HostInfo(self.op.name)
1228 new_name = hostname.name
1229 self.ip = new_ip = hostname.ip
1230 old_name = self.cfg.GetClusterName()
1231 old_ip = self.cfg.GetMasterIP()
1232 if new_name == old_name and new_ip == old_ip:
1233 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1234 " cluster has changed")
1235 if new_ip != old_ip:
1236 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1237 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1238 " reachable on the network. Aborting." %
1241 self.op.name = new_name
1243 def Exec(self, feedback_fn):
1244 """Rename the cluster.
1247 clustername = self.op.name
1250 # shutdown the master IP
1251 master = self.cfg.GetMasterNode()
1252 result = self.rpc.call_node_stop_master(master, False)
1253 if result.failed or not result.data:
1254 raise errors.OpExecError("Could not disable the master role")
1257 cluster = self.cfg.GetClusterInfo()
1258 cluster.cluster_name = clustername
1259 cluster.master_ip = ip
1260 self.cfg.Update(cluster)
1262 # update the known hosts file
1263 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1264 node_list = self.cfg.GetNodeList()
1266 node_list.remove(master)
1269 result = self.rpc.call_upload_file(node_list,
1270 constants.SSH_KNOWN_HOSTS_FILE)
1271 for to_node, to_result in result.iteritems():
1272 if to_result.failed or not to_result.data:
1273 logging.error("Copy of file %s to node %s failed",
1274 constants.SSH_KNOWN_HOSTS_FILE, to_node)
1277 result = self.rpc.call_node_start_master(master, False)
1278 if result.failed or not result.data:
1279 self.LogWarning("Could not re-enable the master role on"
1280 " the master, please restart manually.")
1283 def _RecursiveCheckIfLVMBased(disk):
1284 """Check if the given disk or its children are lvm-based.
1286 @type disk: L{objects.Disk}
1287 @param disk: the disk to check
1289 @return: boolean indicating whether a LD_LV dev_type was found or not
1293 for chdisk in disk.children:
1294 if _RecursiveCheckIfLVMBased(chdisk):
1296 return disk.dev_type == constants.LD_LV
1299 class LUSetClusterParams(LogicalUnit):
1300 """Change the parameters of the cluster.
1303 HPATH = "cluster-modify"
1304 HTYPE = constants.HTYPE_CLUSTER
1308 def CheckParameters(self):
1312 if not hasattr(self.op, "candidate_pool_size"):
1313 self.op.candidate_pool_size = None
1314 if self.op.candidate_pool_size is not None:
1316 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1317 except ValueError, err:
1318 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1320 if self.op.candidate_pool_size < 1:
1321 raise errors.OpPrereqError("At least one master candidate needed")
1323 def ExpandNames(self):
1324 # FIXME: in the future maybe other cluster params won't require checking on
1325 # all nodes to be modified.
1326 self.needed_locks = {
1327 locking.LEVEL_NODE: locking.ALL_SET,
1329 self.share_locks[locking.LEVEL_NODE] = 1
1331 def BuildHooksEnv(self):
1336 "OP_TARGET": self.cfg.GetClusterName(),
1337 "NEW_VG_NAME": self.op.vg_name,
1339 mn = self.cfg.GetMasterNode()
1340 return env, [mn], [mn]
1342 def CheckPrereq(self):
1343 """Check prerequisites.
1345 This checks whether the given params don't conflict and
1346 if the given volume group is valid.
1349 # FIXME: This only works because there is only one parameter that can be
1350 # changed or removed.
1351 if self.op.vg_name is not None and not self.op.vg_name:
1352 instances = self.cfg.GetAllInstancesInfo().values()
1353 for inst in instances:
1354 for disk in inst.disks:
1355 if _RecursiveCheckIfLVMBased(disk):
1356 raise errors.OpPrereqError("Cannot disable lvm storage while"
1357 " lvm-based instances exist")
1359 node_list = self.acquired_locks[locking.LEVEL_NODE]
1361 # if vg_name not None, checks given volume group on all nodes
1363 vglist = self.rpc.call_vg_list(node_list)
1364 for node in node_list:
1365 if vglist[node].failed:
1366 # ignoring down node
1367 self.LogWarning("Node %s unreachable/error, ignoring" % node)
1369 vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1371 constants.MIN_VG_SIZE)
1373 raise errors.OpPrereqError("Error on node '%s': %s" %
1376 self.cluster = cluster = self.cfg.GetClusterInfo()
1377 # validate beparams changes
1378 if self.op.beparams:
1379 utils.CheckBEParams(self.op.beparams)
1380 self.new_beparams = cluster.FillDict(
1381 cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1383 # hypervisor list/parameters
1384 self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1385 if self.op.hvparams:
1386 if not isinstance(self.op.hvparams, dict):
1387 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1388 for hv_name, hv_dict in self.op.hvparams.items():
1389 if hv_name not in self.new_hvparams:
1390 self.new_hvparams[hv_name] = hv_dict
1392 self.new_hvparams[hv_name].update(hv_dict)
1394 if self.op.enabled_hypervisors is not None:
1395 self.hv_list = self.op.enabled_hypervisors
1397 self.hv_list = cluster.enabled_hypervisors
1399 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1400 # either the enabled list has changed, or the parameters have, validate
1401 for hv_name, hv_params in self.new_hvparams.items():
1402 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1403 (self.op.enabled_hypervisors and
1404 hv_name in self.op.enabled_hypervisors)):
1405 # either this is a new hypervisor, or its parameters have changed
1406 hv_class = hypervisor.GetHypervisor(hv_name)
1407 hv_class.CheckParameterSyntax(hv_params)
1408 _CheckHVParams(self, node_list, hv_name, hv_params)
1410 def Exec(self, feedback_fn):
1411 """Change the parameters of the cluster.
1414 if self.op.vg_name is not None:
1415 if self.op.vg_name != self.cfg.GetVGName():
1416 self.cfg.SetVGName(self.op.vg_name)
1418 feedback_fn("Cluster LVM configuration already in desired"
1419 " state, not changing")
1420 if self.op.hvparams:
1421 self.cluster.hvparams = self.new_hvparams
1422 if self.op.enabled_hypervisors is not None:
1423 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1424 if self.op.beparams:
1425 self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1426 if self.op.candidate_pool_size is not None:
1427 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1429 self.cfg.Update(self.cluster)
1431 # we want to update nodes after the cluster so that if any errors
1432 # happen, we have recorded and saved the cluster info
1433 if self.op.candidate_pool_size is not None:
1434 _AdjustCandidatePool(self)
1437 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1438 """Sleep and poll for an instance's disk to sync.
1441 if not instance.disks:
1445 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1447 node = instance.primary_node
1449 for dev in instance.disks:
1450 lu.cfg.SetDiskID(dev, node)
1456 cumul_degraded = False
1457 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1458 if rstats.failed or not rstats.data:
1459 lu.LogWarning("Can't get any data from node %s", node)
1462 raise errors.RemoteError("Can't contact node %s for mirror data,"
1463 " aborting." % node)
1466 rstats = rstats.data
1468 for i in range(len(rstats)):
1471 lu.LogWarning("Can't compute data for node %s/%s",
1472 node, instance.disks[i].iv_name)
1474 # we ignore the ldisk parameter
1475 perc_done, est_time, is_degraded, _ = mstat
1476 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1477 if perc_done is not None:
1479 if est_time is not None:
1480 rem_time = "%d estimated seconds remaining" % est_time
1483 rem_time = "no time estimate"
1484 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1485 (instance.disks[i].iv_name, perc_done, rem_time))
1489 time.sleep(min(60, max_time))
1492 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1493 return not cumul_degraded
1496 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1497 """Check that mirrors are not degraded.
1499 The ldisk parameter, if True, will change the test from the
1500 is_degraded attribute (which represents overall non-ok status for
1501 the device(s)) to the ldisk (representing the local storage status).
1504 lu.cfg.SetDiskID(dev, node)
1511 if on_primary or dev.AssembleOnSecondary():
1512 rstats = lu.rpc.call_blockdev_find(node, dev)
1513 if rstats.failed or not rstats.data:
1514 logging.warning("Node %s: disk degraded, not found or node down", node)
1517 result = result and (not rstats.data[idx])
1519 for child in dev.children:
1520 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1525 class LUDiagnoseOS(NoHooksLU):
1526 """Logical unit for OS diagnose/query.
1529 _OP_REQP = ["output_fields", "names"]
1531 _FIELDS_STATIC = utils.FieldSet()
1532 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1534 def ExpandNames(self):
1536 raise errors.OpPrereqError("Selective OS query not supported")
1538 _CheckOutputFields(static=self._FIELDS_STATIC,
1539 dynamic=self._FIELDS_DYNAMIC,
1540 selected=self.op.output_fields)
1542 # Lock all nodes, in shared mode
1543 self.needed_locks = {}
1544 self.share_locks[locking.LEVEL_NODE] = 1
1545 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1547 def CheckPrereq(self):
1548 """Check prerequisites.
1553 def _DiagnoseByOS(node_list, rlist):
1554 """Remaps a per-node return list into an a per-os per-node dictionary
1556 @param node_list: a list with the names of all nodes
1557 @param rlist: a map with node names as keys and OS objects as values
1560 @returns: a dictionary with osnames as keys and as value another map, with
1561 nodes as keys and list of OS objects as values, eg::
1563 {"debian-etch": {"node1": [<object>,...],
1564 "node2": [<object>,]}
1569 for node_name, nr in rlist.iteritems():
1570 if nr.failed or not nr.data:
1572 for os_obj in nr.data:
1573 if os_obj.name not in all_os:
1574 # build a list of nodes for this os containing empty lists
1575 # for each node in node_list
1576 all_os[os_obj.name] = {}
1577 for nname in node_list:
1578 all_os[os_obj.name][nname] = []
1579 all_os[os_obj.name][node_name].append(os_obj)
1582 def Exec(self, feedback_fn):
1583 """Compute the list of OSes.
1586 node_list = self.acquired_locks[locking.LEVEL_NODE]
1587 node_data = self.rpc.call_os_diagnose(node_list)
1588 if node_data == False:
1589 raise errors.OpExecError("Can't gather the list of OSes")
1590 pol = self._DiagnoseByOS(node_list, node_data)
1592 for os_name, os_data in pol.iteritems():
1594 for field in self.op.output_fields:
1597 elif field == "valid":
1598 val = utils.all([osl and osl[0] for osl in os_data.values()])
1599 elif field == "node_status":
1601 for node_name, nos_list in os_data.iteritems():
1602 val[node_name] = [(v.status, v.path) for v in nos_list]
1604 raise errors.ParameterError(field)
1611 class LURemoveNode(LogicalUnit):
1612 """Logical unit for removing a node.
1615 HPATH = "node-remove"
1616 HTYPE = constants.HTYPE_NODE
1617 _OP_REQP = ["node_name"]
1619 def BuildHooksEnv(self):
1622 This doesn't run on the target node in the pre phase as a failed
1623 node would then be impossible to remove.
1627 "OP_TARGET": self.op.node_name,
1628 "NODE_NAME": self.op.node_name,
1630 all_nodes = self.cfg.GetNodeList()
1631 all_nodes.remove(self.op.node_name)
1632 return env, all_nodes, all_nodes
1634 def CheckPrereq(self):
1635 """Check prerequisites.
1638 - the node exists in the configuration
1639 - it does not have primary or secondary instances
1640 - it's not the master
1642 Any errors are signalled by raising errors.OpPrereqError.
1645 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1647 raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1649 instance_list = self.cfg.GetInstanceList()
1651 masternode = self.cfg.GetMasterNode()
1652 if node.name == masternode:
1653 raise errors.OpPrereqError("Node is the master node,"
1654 " you need to failover first.")
1656 for instance_name in instance_list:
1657 instance = self.cfg.GetInstanceInfo(instance_name)
1658 if node.name == instance.primary_node:
1659 raise errors.OpPrereqError("Instance %s still running on the node,"
1660 " please remove first." % instance_name)
1661 if node.name in instance.secondary_nodes:
1662 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1663 " please remove first." % instance_name)
1664 self.op.node_name = node.name
1667 def Exec(self, feedback_fn):
1668 """Removes the node from the cluster.
1672 logging.info("Stopping the node daemon and removing configs from node %s",
1675 self.context.RemoveNode(node.name)
1677 self.rpc.call_node_leave_cluster(node.name)
1679 # Promote nodes to master candidate as needed
1680 _AdjustCandidatePool(self)
1683 class LUQueryNodes(NoHooksLU):
1684 """Logical unit for querying nodes.
1687 _OP_REQP = ["output_fields", "names"]
1689 _FIELDS_DYNAMIC = utils.FieldSet(
1691 "mtotal", "mnode", "mfree",
1696 _FIELDS_STATIC = utils.FieldSet(
1697 "name", "pinst_cnt", "sinst_cnt",
1698 "pinst_list", "sinst_list",
1699 "pip", "sip", "tags",
1706 def ExpandNames(self):
1707 _CheckOutputFields(static=self._FIELDS_STATIC,
1708 dynamic=self._FIELDS_DYNAMIC,
1709 selected=self.op.output_fields)
1711 self.needed_locks = {}
1712 self.share_locks[locking.LEVEL_NODE] = 1
1715 self.wanted = _GetWantedNodes(self, self.op.names)
1717 self.wanted = locking.ALL_SET
1719 self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1721 # if we don't request only static fields, we need to lock the nodes
1722 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1725 def CheckPrereq(self):
1726 """Check prerequisites.
1729 # The validation of the node list is done in the _GetWantedNodes,
1730 # if non empty, and if empty, there's no validation to do
1733 def Exec(self, feedback_fn):
1734 """Computes the list of nodes and their attributes.
1737 all_info = self.cfg.GetAllNodesInfo()
1739 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1740 elif self.wanted != locking.ALL_SET:
1741 nodenames = self.wanted
1742 missing = set(nodenames).difference(all_info.keys())
1744 raise errors.OpExecError(
1745 "Some nodes were removed before retrieving their data: %s" % missing)
1747 nodenames = all_info.keys()
1749 nodenames = utils.NiceSort(nodenames)
1750 nodelist = [all_info[name] for name in nodenames]
1752 # begin data gathering
1756 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1757 self.cfg.GetHypervisorType())
1758 for name in nodenames:
1759 nodeinfo = node_data[name]
1760 if not nodeinfo.failed and nodeinfo.data:
1761 nodeinfo = nodeinfo.data
1762 fn = utils.TryConvert
1764 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1765 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1766 "mfree": fn(int, nodeinfo.get('memory_free', None)),
1767 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1768 "dfree": fn(int, nodeinfo.get('vg_free', None)),
1769 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1770 "bootid": nodeinfo.get('bootid', None),
1773 live_data[name] = {}
1775 live_data = dict.fromkeys(nodenames, {})
1777 node_to_primary = dict([(name, set()) for name in nodenames])
1778 node_to_secondary = dict([(name, set()) for name in nodenames])
1780 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1781 "sinst_cnt", "sinst_list"))
1782 if inst_fields & frozenset(self.op.output_fields):
1783 instancelist = self.cfg.GetInstanceList()
1785 for instance_name in instancelist:
1786 inst = self.cfg.GetInstanceInfo(instance_name)
1787 if inst.primary_node in node_to_primary:
1788 node_to_primary[inst.primary_node].add(inst.name)
1789 for secnode in inst.secondary_nodes:
1790 if secnode in node_to_secondary:
1791 node_to_secondary[secnode].add(inst.name)
1793 master_node = self.cfg.GetMasterNode()
1795 # end data gathering
1798 for node in nodelist:
1800 for field in self.op.output_fields:
1803 elif field == "pinst_list":
1804 val = list(node_to_primary[node.name])
1805 elif field == "sinst_list":
1806 val = list(node_to_secondary[node.name])
1807 elif field == "pinst_cnt":
1808 val = len(node_to_primary[node.name])
1809 elif field == "sinst_cnt":
1810 val = len(node_to_secondary[node.name])
1811 elif field == "pip":
1812 val = node.primary_ip
1813 elif field == "sip":
1814 val = node.secondary_ip
1815 elif field == "tags":
1816 val = list(node.GetTags())
1817 elif field == "serial_no":
1818 val = node.serial_no
1819 elif field == "master_candidate":
1820 val = node.master_candidate
1821 elif field == "master":
1822 val = node.name == master_node
1823 elif field == "offline":
1825 elif self._FIELDS_DYNAMIC.Matches(field):
1826 val = live_data[node.name].get(field, None)
1828 raise errors.ParameterError(field)
1829 node_output.append(val)
1830 output.append(node_output)
1835 class LUQueryNodeVolumes(NoHooksLU):
1836 """Logical unit for getting volumes on node(s).
1839 _OP_REQP = ["nodes", "output_fields"]
1841 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1842 _FIELDS_STATIC = utils.FieldSet("node")
1844 def ExpandNames(self):
1845 _CheckOutputFields(static=self._FIELDS_STATIC,
1846 dynamic=self._FIELDS_DYNAMIC,
1847 selected=self.op.output_fields)
1849 self.needed_locks = {}
1850 self.share_locks[locking.LEVEL_NODE] = 1
1851 if not self.op.nodes:
1852 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1854 self.needed_locks[locking.LEVEL_NODE] = \
1855 _GetWantedNodes(self, self.op.nodes)
1857 def CheckPrereq(self):
1858 """Check prerequisites.
1860 This checks that the fields required are valid output fields.
1863 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1865 def Exec(self, feedback_fn):
1866 """Computes the list of nodes and their attributes.
1869 nodenames = self.nodes
1870 volumes = self.rpc.call_node_volumes(nodenames)
1872 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1873 in self.cfg.GetInstanceList()]
1875 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1878 for node in nodenames:
1879 if node not in volumes or volumes[node].failed or not volumes[node].data:
1882 node_vols = volumes[node].data[:]
1883 node_vols.sort(key=lambda vol: vol['dev'])
1885 for vol in node_vols:
1887 for field in self.op.output_fields:
1890 elif field == "phys":
1894 elif field == "name":
1896 elif field == "size":
1897 val = int(float(vol['size']))
1898 elif field == "instance":
1900 if node not in lv_by_node[inst]:
1902 if vol['name'] in lv_by_node[inst][node]:
1908 raise errors.ParameterError(field)
1909 node_output.append(str(val))
1911 output.append(node_output)
1916 class LUAddNode(LogicalUnit):
1917 """Logical unit for adding node to the cluster.
1921 HTYPE = constants.HTYPE_NODE
1922 _OP_REQP = ["node_name"]
1924 def BuildHooksEnv(self):
1927 This will run on all nodes before, and on all nodes + the new node after.
1931 "OP_TARGET": self.op.node_name,
1932 "NODE_NAME": self.op.node_name,
1933 "NODE_PIP": self.op.primary_ip,
1934 "NODE_SIP": self.op.secondary_ip,
1936 nodes_0 = self.cfg.GetNodeList()
1937 nodes_1 = nodes_0 + [self.op.node_name, ]
1938 return env, nodes_0, nodes_1
1940 def CheckPrereq(self):
1941 """Check prerequisites.
1944 - the new node is not already in the config
1946 - its parameters (single/dual homed) matches the cluster
1948 Any errors are signalled by raising errors.OpPrereqError.
1951 node_name = self.op.node_name
1954 dns_data = utils.HostInfo(node_name)
1956 node = dns_data.name
1957 primary_ip = self.op.primary_ip = dns_data.ip
1958 secondary_ip = getattr(self.op, "secondary_ip", None)
1959 if secondary_ip is None:
1960 secondary_ip = primary_ip
1961 if not utils.IsValidIP(secondary_ip):
1962 raise errors.OpPrereqError("Invalid secondary IP given")
1963 self.op.secondary_ip = secondary_ip
1965 node_list = cfg.GetNodeList()
1966 if not self.op.readd and node in node_list:
1967 raise errors.OpPrereqError("Node %s is already in the configuration" %
1969 elif self.op.readd and node not in node_list:
1970 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1972 for existing_node_name in node_list:
1973 existing_node = cfg.GetNodeInfo(existing_node_name)
1975 if self.op.readd and node == existing_node_name:
1976 if (existing_node.primary_ip != primary_ip or
1977 existing_node.secondary_ip != secondary_ip):
1978 raise errors.OpPrereqError("Readded node doesn't have the same IP"
1979 " address configuration as before")
1982 if (existing_node.primary_ip == primary_ip or
1983 existing_node.secondary_ip == primary_ip or
1984 existing_node.primary_ip == secondary_ip or
1985 existing_node.secondary_ip == secondary_ip):
1986 raise errors.OpPrereqError("New node ip address(es) conflict with"
1987 " existing node %s" % existing_node.name)
1989 # check that the type of the node (single versus dual homed) is the
1990 # same as for the master
1991 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1992 master_singlehomed = myself.secondary_ip == myself.primary_ip
1993 newbie_singlehomed = secondary_ip == primary_ip
1994 if master_singlehomed != newbie_singlehomed:
1995 if master_singlehomed:
1996 raise errors.OpPrereqError("The master has no private ip but the"
1997 " new node has one")
1999 raise errors.OpPrereqError("The master has a private ip but the"
2000 " new node doesn't have one")
2002 # checks reachablity
2003 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2004 raise errors.OpPrereqError("Node not reachable by ping")
2006 if not newbie_singlehomed:
2007 # check reachability from my secondary ip to newbie's secondary ip
2008 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2009 source=myself.secondary_ip):
2010 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2011 " based ping to noded port")
2013 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2014 mc_now, _ = self.cfg.GetMasterCandidateStats()
2015 master_candidate = mc_now < cp_size
2017 self.new_node = objects.Node(name=node,
2018 primary_ip=primary_ip,
2019 secondary_ip=secondary_ip,
2020 master_candidate=master_candidate,
2023 def Exec(self, feedback_fn):
2024 """Adds the new node to the cluster.
2027 new_node = self.new_node
2028 node = new_node.name
2030 # check connectivity
2031 result = self.rpc.call_version([node])[node]
2034 if constants.PROTOCOL_VERSION == result.data:
2035 logging.info("Communication to node %s fine, sw version %s match",
2038 raise errors.OpExecError("Version mismatch master version %s,"
2039 " node version %s" %
2040 (constants.PROTOCOL_VERSION, result.data))
2042 raise errors.OpExecError("Cannot get version from the new node")
2045 logging.info("Copy ssh key to node %s", node)
2046 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2048 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2049 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2055 keyarray.append(f.read())
2059 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2061 keyarray[3], keyarray[4], keyarray[5])
2063 if result.failed or not result.data:
2064 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2066 # Add node to our /etc/hosts, and add key to known_hosts
2067 utils.AddHostToEtcHosts(new_node.name)
2069 if new_node.secondary_ip != new_node.primary_ip:
2070 result = self.rpc.call_node_has_ip_address(new_node.name,
2071 new_node.secondary_ip)
2072 if result.failed or not result.data:
2073 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2074 " you gave (%s). Please fix and re-run this"
2075 " command." % new_node.secondary_ip)
2077 node_verify_list = [self.cfg.GetMasterNode()]
2078 node_verify_param = {
2080 # TODO: do a node-net-test as well?
2083 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2084 self.cfg.GetClusterName())
2085 for verifier in node_verify_list:
2086 if result[verifier].failed or not result[verifier].data:
2087 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2088 " for remote verification" % verifier)
2089 if result[verifier].data['nodelist']:
2090 for failed in result[verifier].data['nodelist']:
2091 feedback_fn("ssh/hostname verification failed %s -> %s" %
2092 (verifier, result[verifier].data['nodelist'][failed]))
2093 raise errors.OpExecError("ssh/hostname verification failed.")
2095 # Distribute updated /etc/hosts and known_hosts to all nodes,
2096 # including the node just added
2097 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2098 dist_nodes = self.cfg.GetNodeList()
2099 if not self.op.readd:
2100 dist_nodes.append(node)
2101 if myself.name in dist_nodes:
2102 dist_nodes.remove(myself.name)
2104 logging.debug("Copying hosts and known_hosts to all nodes")
2105 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2106 result = self.rpc.call_upload_file(dist_nodes, fname)
2107 for to_node, to_result in result.iteritems():
2108 if to_result.failed or not to_result.data:
2109 logging.error("Copy of file %s to node %s failed", fname, to_node)
2112 if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2113 to_copy.append(constants.VNC_PASSWORD_FILE)
2114 for fname in to_copy:
2115 result = self.rpc.call_upload_file([node], fname)
2116 if result[node].failed or not result[node].data:
2117 logging.error("Could not copy file %s to node %s", fname, node)
2120 self.context.ReaddNode(new_node)
2122 self.context.AddNode(new_node)
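# Illustrative sketch, not part of the original module: the master-candidate
# decision made in LUAddNode.CheckPrereq above, reduced to plain values. A
# newly added node is promoted to master candidate only while the current
# number of candidates is below the cluster's candidate_pool_size.
def _ExampleNewNodeIsCandidate(current_candidates, candidate_pool_size):
  """Return True if a freshly added node should become a master candidate."""
  return current_candidates < candidate_pool_size

# e.g. _ExampleNewNodeIsCandidate(3, 5) -> True; _ExampleNewNodeIsCandidate(5, 5) -> False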
2125 class LUSetNodeParams(LogicalUnit):
2126 """Modifies the parameters of a node.
2129 HPATH = "node-modify"
2130 HTYPE = constants.HTYPE_NODE
2131 _OP_REQP = ["node_name"]
2134 def CheckArguments(self):
2135 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2136 if node_name is None:
2137 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2138 self.op.node_name = node_name
2139 _CheckBooleanOpField(self.op, 'master_candidate')
2140 _CheckBooleanOpField(self.op, 'offline')
2141 if self.op.master_candidate is None and self.op.offline is None:
2142 raise errors.OpPrereqError("Please pass at least one modification")
2143 if self.op.offline == True and self.op.master_candidate == True:
2144 raise errors.OpPrereqError("Can't set the node into offline and"
2145 " master_candidate at the same time")
2147 def ExpandNames(self):
2148 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2150 def BuildHooksEnv(self):
2153 This runs on the master node.
2157 "OP_TARGET": self.op.node_name,
2158 "MASTER_CANDIDATE": str(self.op.master_candidate),
2159 "OFFLINE": str(self.op.offline),
2161 nl = [self.cfg.GetMasterNode(),
2165 def CheckPrereq(self):
2166 """Check prerequisites.
2168 This checks that the given node exists and that the requested changes are consistent with the cluster state.
2171 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2173 if ((self.op.master_candidate == False or self.op.offline == True)
2174 and node.master_candidate):
2175 # we will demote the node from master_candidate
2176 if self.op.node_name == self.cfg.GetMasterNode():
2177 raise errors.OpPrereqError("The master node has to be a"
2178 " master candidate and online")
2179 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2180 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2181 if num_candidates <= cp_size:
2182 msg = ("Not enough master candidates (desired"
2183 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2185 self.LogWarning(msg)
2187 raise errors.OpPrereqError(msg)
2189 if (self.op.master_candidate == True and node.offline and
2190 not self.op.offline == False):
2191 raise errors.OpPrereqError("Can't set an offline node to"
2192 " master_candidate")
2196 def Exec(self, feedback_fn):
2204 if self.op.offline is not None:
2205 node.offline = self.op.offline
2206 result.append(("offline", str(self.op.offline)))
2207 if self.op.offline == True and node.master_candidate:
2208 node.master_candidate = False
2209 result.append(("master_candidate", "auto-demotion due to offline"))
2211 if self.op.master_candidate is not None:
2212 node.master_candidate = self.op.master_candidate
2213 result.append(("master_candidate", str(self.op.master_candidate)))
2214 if self.op.master_candidate == False:
2215 rrc = self.rpc.call_node_demote_from_mc(node.name)
2216 if (rrc.failed or not isinstance(rrc.data, (tuple, list))
2217 or len(rrc.data) != 2):
2218 self.LogWarning("Node rpc error: %s" % rrc.error)
2219 elif not rrc.data[0]:
2220 self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
2222 # this will trigger configuration file update, if needed
2223 self.cfg.Update(node)
2224 # this will trigger job queue propagation or cleanup
2225 if self.op.node_name != self.cfg.GetMasterNode():
2226 self.context.ReaddNode(node)
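# Illustrative sketch, not part of the original module: the demotion guard
# applied by LUSetNodeParams.CheckPrereq above. Demoting (or offlining) a
# master candidate is refused when that would leave fewer candidates than
# candidate_pool_size; with force it is only warned about.
def _ExampleDemotionAllowed(num_candidates, candidate_pool_size, force):
  """Return (allowed, warning_message) for demoting one master candidate."""
  if num_candidates <= candidate_pool_size:
    msg = ("Not enough master candidates (desired %d, new value will be %d)" %
           (candidate_pool_size, num_candidates - 1))
    return force, msg
  return True, None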
2231 class LUQueryClusterInfo(NoHooksLU):
2232 """Query cluster configuration.
2238 def ExpandNames(self):
2239 self.needed_locks = {}
2241 def CheckPrereq(self):
2242 """No prerequsites needed for this LU.
2247 def Exec(self, feedback_fn):
2248 """Return cluster config.
2251 cluster = self.cfg.GetClusterInfo()
2253 "software_version": constants.RELEASE_VERSION,
2254 "protocol_version": constants.PROTOCOL_VERSION,
2255 "config_version": constants.CONFIG_VERSION,
2256 "os_api_version": constants.OS_API_VERSION,
2257 "export_version": constants.EXPORT_VERSION,
2258 "architecture": (platform.architecture()[0], platform.machine()),
2259 "name": cluster.cluster_name,
2260 "master": cluster.master_node,
2261 "default_hypervisor": cluster.default_hypervisor,
2262 "enabled_hypervisors": cluster.enabled_hypervisors,
2263 "hvparams": cluster.hvparams,
2264 "beparams": cluster.beparams,
2265 "candidate_pool_size": cluster.candidate_pool_size,
2271 class LUQueryConfigValues(NoHooksLU):
2272 """Return configuration values.
2277 _FIELDS_DYNAMIC = utils.FieldSet()
2278 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2280 def ExpandNames(self):
2281 self.needed_locks = {}
2283 _CheckOutputFields(static=self._FIELDS_STATIC,
2284 dynamic=self._FIELDS_DYNAMIC,
2285 selected=self.op.output_fields)
2287 def CheckPrereq(self):
2288 """No prerequisites.
2293 def Exec(self, feedback_fn):
2294 """Dump a representation of the cluster config to the standard output.
2298 for field in self.op.output_fields:
2299 if field == "cluster_name":
2300 entry = self.cfg.GetClusterName()
2301 elif field == "master_node":
2302 entry = self.cfg.GetMasterNode()
2303 elif field == "drain_flag":
2304 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2306 raise errors.ParameterError(field)
2307 values.append(entry)
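# Illustrative sketch, not part of the original module: the same field
# dispatch as LUQueryConfigValues.Exec above, written as a lookup table.
# "cluster_name", "master_node" and "drain_flag" are the names declared in
# _FIELDS_STATIC; unknown fields raise ParameterError just as the original
# does.
def _ExampleQueryConfigValues(cfg, fields):
  getters = {
    "cluster_name": cfg.GetClusterName,
    "master_node": cfg.GetMasterNode,
    "drain_flag": lambda: os.path.exists(constants.JOB_QUEUE_DRAIN_FILE),
  }
  values = []
  for field in fields:
    if field not in getters:
      raise errors.ParameterError(field)
    values.append(getters[field]())
  return values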
2311 class LUActivateInstanceDisks(NoHooksLU):
2312 """Bring up an instance's disks.
2315 _OP_REQP = ["instance_name"]
2318 def ExpandNames(self):
2319 self._ExpandAndLockInstance()
2320 self.needed_locks[locking.LEVEL_NODE] = []
2321 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2323 def DeclareLocks(self, level):
2324 if level == locking.LEVEL_NODE:
2325 self._LockInstancesNodes()
2327 def CheckPrereq(self):
2328 """Check prerequisites.
2330 This checks that the instance is in the cluster.
2333 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2334 assert self.instance is not None, \
2335 "Cannot retrieve locked instance %s" % self.op.instance_name
2336 _CheckNodeOnline(self, self.instance.primary_node)
2338 def Exec(self, feedback_fn):
2339 """Activate the disks.
2342 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2344 raise errors.OpExecError("Cannot activate block devices")
2349 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2350 """Prepare the block devices for an instance.
2352 This sets up the block devices on all nodes.
2354 @type lu: L{LogicalUnit}
2355 @param lu: the logical unit on whose behalf we execute
2356 @type instance: L{objects.Instance}
2357 @param instance: the instance for whose disks we assemble
2358 @type ignore_secondaries: boolean
2359 @param ignore_secondaries: if true, errors on secondary nodes
2360 won't result in an error return from the function
2361 @return: False if the operation failed, otherwise a list of
2362 (host, instance_visible_name, node_visible_name)
2363 with the mapping from node devices to instance devices
2368 iname = instance.name
2369 # With the two-pass mechanism we try to reduce the window of
2370 # opportunity for the race condition of switching DRBD to primary
2371 # before handshaking occurred, but we do not eliminate it
2373 # The proper fix would be to wait (with some limits) until the
2374 # connection has been made and drbd transitions from WFConnection
2375 # into any other network-connected state (Connected, SyncTarget,
2378 # 1st pass, assemble on all nodes in secondary mode
2379 for inst_disk in instance.disks:
2380 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2381 lu.cfg.SetDiskID(node_disk, node)
2382 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2383 if result.failed or not result:
2384 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2385 " (is_primary=False, pass=1)",
2386 inst_disk.iv_name, node)
2387 if not ignore_secondaries:
2390 # FIXME: race condition on drbd migration to primary
2392 # 2nd pass, do only the primary node
2393 for inst_disk in instance.disks:
2394 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2395 if node != instance.primary_node:
2397 lu.cfg.SetDiskID(node_disk, node)
2398 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2399 if result.failed or not result:
2400 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2401 " (is_primary=True, pass=2)",
2402 inst_disk.iv_name, node)
2404 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2406 # leave the disks configured for the primary node
2407 # this is a workaround that would be fixed better by
2408 # improving the logical/physical id handling
2409 for disk in instance.disks:
2410 lu.cfg.SetDiskID(disk, instance.primary_node)
2412 return disks_ok, device_info
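# Illustrative sketch, not part of the original module: the two-pass shape
# used by _AssembleInstanceDisks above, on plain data. Pass one brings every
# node up in secondary mode, pass two switches only the primary node, which
# narrows (but does not close) the DRBD handshake race described above.
def _ExampleTwoPassAssemble(disk_nodes, primary_node, assemble_fn):
  """disk_nodes: node names; assemble_fn(node, as_primary) must return bool."""
  disks_ok = True
  for node in disk_nodes:                  # 1st pass: everything as secondary
    disks_ok = assemble_fn(node, False) and disks_ok
  for node in disk_nodes:                  # 2nd pass: the primary node only
    if node == primary_node:
      disks_ok = assemble_fn(node, True) and disks_ok
  return disks_ok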
2415 def _StartInstanceDisks(lu, instance, force):
2416 """Start the disks of an instance.
2419 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2420 ignore_secondaries=force)
2422 _ShutdownInstanceDisks(lu, instance)
2423 if force is not None and not force:
2424 lu.proc.LogWarning("", hint="If the message above refers to a"
2425 " secondary node,"
2426 " you can retry the operation using '--force'.")
2427 raise errors.OpExecError("Disk consistency error")
2430 class LUDeactivateInstanceDisks(NoHooksLU):
2431 """Shutdown an instance's disks.
2434 _OP_REQP = ["instance_name"]
2437 def ExpandNames(self):
2438 self._ExpandAndLockInstance()
2439 self.needed_locks[locking.LEVEL_NODE] = []
2440 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2442 def DeclareLocks(self, level):
2443 if level == locking.LEVEL_NODE:
2444 self._LockInstancesNodes()
2446 def CheckPrereq(self):
2447 """Check prerequisites.
2449 This checks that the instance is in the cluster.
2452 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2453 assert self.instance is not None, \
2454 "Cannot retrieve locked instance %s" % self.op.instance_name
2456 def Exec(self, feedback_fn):
2457 """Deactivate the disks
2460 instance = self.instance
2461 _SafeShutdownInstanceDisks(self, instance)
2464 def _SafeShutdownInstanceDisks(lu, instance):
2465 """Shutdown block devices of an instance.
2467 This function checks if an instance is running, before calling
2468 _ShutdownInstanceDisks.
2471 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2472 [instance.hypervisor])
2473 ins_l = ins_l[instance.primary_node]
2474 if ins_l.failed or not isinstance(ins_l.data, list):
2475 raise errors.OpExecError("Can't contact node '%s'" %
2476 instance.primary_node)
2478 if instance.name in ins_l.data:
2479 raise errors.OpExecError("Instance is running, can't shutdown"
2482 _ShutdownInstanceDisks(lu, instance)
2485 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2486 """Shutdown block devices of an instance.
2488 This does the shutdown on all nodes of the instance.
2490 If ignore_primary is true, errors on the primary node are ignored.
2495 for disk in instance.disks:
2496 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2497 lu.cfg.SetDiskID(top_disk, node)
2498 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2499 if result.failed or not result.data:
2500 logging.error("Could not shutdown block device %s on node %s",
2502 if not ignore_primary or node != instance.primary_node:
2507 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2508 """Checks if a node has enough free memory.
2510 This function checks if a given node has the needed amount of free
2511 memory. In case the node has less memory or we cannot get the
2512 information from the node, this function raises an OpPrereqError
2515 @type lu: L{LogicalUnit}
2516 @param lu: a logical unit from which we get configuration data
2518 @param node: the node to check
2519 @type reason: C{str}
2520 @param reason: string to use in the error message
2521 @type requested: C{int}
2522 @param requested: the amount of memory in MiB to check for
2523 @type hypervisor: C{str}
2524 @param hypervisor: the hypervisor to ask for memory stats
2525 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2526 we cannot check the node
2529 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2530 nodeinfo[node].Raise()
2531 free_mem = nodeinfo[node].data.get('memory_free')
2532 if not isinstance(free_mem, int):
2533 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2534 " was '%s'" % (node, free_mem))
2535 if requested > free_mem:
2536 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2537 " needed %s MiB, available %s MiB" %
2538 (node, reason, requested, free_mem))
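# Illustrative sketch, not part of the original module: the core comparison
# made by _CheckNodeFreeMemory above, on plain values; the real helper fetches
# 'memory_free' from the node via RPC and raises OpPrereqError instead of
# returning False.
def _ExampleNodeHasMemory(free_mem, requested_mib):
  return isinstance(free_mem, int) and requested_mib <= free_mem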
2541 class LUStartupInstance(LogicalUnit):
2542 """Starts an instance.
2545 HPATH = "instance-start"
2546 HTYPE = constants.HTYPE_INSTANCE
2547 _OP_REQP = ["instance_name", "force"]
2550 def ExpandNames(self):
2551 self._ExpandAndLockInstance()
2553 def BuildHooksEnv(self):
2556 This runs on master, primary and secondary nodes of the instance.
2560 "FORCE": self.op.force,
2562 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2563 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2564 list(self.instance.secondary_nodes))
2567 def CheckPrereq(self):
2568 """Check prerequisites.
2570 This checks that the instance is in the cluster.
2573 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2574 assert self.instance is not None, \
2575 "Cannot retrieve locked instance %s" % self.op.instance_name
2577 _CheckNodeOnline(self, instance.primary_node)
2579 bep = self.cfg.GetClusterInfo().FillBE(instance)
2580 # check bridge existence
2581 _CheckInstanceBridgesExist(self, instance)
2583 _CheckNodeFreeMemory(self, instance.primary_node,
2584 "starting instance %s" % instance.name,
2585 bep[constants.BE_MEMORY], instance.hypervisor)
2587 def Exec(self, feedback_fn):
2588 """Start the instance.
2591 instance = self.instance
2592 force = self.op.force
2593 extra_args = getattr(self.op, "extra_args", "")
2595 self.cfg.MarkInstanceUp(instance.name)
2597 node_current = instance.primary_node
2599 _StartInstanceDisks(self, instance, force)
2601 result = self.rpc.call_instance_start(node_current, instance, extra_args)
2602 if result.failed or not result.data:
2603 _ShutdownInstanceDisks(self, instance)
2604 raise errors.OpExecError("Could not start instance")
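# Illustrative sketch, not part of the original module: the start-or-roll-back
# pattern used by LUStartupInstance.Exec above, with the node interactions
# abstracted into plain callables.
def _ExampleStartWithRollback(start_disks_fn, start_instance_fn,
                              shutdown_disks_fn):
  start_disks_fn()
  if not start_instance_fn():
    shutdown_disks_fn()
    raise errors.OpExecError("Could not start instance")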
2607 class LURebootInstance(LogicalUnit):
2608 """Reboot an instance.
2611 HPATH = "instance-reboot"
2612 HTYPE = constants.HTYPE_INSTANCE
2613 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2616 def ExpandNames(self):
2617 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2618 constants.INSTANCE_REBOOT_HARD,
2619 constants.INSTANCE_REBOOT_FULL]:
2620 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2621 (constants.INSTANCE_REBOOT_SOFT,
2622 constants.INSTANCE_REBOOT_HARD,
2623 constants.INSTANCE_REBOOT_FULL))
2624 self._ExpandAndLockInstance()
2626 def BuildHooksEnv(self):
2629 This runs on master, primary and secondary nodes of the instance.
2633 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2635 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2636 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2637 list(self.instance.secondary_nodes))
2640 def CheckPrereq(self):
2641 """Check prerequisites.
2643 This checks that the instance is in the cluster.
2646 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2647 assert self.instance is not None, \
2648 "Cannot retrieve locked instance %s" % self.op.instance_name
2650 _CheckNodeOnline(self, instance.primary_node)
2652 # check bridge existence
2653 _CheckInstanceBridgesExist(self, instance)
2655 def Exec(self, feedback_fn):
2656 """Reboot the instance.
2659 instance = self.instance
2660 ignore_secondaries = self.op.ignore_secondaries
2661 reboot_type = self.op.reboot_type
2662 extra_args = getattr(self.op, "extra_args", "")
2664 node_current = instance.primary_node
2666 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2667 constants.INSTANCE_REBOOT_HARD]:
2668 result = self.rpc.call_instance_reboot(node_current, instance,
2669 reboot_type, extra_args)
2670 if result.failed or not result.data:
2671 raise errors.OpExecError("Could not reboot instance")
2673 if not self.rpc.call_instance_shutdown(node_current, instance):
2674 raise errors.OpExecError("could not shutdown instance for full reboot")
2675 _ShutdownInstanceDisks(self, instance)
2676 _StartInstanceDisks(self, instance, ignore_secondaries)
2677 result = self.rpc.call_instance_start(node_current, instance, extra_args)
2678 if result.failed or not result.data:
2679 _ShutdownInstanceDisks(self, instance)
2680 raise errors.OpExecError("Could not start instance for full reboot")
2682 self.cfg.MarkInstanceUp(instance.name)
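# Illustrative sketch, not part of the original module: the branch taken by
# LURebootInstance.Exec above for each reboot type. Soft and hard reboots are
# delegated to the node daemon in one call; a full reboot is a shutdown, disk
# re-activation and fresh start driven from the master.
def _ExampleRebootPlan(reboot_type):
  if reboot_type in (constants.INSTANCE_REBOOT_SOFT,
                     constants.INSTANCE_REBOOT_HARD):
    return ["call_instance_reboot"]
  return ["call_instance_shutdown", "_ShutdownInstanceDisks",
          "_StartInstanceDisks", "call_instance_start"]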
2685 class LUShutdownInstance(LogicalUnit):
2686 """Shutdown an instance.
2689 HPATH = "instance-stop"
2690 HTYPE = constants.HTYPE_INSTANCE
2691 _OP_REQP = ["instance_name"]
2694 def ExpandNames(self):
2695 self._ExpandAndLockInstance()
2697 def BuildHooksEnv(self):
2700 This runs on master, primary and secondary nodes of the instance.
2703 env = _BuildInstanceHookEnvByObject(self, self.instance)
2704 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2705 list(self.instance.secondary_nodes))
2708 def CheckPrereq(self):
2709 """Check prerequisites.
2711 This checks that the instance is in the cluster.
2714 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2715 assert self.instance is not None, \
2716 "Cannot retrieve locked instance %s" % self.op.instance_name
2717 _CheckNodeOnline(self, self.instance.primary_node)
2719 def Exec(self, feedback_fn):
2720 """Shutdown the instance.
2723 instance = self.instance
2724 node_current = instance.primary_node
2725 self.cfg.MarkInstanceDown(instance.name)
2726 result = self.rpc.call_instance_shutdown(node_current, instance)
2727 if result.failed or not result.data:
2728 self.proc.LogWarning("Could not shutdown instance")
2730 _ShutdownInstanceDisks(self, instance)
2733 class LUReinstallInstance(LogicalUnit):
2734 """Reinstall an instance.
2737 HPATH = "instance-reinstall"
2738 HTYPE = constants.HTYPE_INSTANCE
2739 _OP_REQP = ["instance_name"]
2742 def ExpandNames(self):
2743 self._ExpandAndLockInstance()
2745 def BuildHooksEnv(self):
2748 This runs on master, primary and secondary nodes of the instance.
2751 env = _BuildInstanceHookEnvByObject(self, self.instance)
2752 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2753 list(self.instance.secondary_nodes))
2756 def CheckPrereq(self):
2757 """Check prerequisites.
2759 This checks that the instance is in the cluster and is not running.
2762 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2763 assert instance is not None, \
2764 "Cannot retrieve locked instance %s" % self.op.instance_name
2765 _CheckNodeOnline(self, instance.primary_node)
2767 if instance.disk_template == constants.DT_DISKLESS:
2768 raise errors.OpPrereqError("Instance '%s' has no disks" %
2769 self.op.instance_name)
2770 if instance.status != "down":
2771 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2772 self.op.instance_name)
2773 remote_info = self.rpc.call_instance_info(instance.primary_node,
2775 instance.hypervisor)
2776 if remote_info.failed or remote_info.data:
2777 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2778 (self.op.instance_name,
2779 instance.primary_node))
2781 self.op.os_type = getattr(self.op, "os_type", None)
2782 if self.op.os_type is not None:
2784 pnode = self.cfg.GetNodeInfo(
2785 self.cfg.ExpandNodeName(instance.primary_node))
2787 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2789 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2791 if not isinstance(result.data, objects.OS):
2792 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2793 " primary node" % self.op.os_type)
2795 self.instance = instance
2797 def Exec(self, feedback_fn):
2798 """Reinstall the instance.
2801 inst = self.instance
2803 if self.op.os_type is not None:
2804 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2805 inst.os = self.op.os_type
2806 self.cfg.Update(inst)
2808 _StartInstanceDisks(self, inst, None)
2810 feedback_fn("Running the instance OS create scripts...")
2811 result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2814 raise errors.OpExecError("Could not install OS for instance %s"
2816 (inst.name, inst.primary_node))
2818 _ShutdownInstanceDisks(self, inst)
2821 class LURenameInstance(LogicalUnit):
2822 """Rename an instance.
2825 HPATH = "instance-rename"
2826 HTYPE = constants.HTYPE_INSTANCE
2827 _OP_REQP = ["instance_name", "new_name"]
2829 def BuildHooksEnv(self):
2832 This runs on master, primary and secondary nodes of the instance.
2835 env = _BuildInstanceHookEnvByObject(self, self.instance)
2836 env["INSTANCE_NEW_NAME"] = self.op.new_name
2837 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2838 list(self.instance.secondary_nodes))
2841 def CheckPrereq(self):
2842 """Check prerequisites.
2844 This checks that the instance is in the cluster and is not running.
2847 instance = self.cfg.GetInstanceInfo(
2848 self.cfg.ExpandInstanceName(self.op.instance_name))
2849 if instance is None:
2850 raise errors.OpPrereqError("Instance '%s' not known" %
2851 self.op.instance_name)
2852 _CheckNodeOnline(self, instance.primary_node)
2854 if instance.status != "down":
2855 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2856 self.op.instance_name)
2857 remote_info = self.rpc.call_instance_info(instance.primary_node,
2859 instance.hypervisor)
2861 if remote_info.data:
2862 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2863 (self.op.instance_name,
2864 instance.primary_node))
2865 self.instance = instance
2867 # new name verification
2868 name_info = utils.HostInfo(self.op.new_name)
2870 self.op.new_name = new_name = name_info.name
2871 instance_list = self.cfg.GetInstanceList()
2872 if new_name in instance_list:
2873 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2876 if not getattr(self.op, "ignore_ip", False):
2877 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2878 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2879 (name_info.ip, new_name))
2882 def Exec(self, feedback_fn):
2883 """Reinstall the instance.
2886 inst = self.instance
2887 old_name = inst.name
2889 if inst.disk_template == constants.DT_FILE:
2890 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2892 self.cfg.RenameInstance(inst.name, self.op.new_name)
2893 # Change the instance lock. This is definitely safe while we hold the BGL
2894 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2895 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2897 # re-read the instance from the configuration after rename
2898 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2900 if inst.disk_template == constants.DT_FILE:
2901 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2902 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2903 old_file_storage_dir,
2904 new_file_storage_dir)
2907 raise errors.OpExecError("Could not connect to node '%s' to rename"
2908 " directory '%s' to '%s' (but the instance"
2909 " has been renamed in Ganeti)" % (
2910 inst.primary_node, old_file_storage_dir,
2911 new_file_storage_dir))
2913 if not result.data[0]:
2914 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2915 " (but the instance has been renamed in"
2916 " Ganeti)" % (old_file_storage_dir,
2917 new_file_storage_dir))
2919 _StartInstanceDisks(self, inst, None)
2921 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2923 if result.failed or not result.data:
2924 msg = ("Could not run OS rename script for instance %s on node %s"
2925 " (but the instance has been renamed in Ganeti)" %
2926 (inst.name, inst.primary_node))
2927 self.proc.LogWarning(msg)
2929 _ShutdownInstanceDisks(self, inst)
2932 class LURemoveInstance(LogicalUnit):
2933 """Remove an instance.
2936 HPATH = "instance-remove"
2937 HTYPE = constants.HTYPE_INSTANCE
2938 _OP_REQP = ["instance_name", "ignore_failures"]
2941 def ExpandNames(self):
2942 self._ExpandAndLockInstance()
2943 self.needed_locks[locking.LEVEL_NODE] = []
2944 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2946 def DeclareLocks(self, level):
2947 if level == locking.LEVEL_NODE:
2948 self._LockInstancesNodes()
2950 def BuildHooksEnv(self):
2953 This runs on master, primary and secondary nodes of the instance.
2956 env = _BuildInstanceHookEnvByObject(self, self.instance)
2957 nl = [self.cfg.GetMasterNode()]
2960 def CheckPrereq(self):
2961 """Check prerequisites.
2963 This checks that the instance is in the cluster.
2966 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2967 assert self.instance is not None, \
2968 "Cannot retrieve locked instance %s" % self.op.instance_name
2970 def Exec(self, feedback_fn):
2971 """Remove the instance.
2974 instance = self.instance
2975 logging.info("Shutting down instance %s on node %s",
2976 instance.name, instance.primary_node)
2978 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
2979 if result.failed or not result.data:
2980 if self.op.ignore_failures:
2981 feedback_fn("Warning: can't shutdown instance")
2983 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2984 (instance.name, instance.primary_node))
2986 logging.info("Removing block devices for instance %s", instance.name)
2988 if not _RemoveDisks(self, instance):
2989 if self.op.ignore_failures:
2990 feedback_fn("Warning: can't remove instance's disks")
2992 raise errors.OpExecError("Can't remove instance's disks")
2994 logging.info("Removing instance %s out of cluster config", instance.name)
2996 self.cfg.RemoveInstance(instance.name)
2997 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3000 class LUQueryInstances(NoHooksLU):
3001 """Logical unit for querying instances.
3004 _OP_REQP = ["output_fields", "names"]
3006 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3007 "admin_state", "admin_ram",
3008 "disk_template", "ip", "mac", "bridge",
3009 "sda_size", "sdb_size", "vcpus", "tags",
3010 "network_port", "beparams",
3011 "(disk).(size)/([0-9]+)",
3013 "(nic).(mac|ip|bridge)/([0-9]+)",
3014 "(nic).(macs|ips|bridges)",
3015 "(disk|nic).(count)",
3016 "serial_no", "hypervisor", "hvparams",] +
3018 for name in constants.HVS_PARAMETERS] +
3020 for name in constants.BES_PARAMETERS])
3021 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3024 def ExpandNames(self):
3025 _CheckOutputFields(static=self._FIELDS_STATIC,
3026 dynamic=self._FIELDS_DYNAMIC,
3027 selected=self.op.output_fields)
3029 self.needed_locks = {}
3030 self.share_locks[locking.LEVEL_INSTANCE] = 1
3031 self.share_locks[locking.LEVEL_NODE] = 1
3034 self.wanted = _GetWantedInstances(self, self.op.names)
3036 self.wanted = locking.ALL_SET
3038 self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3040 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3041 self.needed_locks[locking.LEVEL_NODE] = []
3042 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3044 def DeclareLocks(self, level):
3045 if level == locking.LEVEL_NODE and self.do_locking:
3046 self._LockInstancesNodes()
3048 def CheckPrereq(self):
3049 """Check prerequisites.
3054 def Exec(self, feedback_fn):
3055 """Computes the list of nodes and their attributes.
3058 all_info = self.cfg.GetAllInstancesInfo()
3060 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3061 elif self.wanted != locking.ALL_SET:
3062 instance_names = self.wanted
3063 missing = set(instance_names).difference(all_info.keys())
3065 raise errors.OpExecError(
3066 "Some instances were removed before retrieving their data: %s"
3069 instance_names = all_info.keys()
3071 instance_names = utils.NiceSort(instance_names)
3072 instance_list = [all_info[iname] for iname in instance_names]
3074 # begin data gathering
3076 nodes = frozenset([inst.primary_node for inst in instance_list])
3077 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3083 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3085 result = node_data[name]
3087 # offline nodes will be in both lists
3088 off_nodes.append(name)
3090 bad_nodes.append(name)
3093 live_data.update(result.data)
3094 # else no instance is alive
3096 live_data = dict([(name, {}) for name in instance_names])
3098 # end data gathering
3103 for instance in instance_list:
3105 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3106 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3107 for field in self.op.output_fields:
3108 st_match = self._FIELDS_STATIC.Matches(field)
3113 elif field == "pnode":
3114 val = instance.primary_node
3115 elif field == "snodes":
3116 val = list(instance.secondary_nodes)
3117 elif field == "admin_state":
3118 val = (instance.status != "down")
3119 elif field == "oper_state":
3120 if instance.primary_node in bad_nodes:
3123 val = bool(live_data.get(instance.name))
3124 elif field == "status":
3125 if instance.primary_node in off_nodes:
3126 val = "ERROR_nodeoffline"
3127 elif instance.primary_node in bad_nodes:
3128 val = "ERROR_nodedown"
3130 running = bool(live_data.get(instance.name))
3132 if instance.status != "down":
3137 if instance.status != "down":
3141 elif field == "oper_ram":
3142 if instance.primary_node in bad_nodes:
3144 elif instance.name in live_data:
3145 val = live_data[instance.name].get("memory", "?")
3148 elif field == "disk_template":
3149 val = instance.disk_template
3151 val = instance.nics[0].ip
3152 elif field == "bridge":
3153 val = instance.nics[0].bridge
3154 elif field == "mac":
3155 val = instance.nics[0].mac
3156 elif field == "sda_size" or field == "sdb_size":
3157 idx = ord(field[2]) - ord('a')
3159 val = instance.FindDisk(idx).size
3160 except errors.OpPrereqError:
3162 elif field == "tags":
3163 val = list(instance.GetTags())
3164 elif field == "serial_no":
3165 val = instance.serial_no
3166 elif field == "network_port":
3167 val = instance.network_port
3168 elif field == "hypervisor":
3169 val = instance.hypervisor
3170 elif field == "hvparams":
3172 elif (field.startswith(HVPREFIX) and
3173 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3174 val = i_hv.get(field[len(HVPREFIX):], None)
3175 elif field == "beparams":
3177 elif (field.startswith(BEPREFIX) and
3178 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3179 val = i_be.get(field[len(BEPREFIX):], None)
3180 elif st_match and st_match.groups():
3181 # matches a variable list
3182 st_groups = st_match.groups()
3183 if st_groups and st_groups[0] == "disk":
3184 if st_groups[1] == "count":
3185 val = len(instance.disks)
3186 elif st_groups[1] == "sizes":
3187 val = [disk.size for disk in instance.disks]
3188 elif st_groups[1] == "size":
3190 val = instance.FindDisk(st_groups[2]).size
3191 except errors.OpPrereqError:
3194 assert False, "Unhandled disk parameter"
3195 elif st_groups[0] == "nic":
3196 if st_groups[1] == "count":
3197 val = len(instance.nics)
3198 elif st_groups[1] == "macs":
3199 val = [nic.mac for nic in instance.nics]
3200 elif st_groups[1] == "ips":
3201 val = [nic.ip for nic in instance.nics]
3202 elif st_groups[1] == "bridges":
3203 val = [nic.bridge for nic in instance.nics]
3206 nic_idx = int(st_groups[2])
3207 if nic_idx >= len(instance.nics):
3210 if st_groups[1] == "mac":
3211 val = instance.nics[nic_idx].mac
3212 elif st_groups[1] == "ip":
3213 val = instance.nics[nic_idx].ip
3214 elif st_groups[1] == "bridge":
3215 val = instance.nics[nic_idx].bridge
3217 assert False, "Unhandled NIC parameter"
3219 assert False, "Unhandled variable parameter"
3221 raise errors.ParameterError(field)
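# Illustrative sketch, not part of the original module: how the variable query
# fields accepted by LUQueryInstances above are spelled (assumed examples).
# Plain names are matched literally; per-index fields use the
# "prefix.item/index" form declared in _FIELDS_STATIC.
_EXAMPLE_INSTANCE_FIELDS = [
  "name", "os", "pnode", "snodes",   # plain static fields
  "disk.count", "disk.size/0",       # number of disks / size of the first disk
  "nic.macs", "nic.mac/1",           # all MAC addresses / MAC of the 2nd NIC
]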
3228 class LUFailoverInstance(LogicalUnit):
3229 """Failover an instance.
3232 HPATH = "instance-failover"
3233 HTYPE = constants.HTYPE_INSTANCE
3234 _OP_REQP = ["instance_name", "ignore_consistency"]
3237 def ExpandNames(self):
3238 self._ExpandAndLockInstance()
3239 self.needed_locks[locking.LEVEL_NODE] = []
3240 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3242 def DeclareLocks(self, level):
3243 if level == locking.LEVEL_NODE:
3244 self._LockInstancesNodes()
3246 def BuildHooksEnv(self):
3249 This runs on master, primary and secondary nodes of the instance.
3253 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3255 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3256 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3259 def CheckPrereq(self):
3260 """Check prerequisites.
3262 This checks that the instance is in the cluster.
3265 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3266 assert self.instance is not None, \
3267 "Cannot retrieve locked instance %s" % self.op.instance_name
3269 bep = self.cfg.GetClusterInfo().FillBE(instance)
3270 if instance.disk_template not in constants.DTS_NET_MIRROR:
3271 raise errors.OpPrereqError("Instance's disk layout is not"
3272 " network mirrored, cannot failover.")
3274 secondary_nodes = instance.secondary_nodes
3275 if not secondary_nodes:
3276 raise errors.ProgrammerError("no secondary node but using "
3277 "a mirrored disk template")
3279 target_node = secondary_nodes[0]
3280 _CheckNodeOnline(self, target_node)
3281 # check memory requirements on the secondary node
3282 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3283 instance.name, bep[constants.BE_MEMORY],
3284 instance.hypervisor)
3286 # check bridge existence
3287 brlist = [nic.bridge for nic in instance.nics]
3288 result = self.rpc.call_bridges_exist(target_node, brlist)
3291 raise errors.OpPrereqError("One or more target bridges %s does not"
3292 " exist on destination node '%s'" %
3293 (brlist, target_node))
3295 def Exec(self, feedback_fn):
3296 """Failover an instance.
3298 The failover is done by shutting it down on its present node and
3299 starting it on the secondary.
3302 instance = self.instance
3304 source_node = instance.primary_node
3305 target_node = instance.secondary_nodes[0]
3307 feedback_fn("* checking disk consistency between source and target")
3308 for dev in instance.disks:
3309 # for drbd, these are drbd over lvm
3310 if not _CheckDiskConsistency(self, dev, target_node, False):
3311 if instance.status == "up" and not self.op.ignore_consistency:
3312 raise errors.OpExecError("Disk %s is degraded on target node,"
3313 " aborting failover." % dev.iv_name)
3315 feedback_fn("* shutting down instance on source node")
3316 logging.info("Shutting down instance %s on node %s",
3317 instance.name, source_node)
3319 result = self.rpc.call_instance_shutdown(source_node, instance)
3320 if result.failed or not result.data:
3321 if self.op.ignore_consistency:
3322 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3324 " anyway. Please make sure node %s is down",
3325 instance.name, source_node, source_node)
3327 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3328 (instance.name, source_node))
3330 feedback_fn("* deactivating the instance's disks on source node")
3331 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3332 raise errors.OpExecError("Can't shut down the instance's disks.")
3334 instance.primary_node = target_node
3335 # distribute new instance config to the other nodes
3336 self.cfg.Update(instance)
3338 # Only start the instance if it's marked as up
3339 if instance.status == "up":
3340 feedback_fn("* activating the instance's disks on target node")
3341 logging.info("Starting instance %s on node %s",
3342 instance.name, target_node)
3344 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3345 ignore_secondaries=True)
3347 _ShutdownInstanceDisks(self, instance)
3348 raise errors.OpExecError("Can't activate the instance's disks")
3350 feedback_fn("* starting the instance on the target node")
3351 result = self.rpc.call_instance_start(target_node, instance, None)
3352 if result.failed or not result.data:
3353 _ShutdownInstanceDisks(self, instance)
3354 raise errors.OpExecError("Could not start instance %s on node %s." %
3355 (instance.name, target_node))
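# Illustrative sketch, not part of the original module: the order of the steps
# performed by LUFailoverInstance.Exec above.
_EXAMPLE_FAILOVER_STEPS = [
  "check disk consistency on the target (secondary) node",
  "shut down the instance on the source node",
  "deactivate the instance's disks on the source node",
  "make the old secondary the new primary in the configuration",
  "if the instance was up: activate its disks and start it on the target node",
]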
3358 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3359 """Create a tree of block devices on the primary node.
3361 This always creates all devices.
3365 for child in device.children:
3366 if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3369 lu.cfg.SetDiskID(device, node)
3370 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3371 instance.name, True, info)
3372 if new_id.failed or not new_id.data:
3374 if device.physical_id is None:
3375 device.physical_id = new_id
3379 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3380 """Create a tree of block devices on a secondary node.
3382 If this device type has to be created on secondaries, create it and
3383 all its children.
3385 If not, just recurse to children keeping the same 'force' value.
3388 if device.CreateOnSecondary():
3391 for child in device.children:
3392 if not _CreateBlockDevOnSecondary(lu, node, instance,
3393 child, force, info):
3398 lu.cfg.SetDiskID(device, node)
3399 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3400 instance.name, False, info)
3401 if new_id.failed or not new_id.data:
3403 if device.physical_id is None:
3404 device.physical_id = new_id
3408 def _GenerateUniqueNames(lu, exts):
3409 """Generate a suitable LV name.
3411 This will generate a logical volume name for the given instance.
3416 new_id = lu.cfg.GenerateUniqueID()
3417 results.append("%s%s" % (new_id, val))
3421 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3423 """Generate a drbd8 device complete with its children.
3426 port = lu.cfg.AllocatePort()
3427 vgname = lu.cfg.GetVGName()
3428 shared_secret = lu.cfg.GenerateDRBDSecret()
3429 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3430 logical_id=(vgname, names[0]))
3431 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3432 logical_id=(vgname, names[1]))
3433 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3434 logical_id=(primary, secondary, port,
3437 children=[dev_data, dev_meta],
3442 def _GenerateDiskTemplate(lu, template_name,
3443 instance_name, primary_node,
3444 secondary_nodes, disk_info,
3445 file_storage_dir, file_driver,
3447 """Generate the entire disk layout for a given template type.
3450 #TODO: compute space requirements
3452 vgname = lu.cfg.GetVGName()
3453 disk_count = len(disk_info)
3455 if template_name == constants.DT_DISKLESS:
3457 elif template_name == constants.DT_PLAIN:
3458 if len(secondary_nodes) != 0:
3459 raise errors.ProgrammerError("Wrong template configuration")
3461 names = _GenerateUniqueNames(lu, [".disk%d" % i
3462 for i in range(disk_count)])
3463 for idx, disk in enumerate(disk_info):
3464 disk_index = idx + base_index
3465 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3466 logical_id=(vgname, names[idx]),
3467 iv_name="disk/%d" % disk_index)
3468 disks.append(disk_dev)
3469 elif template_name == constants.DT_DRBD8:
3470 if len(secondary_nodes) != 1:
3471 raise errors.ProgrammerError("Wrong template configuration")
3472 remote_node = secondary_nodes[0]
3473 minors = lu.cfg.AllocateDRBDMinor(
3474 [primary_node, remote_node] * len(disk_info), instance_name)
3476 names = _GenerateUniqueNames(lu,
3477 [".disk%d_%s" % (i, s)
3478 for i in range(disk_count)
3479 for s in ("data", "meta")
3481 for idx, disk in enumerate(disk_info):
3482 disk_index = idx + base_index
3483 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3484 disk["size"], names[idx*2:idx*2+2],
3485 "disk/%d" % disk_index,
3486 minors[idx*2], minors[idx*2+1])
3487 disks.append(disk_dev)
3488 elif template_name == constants.DT_FILE:
3489 if len(secondary_nodes) != 0:
3490 raise errors.ProgrammerError("Wrong template configuration")
3492 for idx, disk in enumerate(disk_info):
3493 disk_index = idx + base_index
3494 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3495 iv_name="disk/%d" % disk_index,
3496 logical_id=(file_driver,
3497 "%s/disk%d" % (file_storage_dir,
3499 disks.append(disk_dev)
3501 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
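# Illustrative sketch, not part of the original module: the per-disk LV name
# suffixes requested by _GenerateDiskTemplate above for the drbd8 template; a
# unique id is prepended to each of them by _GenerateUniqueNames.
def _ExampleDrbdNameSuffixes(disk_count):
  return [".disk%d_%s" % (i, s)
          for i in range(disk_count)
          for s in ("data", "meta")]

# e.g. _ExampleDrbdNameSuffixes(2) ->
#   ['.disk0_data', '.disk0_meta', '.disk1_data', '.disk1_meta']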
3505 def _GetInstanceInfoText(instance):
3506 """Compute that text that should be added to the disk's metadata.
3509 return "originstname+%s" % instance.name
3512 def _CreateDisks(lu, instance):
3513 """Create all disks for an instance.
3515 This abstracts away some work from AddInstance.
3517 @type lu: L{LogicalUnit}
3518 @param lu: the logical unit on whose behalf we execute
3519 @type instance: L{objects.Instance}
3520 @param instance: the instance whose disks we should create
3522 @return: the success of the creation
3525 info = _GetInstanceInfoText(instance)
3527 if instance.disk_template == constants.DT_FILE:
3528 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3529 result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3532 if result.failed or not result.data:
3533 logging.error("Could not connect to node '%s'", instance.primary_node)
3536 if not result.data[0]:
3537 logging.error("Failed to create directory '%s'", file_storage_dir)
3540 # Note: this needs to be kept in sync with adding of disks in
3541 # LUSetInstanceParams
3542 for device in instance.disks:
3543 logging.info("Creating volume %s for instance %s",
3544 device.iv_name, instance.name)
3546 for secondary_node in instance.secondary_nodes:
3547 if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3548 device, False, info):
3549 logging.error("Failed to create volume %s (%s) on secondary node %s!",
3550 device.iv_name, device, secondary_node)
3553 if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3554 instance, device, info):
3555 logging.error("Failed to create volume %s on primary!", device.iv_name)
3561 def _RemoveDisks(lu, instance):
3562 """Remove all disks for an instance.
3564 This abstracts away some work from `AddInstance()` and
3565 `RemoveInstance()`. Note that in case some of the devices couldn't
3566 be removed, the removal will continue with the other ones (compare
3567 with `_CreateDisks()`).
3569 @type lu: L{LogicalUnit}
3570 @param lu: the logical unit on whose behalf we execute
3571 @type instance: L{objects.Instance}
3572 @param instance: the instance whose disks we should remove
3574 @return: the success of the removal
3577 logging.info("Removing block devices for instance %s", instance.name)
3580 for device in instance.disks:
3581 for node, disk in device.ComputeNodeTree(instance.primary_node):
3582 lu.cfg.SetDiskID(disk, node)
3583 result = lu.rpc.call_blockdev_remove(node, disk)
3584 if result.failed or not result.data:
3585 lu.proc.LogWarning("Could not remove block device %s on node %s,"
3586 " continuing anyway", device.iv_name, node)
3589 if instance.disk_template == constants.DT_FILE:
3590 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3591 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3593 if result.failed or not result.data:
3594 logging.error("Could not remove directory '%s'", file_storage_dir)
3600 def _ComputeDiskSize(disk_template, disks):
3601 """Compute disk size requirements in the volume group
3604 # Required free disk space as a function of disk and swap space
3606 constants.DT_DISKLESS: None,
3607 constants.DT_PLAIN: sum(d["size"] for d in disks),
3608 # 128 MB are added for drbd metadata for each disk
3609 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3610 constants.DT_FILE: None,
3613 if disk_template not in req_size_dict:
3614 raise errors.ProgrammerError("Disk template '%s' size requirement"
3615 " is unknown" % disk_template)
3617 return req_size_dict[disk_template]
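# Illustrative usage sketch, not part of the original module (hypothetical
# sizes): what _ComputeDiskSize above returns for two disks of 512 and
# 1024 MiB under each disk template.
def _ExampleDiskSizeFigures():
  disks = [{"size": 512}, {"size": 1024}]
  return {
    constants.DT_PLAIN: _ComputeDiskSize(constants.DT_PLAIN, disks),  # 1536
    constants.DT_DRBD8: _ComputeDiskSize(constants.DT_DRBD8, disks),  # 1792
    constants.DT_FILE: _ComputeDiskSize(constants.DT_FILE, disks),    # None
  }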
3620 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3621 """Hypervisor parameter validation.
3623 This function abstracts the hypervisor parameter validation to be
3624 used in both instance create and instance modify.
3626 @type lu: L{LogicalUnit}
3627 @param lu: the logical unit for which we check
3628 @type nodenames: list
3629 @param nodenames: the list of nodes on which we should check
3630 @type hvname: string
3631 @param hvname: the name of the hypervisor we should use
3632 @type hvparams: dict
3633 @param hvparams: the parameters which we need to check
3634 @raise errors.OpPrereqError: if the parameters are not valid
3637 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3640 for node in nodenames:
3643 if not info.data or not isinstance(info.data, (tuple, list)):
3644 raise errors.OpPrereqError("Cannot get current information"
3645 " from node '%s' (%s)" % (node, info.data))
3646 if not info.data[0]:
3647 raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3648 " %s" % info.data[1])
3651 class LUCreateInstance(LogicalUnit):
3652 """Create an instance.
3655 HPATH = "instance-add"
3656 HTYPE = constants.HTYPE_INSTANCE
3657 _OP_REQP = ["instance_name", "disks", "disk_template",
3659 "wait_for_sync", "ip_check", "nics",
3660 "hvparams", "beparams"]
3663 def _ExpandNode(self, node):
3664 """Expands and checks one node name.
3667 node_full = self.cfg.ExpandNodeName(node)
3668 if node_full is None:
3669 raise errors.OpPrereqError("Unknown node %s" % node)
3672 def ExpandNames(self):
3673 """ExpandNames for CreateInstance.
3675 Figure out the right locks for instance creation.
3678 self.needed_locks = {}
3680 # set optional parameters to none if they don't exist
3681 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3682 if not hasattr(self.op, attr):
3683 setattr(self.op, attr, None)
3685 # cheap checks, mostly valid constants given
3687 # verify creation mode
3688 if self.op.mode not in (constants.INSTANCE_CREATE,
3689 constants.INSTANCE_IMPORT):
3690 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3693 # disk template and mirror node verification
3694 if self.op.disk_template not in constants.DISK_TEMPLATES:
3695 raise errors.OpPrereqError("Invalid disk template name")
3697 if self.op.hypervisor is None:
3698 self.op.hypervisor = self.cfg.GetHypervisorType()
3700 cluster = self.cfg.GetClusterInfo()
3701 enabled_hvs = cluster.enabled_hypervisors
3702 if self.op.hypervisor not in enabled_hvs:
3703 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3704 " cluster (%s)" % (self.op.hypervisor,
3705 ",".join(enabled_hvs)))
3707 # check hypervisor parameter syntax (locally)
3709 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3711 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3712 hv_type.CheckParameterSyntax(filled_hvp)
3714 # fill and remember the beparams dict
3715 utils.CheckBEParams(self.op.beparams)
3716 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3719 #### instance parameters check
3721 # instance name verification
3722 hostname1 = utils.HostInfo(self.op.instance_name)
3723 self.op.instance_name = instance_name = hostname1.name
3725 # this is just a preventive check, but someone might still add this
3726 # instance in the meantime, and creation will fail at lock-add time
3727 if instance_name in self.cfg.GetInstanceList():
3728 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3731 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3735 for nic in self.op.nics:
3736 # ip validity checks
3737 ip = nic.get("ip", None)
3738 if ip is None or ip.lower() == "none":
3740 elif ip.lower() == constants.VALUE_AUTO:
3741 nic_ip = hostname1.ip
3743 if not utils.IsValidIP(ip):
3744 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3745 " like a valid IP" % ip)
3748 # MAC address verification
3749 mac = nic.get("mac", constants.VALUE_AUTO)
3750 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3751 if not utils.IsValidMac(mac.lower()):
3752 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3754 # bridge verification
3755 bridge = nic.get("bridge", self.cfg.GetDefBridge())
3756 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3758 # disk checks/pre-build
3760 for disk in self.op.disks:
3761 mode = disk.get("mode", constants.DISK_RDWR)
3762 if mode not in constants.DISK_ACCESS_SET:
3763 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3765 size = disk.get("size", None)
3767 raise errors.OpPrereqError("Missing disk size")
3771 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3772 self.disks.append({"size": size, "mode": mode})
3774 # used in CheckPrereq for ip ping check
3775 self.check_ip = hostname1.ip
3777 # file storage checks
3778 if (self.op.file_driver and
3779 not self.op.file_driver in constants.FILE_DRIVER):
3780 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3781 self.op.file_driver)
3783 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3784 raise errors.OpPrereqError("File storage directory path not absolute")
3786 ### Node/iallocator related checks
3787 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3788 raise errors.OpPrereqError("One and only one of iallocator and primary"
3789 " node must be given")
3791 if self.op.iallocator:
3792 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3794 self.op.pnode = self._ExpandNode(self.op.pnode)
3795 nodelist = [self.op.pnode]
3796 if self.op.snode is not None:
3797 self.op.snode = self._ExpandNode(self.op.snode)
3798 nodelist.append(self.op.snode)
3799 self.needed_locks[locking.LEVEL_NODE] = nodelist
3801 # in case of import lock the source node too
3802 if self.op.mode == constants.INSTANCE_IMPORT:
3803 src_node = getattr(self.op, "src_node", None)
3804 src_path = getattr(self.op, "src_path", None)
3806 if src_path is None:
3807 self.op.src_path = src_path = self.op.instance_name
3809 if src_node is None:
3810 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3811 self.op.src_node = None
3812 if os.path.isabs(src_path):
3813 raise errors.OpPrereqError("Importing an instance from an absolute"
3814 " path requires a source node option.")
3816 self.op.src_node = src_node = self._ExpandNode(src_node)
3817 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3818 self.needed_locks[locking.LEVEL_NODE].append(src_node)
3819 if not os.path.isabs(src_path):
3820 self.op.src_path = src_path = \
3821 os.path.join(constants.EXPORT_DIR, src_path)
3823 else: # INSTANCE_CREATE
3824 if getattr(self.op, "os_type", None) is None:
3825 raise errors.OpPrereqError("No guest OS specified")
3827 def _RunAllocator(self):
3828 """Run the allocator based on input opcode.
3831 nics = [n.ToDict() for n in self.nics]
3832 ial = IAllocator(self,
3833 mode=constants.IALLOCATOR_MODE_ALLOC,
3834 name=self.op.instance_name,
3835 disk_template=self.op.disk_template,
3838 vcpus=self.be_full[constants.BE_VCPUS],
3839 mem_size=self.be_full[constants.BE_MEMORY],
3842 hypervisor=self.op.hypervisor,
3845 ial.Run(self.op.iallocator)
3848 raise errors.OpPrereqError("Can't compute nodes using"
3849 " iallocator '%s': %s" % (self.op.iallocator,
3851 if len(ial.nodes) != ial.required_nodes:
3852 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3853 " of nodes (%s), required %s" %
3854 (self.op.iallocator, len(ial.nodes),
3855 ial.required_nodes))
3856 self.op.pnode = ial.nodes[0]
3857 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3858 self.op.instance_name, self.op.iallocator,
3859 ", ".join(ial.nodes))
3860 if ial.required_nodes == 2:
3861 self.op.snode = ial.nodes[1]
3863 def BuildHooksEnv(self):
3866 This runs on master, primary and secondary nodes of the instance.
3870 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3871 "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3872 "INSTANCE_ADD_MODE": self.op.mode,
3874 if self.op.mode == constants.INSTANCE_IMPORT:
3875 env["INSTANCE_SRC_NODE"] = self.op.src_node
3876 env["INSTANCE_SRC_PATH"] = self.op.src_path
3877 env["INSTANCE_SRC_IMAGES"] = self.src_images
3879 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3880 primary_node=self.op.pnode,
3881 secondary_nodes=self.secondaries,
3882 status=self.instance_status,
3883 os_type=self.op.os_type,
3884 memory=self.be_full[constants.BE_MEMORY],
3885 vcpus=self.be_full[constants.BE_VCPUS],
3886 nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3889 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3894 def CheckPrereq(self):
3895 """Check prerequisites.
3898 if (not self.cfg.GetVGName() and
3899 self.op.disk_template not in constants.DTS_NOT_LVM):
3900 raise errors.OpPrereqError("Cluster does not support lvm-based"
3904 if self.op.mode == constants.INSTANCE_IMPORT:
3905 src_node = self.op.src_node
3906 src_path = self.op.src_path
3908 if src_node is None:
3909 exp_list = self.rpc.call_export_list(
3910 self.acquired_locks[locking.LEVEL_NODE])
3912 for node in exp_list:
3913 if not exp_list[node].failed and src_path in exp_list[node].data:
3915 self.op.src_node = src_node = node
3916 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3920 raise errors.OpPrereqError("No export found for relative path %s" %
3923 _CheckNodeOnline(self, src_node)
3924 result = self.rpc.call_export_info(src_node, src_path)
3927 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3929 export_info = result.data
3930 if not export_info.has_section(constants.INISECT_EXP):
3931 raise errors.ProgrammerError("Corrupted export config")
3933 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3934 if (int(ei_version) != constants.EXPORT_VERSION):
3935 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3936 (ei_version, constants.EXPORT_VERSION))
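# Sketch of the export info layout, as implied by the keys read in this
# method (section names are the INISECT_* constants; all literal values below
# are hypothetical):
#   [INISECT_EXP section]  version=0  os=debian-etch
#   [INISECT_INS section]  name=inst1  disk_count=1  disk0_dump=disk0.dump
#                          nic_count=1  nic0_mac=aa:00:00:12:34:56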
3938 # Check that the new instance doesn't have less disks than the export
3939 instance_disks = len(self.disks)
3940 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3941 if instance_disks < export_disks:
3942 raise errors.OpPrereqError("Not enough disks to import."
3943 " (instance: %d, export: %d)" %
3944 (instance_disks, export_disks))
3946 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3948 for idx in range(export_disks):
3949 option = 'disk%d_dump' % idx
3950 if export_info.has_option(constants.INISECT_INS, option):
3951 # FIXME: are the old os-es, disk sizes, etc. useful?
3952 export_name = export_info.get(constants.INISECT_INS, option)
3953 image = os.path.join(src_path, export_name)
3954 disk_images.append(image)
3956 disk_images.append(False)
3958 self.src_images = disk_images
3960 old_name = export_info.get(constants.INISECT_INS, 'name')
3961 # FIXME: int() here could throw a ValueError on broken exports
3962 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3963 if self.op.instance_name == old_name:
3964 for idx, nic in enumerate(self.nics):
3965 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx + 1:
3966 nic_mac_ini = 'nic%d_mac' % idx
3967 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3969 # ip ping checks (we use the same ip that was resolved in ExpandNames)
3970 if self.op.start and not self.op.ip_check:
3971 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3972 " adding an instance in start mode")
3974 if self.op.ip_check:
3975 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3976 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3977 (self.check_ip, self.op.instance_name))
3981 if self.op.iallocator is not None:
3982 self._RunAllocator()
3984 #### node related checks
3986 # check primary node
3987 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3988 assert self.pnode is not None, \
3989 "Cannot retrieve locked node %s" % self.op.pnode
3991 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
3994 self.secondaries = []
3996 # mirror node verification
3997 if self.op.disk_template in constants.DTS_NET_MIRROR:
3998 if self.op.snode is None:
3999 raise errors.OpPrereqError("The networked disk templates need"
4001 if self.op.snode == pnode.name:
4002 raise errors.OpPrereqError("The secondary node cannot be"
4003 " the primary node.")
4004 self.secondaries.append(self.op.snode)
4005 _CheckNodeOnline(self, self.op.snode)
4007 nodenames = [pnode.name] + self.secondaries
4009 req_size = _ComputeDiskSize(self.op.disk_template,
4012 # Check lv size requirements
4013 if req_size is not None:
4014 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4016 for node in nodenames:
4017 info = nodeinfo[node]
4021 raise errors.OpPrereqError("Cannot get current information"
4022 " from node '%s'" % node)
4023 vg_free = info.get('vg_free', None)
4024 if not isinstance(vg_free, int):
4025 raise errors.OpPrereqError("Can't compute free disk space on"
4027 if req_size > info['vg_free']:
4028 raise errors.OpPrereqError("Not enough disk space on target node %s."
4029 " %d MB available, %d MB required" %
4030 (node, info['vg_free'], req_size))
4032 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4035 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4037 if not isinstance(result.data, objects.OS):
4038 raise errors.OpPrereqError("OS '%s' not in supported os list for"
4039 " primary node" % self.op.os_type)
4041 # bridge check on primary node
4042 bridges = [n.bridge for n in self.nics]
4043 result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4046 raise errors.OpPrereqError("One of the target bridges '%s' does not"
4047 " exist on destination node '%s'" %
4048 (",".join(bridges), pnode.name))
4050 # memory check on primary node
4052 _CheckNodeFreeMemory(self, self.pnode.name,
4053 "creating instance %s" % self.op.instance_name,
4054 self.be_full[constants.BE_MEMORY],
4058 self.instance_status = 'up'
4060 self.instance_status = 'down'
4062 def Exec(self, feedback_fn):
4063 """Create and add the instance to the cluster.
4066 instance = self.op.instance_name
4067 pnode_name = self.pnode.name
4069 for nic in self.nics:
4070 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4071 nic.mac = self.cfg.GenerateMAC()
4073 ht_kind = self.op.hypervisor
4074 if ht_kind in constants.HTS_REQ_PORT:
4075 network_port = self.cfg.AllocatePort()
4079 ##if self.op.vnc_bind_address is None:
4080 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4082 # this is needed because os.path.join does not accept None arguments
4083 if self.op.file_storage_dir is None:
4084 string_file_storage_dir = ""
4086 string_file_storage_dir = self.op.file_storage_dir
4088 # build the full file storage dir path
4089 file_storage_dir = os.path.normpath(os.path.join(
4090 self.cfg.GetFileStorageDir(),
4091 string_file_storage_dir, instance))
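# For illustration (all values hypothetical): with a cluster file storage dir
# of /srv/ganeti/file-storage, an opcode file_storage_dir of "mydir" and an
# instance named inst1.example.com, this yields
# /srv/ganeti/file-storage/mydir/inst1.example.com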
4094 disks = _GenerateDiskTemplate(self,
4095 self.op.disk_template,
4096 instance, pnode_name,
4100 self.op.file_driver,
4103 iobj = objects.Instance(name=instance, os=self.op.os_type,
4104 primary_node=pnode_name,
4105 nics=self.nics, disks=disks,
4106 disk_template=self.op.disk_template,
4107 status=self.instance_status,
4108 network_port=network_port,
4109 beparams=self.op.beparams,
4110 hvparams=self.op.hvparams,
4111 hypervisor=self.op.hypervisor,
4114 feedback_fn("* creating instance disks...")
4115 if not _CreateDisks(self, iobj):
4116 _RemoveDisks(self, iobj)
4117 self.cfg.ReleaseDRBDMinors(instance)
4118 raise errors.OpExecError("Device creation failed, reverting...")
4120 feedback_fn("adding instance %s to cluster config" % instance)
4122 self.cfg.AddInstance(iobj)
4123 # Declare that we don't want to remove the instance lock anymore, as we've
4124 # added the instance to the config
4125 del self.remove_locks[locking.LEVEL_INSTANCE]
4127 # Remove the temporary assignments for the instance's drbds
4127 self.cfg.ReleaseDRBDMinors(instance)
4128 # Unlock all the nodes
4129 if self.op.mode == constants.INSTANCE_IMPORT:
4130 nodes_keep = [self.op.src_node]
4131 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4132 if node != self.op.src_node]
4133 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4134 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4136 self.context.glm.release(locking.LEVEL_NODE)
4137 del self.acquired_locks[locking.LEVEL_NODE]
4139 if self.op.wait_for_sync:
4140 disk_abort = not _WaitForSync(self, iobj)
4141 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4142 # make sure the disks are not degraded (still sync-ing is ok)
4144 feedback_fn("* checking mirrors status")
4145 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4150 _RemoveDisks(self, iobj)
4151 self.cfg.RemoveInstance(iobj.name)
4152 # Make sure the instance lock gets removed
4153 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4154 raise errors.OpExecError("There are some degraded disks for"
4157 feedback_fn("creating os for instance %s on node %s" %
4158 (instance, pnode_name))
4160 if iobj.disk_template != constants.DT_DISKLESS:
4161 if self.op.mode == constants.INSTANCE_CREATE:
4162 feedback_fn("* running the instance OS create scripts...")
4163 result = self.rpc.call_instance_os_add(pnode_name, iobj)
4166 raise errors.OpExecError("Could not add os for instance %s"
4168 (instance, pnode_name))
4170 elif self.op.mode == constants.INSTANCE_IMPORT:
4171 feedback_fn("* running the instance OS import scripts...")
4172 src_node = self.op.src_node
4173 src_images = self.src_images
4174 cluster_name = self.cfg.GetClusterName()
4175 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4176 src_node, src_images,
4178 import_result.Raise()
4179 for idx, result in enumerate(import_result.data):
4181 self.LogWarning("Could not import the image %s for instance"
4182 " %s, disk %d, on node %s" %
4183 (src_images[idx], instance, idx, pnode_name))
4185 # also checked in the prereq part
4186 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4190 logging.info("Starting instance %s on node %s", instance, pnode_name)
4191 feedback_fn("* starting instance...")
4192 result = self.rpc.call_instance_start(pnode_name, iobj, None)
4195 raise errors.OpExecError("Could not start instance")
4198 class LUConnectConsole(NoHooksLU):
4199 """Connect to an instance's console.
4201 This is somewhat special in that it returns the command line that
4202 you need to run on the master node in order to connect to the
4206 _OP_REQP = ["instance_name"]
4209 def ExpandNames(self):
4210 self._ExpandAndLockInstance()
4212 def CheckPrereq(self):
4213 """Check prerequisites.
4215 This checks that the instance is in the cluster.
4218 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4219 assert self.instance is not None, \
4220 "Cannot retrieve locked instance %s" % self.op.instance_name
4221 _CheckNodeOnline(self, self.instance.primary_node)
4223 def Exec(self, feedback_fn):
4224 """Connect to the console of an instance
4227 instance = self.instance
4228 node = instance.primary_node
4230 node_insts = self.rpc.call_instance_list([node],
4231 [instance.hypervisor])[node]
4234 if instance.name not in node_insts.data:
4235 raise errors.OpExecError("Instance %s is not running." % instance.name)
4237 logging.debug("Connecting to console of %s on %s", instance.name, node)
4239 hyper = hypervisor.GetHypervisor(instance.hypervisor)
4240 console_cmd = hyper.GetShellCommandForConsole(instance)
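# The return value is the ssh command to be run on the master node; for a
# Xen-based instance the remote command would be something along the lines of
# "xm console <instance>" (hypothetical; the exact command depends on the
# hypervisor's GetShellCommandForConsole and on SshRunner.BuildCmd).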
4243 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4246 class LUReplaceDisks(LogicalUnit):
4247 """Replace the disks of an instance.
4250 HPATH = "mirrors-replace"
4251 HTYPE = constants.HTYPE_INSTANCE
4252 _OP_REQP = ["instance_name", "mode", "disks"]
4255 def ExpandNames(self):
4256 self._ExpandAndLockInstance()
4258 if not hasattr(self.op, "remote_node"):
4259 self.op.remote_node = None
4261 ia_name = getattr(self.op, "iallocator", None)
4262 if ia_name is not None:
4263 if self.op.remote_node is not None:
4264 raise errors.OpPrereqError("Give either the iallocator or the new"
4265 " secondary, not both")
4266 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4267 elif self.op.remote_node is not None:
4268 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4269 if remote_node is None:
4270 raise errors.OpPrereqError("Node '%s' not known" %
4271 self.op.remote_node)
4272 self.op.remote_node = remote_node
4273 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4274 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4276 self.needed_locks[locking.LEVEL_NODE] = []
4277 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4279 def DeclareLocks(self, level):
4280 # If we're not already locking all nodes in the set we have to declare the
4281 # instance's primary/secondary nodes.
4282 if (level == locking.LEVEL_NODE and
4283 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4284 self._LockInstancesNodes()
4286 def _RunAllocator(self):
4287 """Compute a new secondary node using an IAllocator.
4290 ial = IAllocator(self,
4291 mode=constants.IALLOCATOR_MODE_RELOC,
4292 name=self.op.instance_name,
4293 relocate_from=[self.sec_node])
4295 ial.Run(self.op.iallocator)
4298 raise errors.OpPrereqError("Can't compute nodes using"
4299 " iallocator '%s': %s" % (self.op.iallocator,
4301 if len(ial.nodes) != ial.required_nodes:
4302 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4303 " of nodes (%s), required %s" %
4304 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
4305 self.op.remote_node = ial.nodes[0]
4306 self.LogInfo("Selected new secondary for the instance: %s",
4307 self.op.remote_node)
4309 def BuildHooksEnv(self):
4312 This runs on the master, the primary and all the secondaries.
4316 "MODE": self.op.mode,
4317 "NEW_SECONDARY": self.op.remote_node,
4318 "OLD_SECONDARY": self.instance.secondary_nodes[0],
4320 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4322 self.cfg.GetMasterNode(),
4323 self.instance.primary_node,
4325 if self.op.remote_node is not None:
4326 nl.append(self.op.remote_node)
4329 def CheckPrereq(self):
4330 """Check prerequisites.
4332 This checks that the instance is in the cluster.
4335 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4336 assert instance is not None, \
4337 "Cannot retrieve locked instance %s" % self.op.instance_name
4338 self.instance = instance
4340 if instance.disk_template not in constants.DTS_NET_MIRROR:
4341 raise errors.OpPrereqError("Instance's disk layout is not"
4342 " network mirrored.")
4344 if len(instance.secondary_nodes) != 1:
4345 raise errors.OpPrereqError("The instance has a strange layout,"
4346 " expected one secondary but found %d" %
4347 len(instance.secondary_nodes))
4349 self.sec_node = instance.secondary_nodes[0]
4351 ia_name = getattr(self.op, "iallocator", None)
4352 if ia_name is not None:
4353 self._RunAllocator()
4355 remote_node = self.op.remote_node
4356 if remote_node is not None:
4357 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4358 assert self.remote_node_info is not None, \
4359 "Cannot retrieve locked node %s" % remote_node
4361 self.remote_node_info = None
4362 if remote_node == instance.primary_node:
4363 raise errors.OpPrereqError("The specified node is the primary node of"
4365 elif remote_node == self.sec_node:
4366 if self.op.mode == constants.REPLACE_DISK_SEC:
4367 # this is for DRBD8, where we can't execute the same mode of
4368 # replacement as for drbd7 (no different port allocated)
4369 raise errors.OpPrereqError("Same secondary given, cannot execute"
4371 if instance.disk_template == constants.DT_DRBD8:
4372 if (self.op.mode == constants.REPLACE_DISK_ALL and
4373 remote_node is not None):
4374 # switch to replace secondary mode
4375 self.op.mode = constants.REPLACE_DISK_SEC
4377 if self.op.mode == constants.REPLACE_DISK_ALL:
4378 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4379 " secondary disk replacement, not"
4381 elif self.op.mode == constants.REPLACE_DISK_PRI:
4382 if remote_node is not None:
4383 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4384 " the secondary while doing a primary"
4385 " node disk replacement")
4386 self.tgt_node = instance.primary_node
4387 self.oth_node = instance.secondary_nodes[0]
4388 _CheckNodeOnline(self, self.tgt_node)
4389 _CheckNodeOnline(self, self.oth_node)
4390 elif self.op.mode == constants.REPLACE_DISK_SEC:
4391 self.new_node = remote_node # this can be None, in which case
4392 # we don't change the secondary
4393 self.tgt_node = instance.secondary_nodes[0]
4394 self.oth_node = instance.primary_node
4395 _CheckNodeOnline(self, self.oth_node)
4396 if self.new_node is not None:
4397 _CheckNodeOnline(self, self.new_node)
4399 _CheckNodeOnline(self, self.tgt_node)
4401 raise errors.ProgrammerError("Unhandled disk replace mode")
4403 if not self.op.disks:
4404 self.op.disks = range(len(instance.disks))
4406 for disk_idx in self.op.disks:
4407 instance.FindDisk(disk_idx)
4409 def _ExecD8DiskOnly(self, feedback_fn):
4410 """Replace a disk on the primary or secondary for dbrd8.
4412 The algorithm for replace is quite complicated:
4414 1. for each disk to be replaced:
4416 1. create new LVs on the target node with unique names
4417 1. detach old LVs from the drbd device
4418 1. rename old LVs to name_replaced.<time_t>
4419 1. rename new LVs to old LVs
4420 1. attach the new LVs (with the old names now) to the drbd device
4422 1. wait for sync across all devices
4424 1. for each modified disk:
4426 1. remove old LVs (which have the name name_replaced.<time_t>)
4428 Failures are not very well handled.
4432 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4433 instance = self.instance
4435 vgname = self.cfg.GetVGName()
4438 tgt_node = self.tgt_node
4439 oth_node = self.oth_node
4441 # Step: check device activation
4442 self.proc.LogStep(1, steps_total, "check device existence")
4443 info("checking volume groups")
4444 my_vg = cfg.GetVGName()
4445 results = self.rpc.call_vg_list([oth_node, tgt_node])
4447 raise errors.OpExecError("Can't list volume groups on the nodes")
4448 for node in oth_node, tgt_node:
4450 if res.failed or not res.data or my_vg not in res.data:
4451 raise errors.OpExecError("Volume group '%s' not found on %s" %
4453 for idx, dev in enumerate(instance.disks):
4454 if idx not in self.op.disks:
4456 for node in tgt_node, oth_node:
4457 info("checking disk/%d on %s" % (idx, node))
4458 cfg.SetDiskID(dev, node)
4459 if not self.rpc.call_blockdev_find(node, dev):
4460 raise errors.OpExecError("Can't find disk/%d on node %s" %
4463 # Step: check other node consistency
4464 self.proc.LogStep(2, steps_total, "check peer consistency")
4465 for idx, dev in enumerate(instance.disks):
4466 if idx not in self.op.disks:
4468 info("checking disk/%d consistency on %s" % (idx, oth_node))
4469 if not _CheckDiskConsistency(self, dev, oth_node,
4470 oth_node==instance.primary_node):
4471 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4472 " to replace disks on this node (%s)" %
4473 (oth_node, tgt_node))
4475 # Step: create new storage
4476 self.proc.LogStep(3, steps_total, "allocate new storage")
4477 for idx, dev in enumerate(instance.disks):
4478 if idx not in self.op.disks:
4481 cfg.SetDiskID(dev, tgt_node)
4482 lv_names = [".disk%d_%s" % (idx, suf)
4483 for suf in ["data", "meta"]]
4484 names = _GenerateUniqueNames(self, lv_names)
4485 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4486 logical_id=(vgname, names[0]))
4487 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4488 logical_id=(vgname, names[1]))
4489 new_lvs = [lv_data, lv_meta]
4490 old_lvs = dev.children
4491 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
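# iv_names maps the instance-visible name (e.g. 'disk/0') to a tuple of
# (drbd device, old LV children, new LV children); steps 4-6 below use it to
# swap the LVs under the drbd device and to remove the old LVs afterwards.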
4492 info("creating new local storage on %s for %s" %
4493 (tgt_node, dev.iv_name))
4494 # since we *always* want to create this LV, we use the
4495 # _Create...OnPrimary (which forces the creation), even if we
4496 # are talking about the secondary node
4497 for new_lv in new_lvs:
4498 if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4499 _GetInstanceInfoText(instance)):
4500 raise errors.OpExecError("Failed to create new LV named '%s' on"
4502 (new_lv.logical_id[1], tgt_node))
4504 # Step: for each lv, detach+rename*2+attach
4505 self.proc.LogStep(4, steps_total, "change drbd configuration")
4506 for dev, old_lvs, new_lvs in iv_names.itervalues():
4507 info("detaching %s drbd from local storage" % dev.iv_name)
4508 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4511 raise errors.OpExecError("Can't detach drbd from local storage on node"
4512 " %s for device %s" % (tgt_node, dev.iv_name))
4514 #cfg.Update(instance)
4516 # ok, we created the new LVs, so now we know we have the needed
4517 # storage; as such, we proceed on the target node to rename
4518 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4519 # using the assumption that logical_id == physical_id (which in
4520 # turn is the unique_id on that node)
4522 # FIXME(iustin): use a better name for the replaced LVs
4523 temp_suffix = int(time.time())
4524 ren_fn = lambda d, suff: (d.physical_id[0],
4525 d.physical_id[1] + "_replaced-%s" % suff)
4526 # build the rename list based on what LVs exist on the node
4528 for to_ren in old_lvs:
4529 find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4530 if not find_res.failed and find_res.data is not None: # device exists
4531 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4533 info("renaming the old LVs on the target node")
4534 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4537 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4538 # now we rename the new LVs to the old LVs
4539 info("renaming the new LVs on the target node")
4540 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4541 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4544 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4546 for old, new in zip(old_lvs, new_lvs):
4547 new.logical_id = old.logical_id
4548 cfg.SetDiskID(new, tgt_node)
4550 for disk in old_lvs:
4551 disk.logical_id = ren_fn(disk, temp_suffix)
4552 cfg.SetDiskID(disk, tgt_node)
4554 # now that the new lvs have the old name, we can add them to the device
4555 info("adding new mirror component on %s" % tgt_node)
4556 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4557 if result.failed or not result.data:
4558 for new_lv in new_lvs:
4559 result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4560 if result.failed or not result.data:
4561 warning("Can't rollback device %s", hint="manually cleanup unused"
4563 raise errors.OpExecError("Can't add local storage to drbd")
4565 dev.children = new_lvs
4566 cfg.Update(instance)
4568 # Step: wait for sync
4570 # this can fail as the old devices are degraded and _WaitForSync
4571 # does a combined result over all disks, so we don't check its
4573 self.proc.LogStep(5, steps_total, "sync devices")
4574 _WaitForSync(self, instance, unlock=True)
4576 # so check manually all the devices
4577 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4578 cfg.SetDiskID(dev, instance.primary_node)
4579 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
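# result.data is the block device status tuple; index 5 is (presumably) the
# 'is degraded' flag, which is what the check below relies on.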
4580 if result.failed or result.data[5]:
4581 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4583 # Step: remove old storage
4584 self.proc.LogStep(6, steps_total, "removing old storage")
4585 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4586 info("remove logical volumes for %s" % name)
4588 cfg.SetDiskID(lv, tgt_node)
4589 result = self.rpc.call_blockdev_remove(tgt_node, lv)
4590 if result.failed or not result.data:
4591 warning("Can't remove old LV", hint="manually remove unused LVs")
4594 def _ExecD8Secondary(self, feedback_fn):
4595 """Replace the secondary node for drbd8.
4597 The algorithm for replace is quite complicated:
4598 - for all disks of the instance:
4599 - create new LVs on the new node with same names
4600 - shutdown the drbd device on the old secondary
4601 - disconnect the drbd network on the primary
4602 - create the drbd device on the new secondary
4603 - network attach the drbd on the primary, using an artifice:
4604 the drbd code for Attach() will connect to the network if it
4605 finds a device which is connected to the good local disks but
4607 - wait for sync across all devices
4608 - remove all disks from the old secondary
4610 Failures are not very well handled.
4614 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4615 instance = self.instance
4617 vgname = self.cfg.GetVGName()
4620 old_node = self.tgt_node
4621 new_node = self.new_node
4622 pri_node = instance.primary_node
4624 # Step: check device activation
4625 self.proc.LogStep(1, steps_total, "check device existence")
4626 info("checking volume groups")
4627 my_vg = cfg.GetVGName()
4628 results = self.rpc.call_vg_list([pri_node, new_node])
4629 for node in pri_node, new_node:
4631 if res.failed or not res.data or my_vg not in res.data:
4632 raise errors.OpExecError("Volume group '%s' not found on %s" %
4634 for idx, dev in enumerate(instance.disks):
4635 if idx not in self.op.disks:
4637 info("checking disk/%d on %s" % (idx, pri_node))
4638 cfg.SetDiskID(dev, pri_node)
4639 result = self.rpc.call_blockdev_find(pri_node, dev)
4642 raise errors.OpExecError("Can't find disk/%d on node %s" %
4645 # Step: check other node consistency
4646 self.proc.LogStep(2, steps_total, "check peer consistency")
4647 for idx, dev in enumerate(instance.disks):
4648 if idx not in self.op.disks:
4650 info("checking disk/%d consistency on %s" % (idx, pri_node))
4651 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4652 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4653 " unsafe to replace the secondary" %
4656 # Step: create new storage
4657 self.proc.LogStep(3, steps_total, "allocate new storage")
4658 for idx, dev in enumerate(instance.disks):
4660 info("adding new local storage on %s for disk/%d" %
4662 # since we *always* want to create this LV, we use the
4663 # _Create...OnPrimary (which forces the creation), even if we
4664 # are talking about the secondary node
4665 for new_lv in dev.children:
4666 if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4667 _GetInstanceInfoText(instance)):
4668 raise errors.OpExecError("Failed to create new LV named '%s' on"
4670 (new_lv.logical_id[1], new_node))
4672 # Step 4: drbd minors and drbd setup changes
4673 # after this, we must manually remove the drbd minors on both the
4674 # error and the success paths
4675 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4677 logging.debug("Allocated minors %s" % (minors,))
4678 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4679 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4681 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4682 # create new devices on new_node
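# The drbd8 logical_id appears to be a tuple of the form
# (node_a, node_b, port, minor_a, minor_b, ...); only the old secondary's
# node slot and its minor are replaced here, the port (index 2) is kept.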
4683 if pri_node == dev.logical_id[0]:
4684 new_logical_id = (pri_node, new_node,
4685 dev.logical_id[2], dev.logical_id[3], new_minor,
4688 new_logical_id = (new_node, pri_node,
4689 dev.logical_id[2], new_minor, dev.logical_id[4],
4691 iv_names[idx] = (dev, dev.children, new_logical_id)
4692 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4694 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4695 logical_id=new_logical_id,
4696 children=dev.children)
4697 if not _CreateBlockDevOnSecondary(self, new_node, instance,
4699 _GetInstanceInfoText(instance)):
4700 self.cfg.ReleaseDRBDMinors(instance.name)
4701 raise errors.OpExecError("Failed to create new DRBD on"
4702 " node '%s'" % new_node)
4704 for idx, dev in enumerate(instance.disks):
4705 # we have new devices, shutdown the drbd on the old secondary
4706 info("shutting down drbd for disk/%d on old node" % idx)
4707 cfg.SetDiskID(dev, old_node)
4708 result = self.rpc.call_blockdev_shutdown(old_node, dev)
4709 if result.failed or not result.data:
4710 warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4711 hint="Please cleanup this device manually as soon as possible")
4713 info("detaching primary drbds from the network (=> standalone)")
4715 for idx, dev in enumerate(instance.disks):
4716 cfg.SetDiskID(dev, pri_node)
4717 # set the network part of the physical (unique in bdev terms) id
4718 # to None, meaning detach from network
4719 dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4720 # and 'find' the device, which will 'fix' it to match the
4722 result = self.rpc.call_blockdev_find(pri_node, dev)
4723 if not result.failed and result.data:
4726 warning("Failed to detach drbd disk/%d from network, unusual case" %
4730 # no detaches succeeded (very unlikely)
4731 self.cfg.ReleaseDRBDMinors(instance.name)
4732 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4734 # if we managed to detach at least one, we update all the disks of
4735 # the instance to point to the new secondary
4736 info("updating instance configuration")
4737 for dev, _, new_logical_id in iv_names.itervalues():
4738 dev.logical_id = new_logical_id
4739 cfg.SetDiskID(dev, pri_node)
4740 cfg.Update(instance)
4741 # we can remove now the temp minors as now the new values are
4742 # written to the config file (and therefore stable)
4743 self.cfg.ReleaseDRBDMinors(instance.name)
4745 # and now perform the drbd attach
4746 info("attaching primary drbds to new secondary (standalone => connected)")
4748 for idx, dev in enumerate(instance.disks):
4749 info("attaching primary drbd for disk/%d to new secondary node" % idx)
4750 # since the attach is smart, it's enough to 'find' the device,
4751 # it will automatically activate the network, if the physical_id
4753 cfg.SetDiskID(dev, pri_node)
4754 logging.debug("Disk to attach: %s", dev)
4755 result = self.rpc.call_blockdev_find(pri_node, dev)
4756 if result.failed or not result.data:
4757 warning("can't attach drbd disk/%d to new secondary!" % idx,
4758 "please do a gnt-instance info to see the status of disks")
4760 # this can fail as the old devices are degraded and _WaitForSync
4761 # does a combined result over all disks, so we don't check its
4763 self.proc.LogStep(5, steps_total, "sync devices")
4764 _WaitForSync(self, instance, unlock=True)
4766 # so check manually all the devices
4767 for idx, (dev, old_lvs, _) in iv_names.iteritems():
4768 cfg.SetDiskID(dev, pri_node)
4769 result = self.rpc.call_blockdev_find(pri_node, dev)
4772 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4774 self.proc.LogStep(6, steps_total, "removing old storage")
4775 for idx, (dev, old_lvs, _) in iv_names.iteritems():
4776 info("remove logical volumes for disk/%d" % idx)
4778 cfg.SetDiskID(lv, old_node)
4779 result = self.rpc.call_blockdev_remove(old_node, lv)
4780 if result.failed or not result.data:
4781 warning("Can't remove LV on old secondary",
4782 hint="Cleanup stale volumes by hand")
4784 def Exec(self, feedback_fn):
4785 """Execute disk replacement.
4787 This dispatches the disk replacement to the appropriate handler.
4790 instance = self.instance
4792 # Activate the instance disks if we're replacing them on a down instance
4793 if instance.status == "down":
4794 _StartInstanceDisks(self, instance, True)
4796 if instance.disk_template == constants.DT_DRBD8:
4797 if self.op.remote_node is None:
4798 fn = self._ExecD8DiskOnly
4800 fn = self._ExecD8Secondary
4802 raise errors.ProgrammerError("Unhandled disk replacement case")
4804 ret = fn(feedback_fn)
4806 # Deactivate the instance disks if we're replacing them on a down instance
4807 if instance.status == "down":
4808 _SafeShutdownInstanceDisks(self, instance)
4813 class LUGrowDisk(LogicalUnit):
4814 """Grow a disk of an instance.
4818 HTYPE = constants.HTYPE_INSTANCE
4819 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4822 def ExpandNames(self):
4823 self._ExpandAndLockInstance()
4824 self.needed_locks[locking.LEVEL_NODE] = []
4825 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4827 def DeclareLocks(self, level):
4828 if level == locking.LEVEL_NODE:
4829 self._LockInstancesNodes()
4831 def BuildHooksEnv(self):
4834 This runs on the master, the primary and all the secondaries.
4838 "DISK": self.op.disk,
4839 "AMOUNT": self.op.amount,
4841 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4843 self.cfg.GetMasterNode(),
4844 self.instance.primary_node,
4848 def CheckPrereq(self):
4849 """Check prerequisites.
4851 This checks that the instance is in the cluster.
4854 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4855 assert instance is not None, \
4856 "Cannot retrieve locked instance %s" % self.op.instance_name
4857 _CheckNodeOnline(self, instance.primary_node)
4858 for node in instance.secondary_nodes:
4859 _CheckNodeOnline(self, node)
4862 self.instance = instance
4864 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4865 raise errors.OpPrereqError("Instance's disk layout does not support"
4868 self.disk = instance.FindDisk(self.op.disk)
4870 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4871 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4872 instance.hypervisor)
4873 for node in nodenames:
4874 info = nodeinfo[node]
4875 if info.failed or not info.data:
4876 raise errors.OpPrereqError("Cannot get current information"
4877 " from node '%s'" % node)
4878 vg_free = info.data.get('vg_free', None)
4879 if not isinstance(vg_free, int):
4880 raise errors.OpPrereqError("Can't compute free disk space on"
4882 if self.op.amount > vg_free:
4883 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4884 " %d MiB available, %d MiB required" %
4885 (node, vg_free, self.op.amount))
4887 def Exec(self, feedback_fn):
4888 """Execute disk grow.
4891 instance = self.instance
4893 for node in (instance.secondary_nodes + (instance.primary_node,)):
4894 self.cfg.SetDiskID(disk, node)
4895 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
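# result.data is expected to be a (success, message) pair, as checked below;
# anything else means the node-side grow request failed outright.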
4897 if (not result.data or not isinstance(result.data, (list, tuple)) or
4898 len(result.data) != 2):
4899 raise errors.OpExecError("Grow request failed to node %s" % node)
4900 elif not result.data[0]:
4901 raise errors.OpExecError("Grow request failed to node %s: %s" %
4902 (node, result.data[1]))
4903 disk.RecordGrow(self.op.amount)
4904 self.cfg.Update(instance)
4905 if self.op.wait_for_sync:
4906 disk_abort = not _WaitForSync(self, instance)
4908 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4909 " status.\nPlease check the instance.")
4912 class LUQueryInstanceData(NoHooksLU):
4913 """Query runtime instance data.
4916 _OP_REQP = ["instances", "static"]
4919 def ExpandNames(self):
4920 self.needed_locks = {}
4921 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4923 if not isinstance(self.op.instances, list):
4924 raise errors.OpPrereqError("Invalid argument type 'instances'")
4926 if self.op.instances:
4927 self.wanted_names = []
4928 for name in self.op.instances:
4929 full_name = self.cfg.ExpandInstanceName(name)
4930 if full_name is None:
4931 raise errors.OpPrereqError("Instance '%s' not known" %
4932 name)
4933 self.wanted_names.append(full_name)
4934 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4936 self.wanted_names = None
4937 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4939 self.needed_locks[locking.LEVEL_NODE] = []
4940 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4942 def DeclareLocks(self, level):
4943 if level == locking.LEVEL_NODE:
4944 self._LockInstancesNodes()
4946 def CheckPrereq(self):
4947 """Check prerequisites.
4949 This only checks the optional instance list against the existing names.
4952 if self.wanted_names is None:
4953 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4955 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4956 in self.wanted_names]
4959 def _ComputeDiskStatus(self, instance, snode, dev):
4960 """Compute block device status.
4963 static = self.op.static
4965 self.cfg.SetDiskID(dev, instance.primary_node)
4966 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4968 dev_pstatus = dev_pstatus.data
4972 if dev.dev_type in constants.LDS_DRBD:
4973 # we change the snode then (otherwise we use the one passed in)
4974 if dev.logical_id[0] == instance.primary_node:
4975 snode = dev.logical_id[1]
4977 snode = dev.logical_id[0]
4979 if snode and not static:
4980 self.cfg.SetDiskID(dev, snode)
4981 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4983 dev_sstatus = dev_sstatus.data
4988 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4989 for child in dev.children]
4994 "iv_name": dev.iv_name,
4995 "dev_type": dev.dev_type,
4996 "logical_id": dev.logical_id,
4997 "physical_id": dev.physical_id,
4998 "pstatus": dev_pstatus,
4999 "sstatus": dev_sstatus,
5000 "children": dev_children,
5006 def Exec(self, feedback_fn):
5007 """Gather and return data"""
5010 cluster = self.cfg.GetClusterInfo()
5012 for instance in self.wanted_instances:
5013 if not self.op.static:
5014 remote_info = self.rpc.call_instance_info(instance.primary_node,
5016 instance.hypervisor)
5018 remote_info = remote_info.data
5019 if remote_info and "state" in remote_info:
5022 remote_state = "down"
5025 if instance.status == "down":
5026 config_state = "down"
5030 disks = [self._ComputeDiskStatus(instance, None, device)
5031 for device in instance.disks]
5034 "name": instance.name,
5035 "config_state": config_state,
5036 "run_state": remote_state,
5037 "pnode": instance.primary_node,
5038 "snodes": instance.secondary_nodes,
5040 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5042 "hypervisor": instance.hypervisor,
5043 "network_port": instance.network_port,
5044 "hv_instance": instance.hvparams,
5045 "hv_actual": cluster.FillHV(instance),
5046 "be_instance": instance.beparams,
5047 "be_actual": cluster.FillBE(instance),
5050 result[instance.name] = idict
5055 class LUSetInstanceParams(LogicalUnit):
5056 """Modifies an instances's parameters.
5059 HPATH = "instance-modify"
5060 HTYPE = constants.HTYPE_INSTANCE
5061 _OP_REQP = ["instance_name"]
5064 def CheckArguments(self):
5065 if not hasattr(self.op, 'nics'):
5067 if not hasattr(self.op, 'disks'):
5069 if not hasattr(self.op, 'beparams'):
5070 self.op.beparams = {}
5071 if not hasattr(self.op, 'hvparams'):
5072 self.op.hvparams = {}
5073 self.op.force = getattr(self.op, "force", False)
5074 if not (self.op.nics or self.op.disks or
5075 self.op.hvparams or self.op.beparams):
5076 raise errors.OpPrereqError("No changes submitted")
5078 utils.CheckBEParams(self.op.beparams)
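# self.op.disks and self.op.nics are lists of (op, params) pairs, where op is
# constants.DDM_ADD, constants.DDM_REMOVE or an integer index of an existing
# device; e.g. (hypothetical values):
#   disks=[(constants.DDM_ADD, {'size': 1024, 'mode': constants.DISK_RDWR})]
#   nics=[(0, {'ip': '192.0.2.10', 'bridge': 'xen-br0'})]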
5082 for disk_op, disk_dict in self.op.disks:
5083 if disk_op == constants.DDM_REMOVE:
5086 elif disk_op == constants.DDM_ADD:
5089 if not isinstance(disk_op, int):
5090 raise errors.OpPrereqError("Invalid disk index")
5091 if disk_op == constants.DDM_ADD:
5092 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5093 if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
5094 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5095 size = disk_dict.get('size', None)
5097 raise errors.OpPrereqError("Required disk parameter size missing")
5100 except ValueError, err:
5101 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5103 disk_dict['size'] = size
5105 # modification of disk
5106 if 'size' in disk_dict:
5107 raise errors.OpPrereqError("Disk size change not possible, use"
5110 if disk_addremove > 1:
5111 raise errors.OpPrereqError("Only one disk add or remove operation"
5112 " supported at a time")
5116 for nic_op, nic_dict in self.op.nics:
5117 if nic_op == constants.DDM_REMOVE:
5120 elif nic_op == constants.DDM_ADD:
5123 if not isinstance(nic_op, int):
5124 raise errors.OpPrereqError("Invalid nic index")
5126 # nic_dict should be a dict
5127 nic_ip = nic_dict.get('ip', None)
5128 if nic_ip is not None:
5129 if nic_ip.lower() == "none":
5130 nic_dict['ip'] = None
5132 if not utils.IsValidIP(nic_ip):
5133 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5134 # we can only check None bridges and assign the default one
5135 nic_bridge = nic_dict.get('bridge', None)
5136 if nic_bridge is None:
5137 nic_dict['bridge'] = self.cfg.GetDefBridge()
5138 # but we can validate MACs
5139 nic_mac = nic_dict.get('mac', None)
5140 if nic_mac is not None:
5141 if self.cfg.IsMacInUse(nic_mac):
5142 raise errors.OpPrereqError("MAC address %s already in use"
5143 " in cluster" % nic_mac)
5144 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5145 if not utils.IsValidMac(nic_mac):
5146 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5147 if nic_addremove > 1:
5148 raise errors.OpPrereqError("Only one NIC add or remove operation"
5149 " supported at a time")
5151 def ExpandNames(self):
5152 self._ExpandAndLockInstance()
5153 self.needed_locks[locking.LEVEL_NODE] = []
5154 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5156 def DeclareLocks(self, level):
5157 if level == locking.LEVEL_NODE:
5158 self._LockInstancesNodes()
5160 def BuildHooksEnv(self):
5163 This runs on the master, primary and secondaries.
5167 if constants.BE_MEMORY in self.be_new:
5168 args['memory'] = self.be_new[constants.BE_MEMORY]
5169 if constants.BE_VCPUS in self.be_new:
5170 args['vcpus'] = self.be_new[constants.BE_VCPUS]
5171 # FIXME: readd disk/nic changes
5172 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5173 nl = [self.cfg.GetMasterNode(),
5174 self.instance.primary_node] + list(self.instance.secondary_nodes)
5177 def CheckPrereq(self):
5178 """Check prerequisites.
5180 This only checks the instance list against the existing names.
5183 force = self.force = self.op.force
5185 # checking the new params on the primary/secondary nodes
5187 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5188 assert self.instance is not None, \
5189 "Cannot retrieve locked instance %s" % self.op.instance_name
5190 pnode = self.instance.primary_node
5192 nodelist.extend(instance.secondary_nodes)
5194 # hvparams processing
5195 if self.op.hvparams:
5196 i_hvdict = copy.deepcopy(instance.hvparams)
5197 for key, val in self.op.hvparams.iteritems():
5198 if val == constants.VALUE_DEFAULT:
5203 elif val == constants.VALUE_NONE:
5204 i_hvdict[key] = None
5207 cluster = self.cfg.GetClusterInfo()
5208 hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5211 hypervisor.GetHypervisor(
5212 instance.hypervisor).CheckParameterSyntax(hv_new)
5213 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5214 self.hv_new = hv_new # the new actual values
5215 self.hv_inst = i_hvdict # the new dict (without defaults)
5217 self.hv_new = self.hv_inst = {}
5219 # beparams processing
5220 if self.op.beparams:
5221 i_bedict = copy.deepcopy(instance.beparams)
5222 for key, val in self.op.beparams.iteritems():
5223 if val == constants.VALUE_DEFAULT:
5230 cluster = self.cfg.GetClusterInfo()
5231 be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5233 self.be_new = be_new # the new actual values
5234 self.be_inst = i_bedict # the new dict (without defaults)
5236 self.be_new = self.be_inst = {}
5240 if constants.BE_MEMORY in self.op.beparams and not self.force:
5241 mem_check_list = [pnode]
5242 if be_new[constants.BE_AUTO_BALANCE]:
5243 # either we changed auto_balance to yes or it was from before
5244 mem_check_list.extend(instance.secondary_nodes)
5245 instance_info = self.rpc.call_instance_info(pnode, instance.name,
5246 instance.hypervisor)
5247 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5248 instance.hypervisor)
5249 if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5250 # Assume the primary node is unreachable and go ahead
5251 self.warn.append("Can't get info from primary node %s" % pnode)
5253 if not instance_info.failed and instance_info.data:
5254 current_mem = instance_info.data['memory']
5256 # Assume instance not running
5257 # (there is a slight race condition here, but it's not very probable,
5258 # and we have no other way to check)
5260 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5261 nodeinfo[pnode].data['memory_free'])
5263 raise errors.OpPrereqError("This change will prevent the instance"
5264 " from starting, due to %d MB of memory"
5265 " missing on its primary node" % miss_mem)
5267 if be_new[constants.BE_AUTO_BALANCE]:
5268 for node, nres in ((name, nodeinfo[name]) for name in instance.secondary_nodes):
5269 if nres.failed or not isinstance(nres.data, dict):
5270 self.warn.append("Can't get info from secondary node %s" % node)
5271 elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5272 self.warn.append("Not enough memory to failover instance to"
5273 " secondary node %s" % node)
5276 for nic_op, nic_dict in self.op.nics:
5277 if nic_op == constants.DDM_REMOVE:
5278 if not instance.nics:
5279 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5281 if nic_op != constants.DDM_ADD:
5283 if nic_op < 0 or nic_op >= len(instance.nics):
5284 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5286 (nic_op, len(instance.nics)))
5287 nic_bridge = nic_dict.get('bridge', None)
5288 if nic_bridge is not None:
5289 if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5290 msg = ("Bridge '%s' doesn't exist on one of"
5291 " the instance nodes" % nic_bridge)
5293 self.warn.append(msg)
5295 raise errors.OpPrereqError(msg)
5298 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5299 raise errors.OpPrereqError("Disk operations not supported for"
5300 " diskless instances")
5301 for disk_op, disk_dict in self.op.disks:
5302 if disk_op == constants.DDM_REMOVE:
5303 if len(instance.disks) == 1:
5304 raise errors.OpPrereqError("Cannot remove the last disk of"
5306 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5307 ins_l = ins_l[pnode]
5308 if ins_l.failed or not isinstance(ins_l.data, list):
5309 raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5310 if instance.name in ins_l.data:
5311 raise errors.OpPrereqError("Instance is running, can't remove"
5314 if (disk_op == constants.DDM_ADD and
5315 len(instance.disks) >= constants.MAX_DISKS):
5316 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5317 " add more" % constants.MAX_DISKS)
5318 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5320 if disk_op < 0 or disk_op >= len(instance.disks):
5321 raise errors.OpPrereqError("Invalid disk index %s, valid values"
5323 (disk_op, len(instance.disks)))
5327 def Exec(self, feedback_fn):
5328 """Modifies an instance.
5330 All parameters take effect only at the next restart of the instance.
5333 # Process here the warnings from CheckPrereq, as we don't have a
5334 # feedback_fn there.
5335 for warn in self.warn:
5336 feedback_fn("WARNING: %s" % warn)
5339 instance = self.instance
5341 for disk_op, disk_dict in self.op.disks:
5342 if disk_op == constants.DDM_REMOVE:
5343 # remove the last disk
5344 device = instance.disks.pop()
5345 device_idx = len(instance.disks)
5346 for node, disk in device.ComputeNodeTree(instance.primary_node):
5347 self.cfg.SetDiskID(disk, node)
5348 rpc_result = self.rpc.call_blockdev_remove(node, disk)
5349 if rpc_result.failed or not rpc_result.data:
5350 self.proc.LogWarning("Could not remove disk/%d on node %s,"
5351 " continuing anyway", device_idx, node)
5352 result.append(("disk/%d" % device_idx, "remove"))
5353 elif disk_op == constants.DDM_ADD:
5355 if instance.disk_template == constants.DT_FILE:
5356 file_driver, file_path = instance.disks[0].logical_id
5357 file_path = os.path.dirname(file_path)
5359 file_driver = file_path = None
5360 disk_idx_base = len(instance.disks)
5361 new_disk = _GenerateDiskTemplate(self,
5362 instance.disk_template,
5363 instance, instance.primary_node,
5364 instance.secondary_nodes,
5369 new_disk.mode = disk_dict['mode']
5370 instance.disks.append(new_disk)
5371 info = _GetInstanceInfoText(instance)
5373 logging.info("Creating volume %s for instance %s",
5374 new_disk.iv_name, instance.name)
5375 # Note: this needs to be kept in sync with _CreateDisks
5377 for secondary_node in instance.secondary_nodes:
5378 if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
5379 new_disk, False, info):
5380 self.LogWarning("Failed to create volume %s (%s) on"
5381 " secondary node %s!",
5382 new_disk.iv_name, new_disk, secondary_node)
5384 if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5385 instance, new_disk, info):
5386 self.LogWarning("Failed to create volume %s on primary!",
5388 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5389 (new_disk.size, new_disk.mode)))
5391 # change a given disk
5392 instance.disks[disk_op].mode = disk_dict['mode']
5393 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5395 for nic_op, nic_dict in self.op.nics:
5396 if nic_op == constants.DDM_REMOVE:
5397 # remove the last nic
5398 del instance.nics[-1]
5399 result.append(("nic.%d" % len(instance.nics), "remove"))
5400 elif nic_op == constants.DDM_ADD:
5402 if 'mac' not in nic_dict:
5403 mac = constants.VALUE_GENERATE
5405 mac = nic_dict['mac']
5406 if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5407 mac = self.cfg.GenerateMAC()
5408 new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5409 bridge=nic_dict.get('bridge', None))
5410 instance.nics.append(new_nic)
5411 result.append(("nic.%d" % (len(instance.nics) - 1),
5412 "add:mac=%s,ip=%s,bridge=%s" %
5413 (new_nic.mac, new_nic.ip, new_nic.bridge)))
5415 # change a given nic
5416 for key in 'mac', 'ip', 'bridge':
5418 setattr(instance.nics[nic_op], key, nic_dict[key])
5419 result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5422 if self.op.hvparams:
5423 instance.hvparams = self.hv_new
5424 for key, val in self.op.hvparams.iteritems():
5425 result.append(("hv/%s" % key, val))
5428 if self.op.beparams:
5429 instance.beparams = self.be_inst
5430 for key, val in self.op.beparams.iteritems():
5431 result.append(("be/%s" % key, val))
5433 self.cfg.Update(instance)
5438 class LUQueryExports(NoHooksLU):
5439 """Query the exports list
5442 _OP_REQP = ['nodes']
5445 def ExpandNames(self):
5446 self.needed_locks = {}
5447 self.share_locks[locking.LEVEL_NODE] = 1
5448 if not self.op.nodes:
5449 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5451 self.needed_locks[locking.LEVEL_NODE] = \
5452 _GetWantedNodes(self, self.op.nodes)
5454 def CheckPrereq(self):
5455 """Check prerequisites.
5458 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5460 def Exec(self, feedback_fn):
5461 """Compute the list of all the exported system images.
5464 @return: a dictionary with the structure node->(export-list)
5465 where export-list is a list of the instances exported on
5469 rpcresult = self.rpc.call_export_list(self.nodes)
5471 for node in rpcresult:
5472 if rpcresult[node].failed:
5473 result[node] = False
5475 result[node] = rpcresult[node].data
5480 class LUExportInstance(LogicalUnit):
5481 """Export an instance to an image in the cluster.
5484 HPATH = "instance-export"
5485 HTYPE = constants.HTYPE_INSTANCE
5486 _OP_REQP = ["instance_name", "target_node", "shutdown"]
5489 def ExpandNames(self):
5490 self._ExpandAndLockInstance()
5491 # FIXME: lock only instance primary and destination node
5493 # Sad but true, for now we have to lock all nodes, as we don't know where
5494 # the previous export might be, and in this LU we search for it and
5495 # remove it from its current node. In the future we could fix this by:
5496 # - making a tasklet to search (share-lock all), then create the new one,
5497 # then one to remove, after
5498 # - removing the removal operation altogether
5499 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5501 def DeclareLocks(self, level):
5502 """Last minute lock declaration."""
5503 # All nodes are locked anyway, so nothing to do here.
5505 def BuildHooksEnv(self):
5508 This will run on the master, primary node and target node.
5512 "EXPORT_NODE": self.op.target_node,
5513 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5515 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5516 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5517 self.op.target_node]
5520 def CheckPrereq(self):
5521 """Check prerequisites.
5523 This checks that the instance and node names are valid.
5526 instance_name = self.op.instance_name
5527 self.instance = self.cfg.GetInstanceInfo(instance_name)
5528 assert self.instance is not None, \
5529 "Cannot retrieve locked instance %s" % self.op.instance_name
5530 _CheckNodeOnline(self, self.instance.primary_node)
5532 self.dst_node = self.cfg.GetNodeInfo(
5533 self.cfg.ExpandNodeName(self.op.target_node))
5535 if self.dst_node is None:
5536 # This means the node name is wrong, not that the node is non-locked
5537 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5538 _CheckNodeOnline(self, self.op.target_node)
5540 # instance disk type verification
5541 for disk in self.instance.disks:
5542 if disk.dev_type == constants.LD_FILE:
5543 raise errors.OpPrereqError("Export not supported for instances with"
5544 " file-based disks")
5546 def Exec(self, feedback_fn):
5547 """Export an instance to an image in the cluster.
5550 instance = self.instance
5551 dst_node = self.dst_node
5552 src_node = instance.primary_node
5553 if self.op.shutdown:
5554 # shutdown the instance, but not the disks
5555 result = self.rpc.call_instance_shutdown(src_node, instance)
5558 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5559 (instance.name, src_node))
5561 vgname = self.cfg.GetVGName()
5566 for disk in instance.disks:
5567 # new_dev_name will be the name of a snapshot of an LVM leaf of the disk we passed
5568 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5569 if new_dev_name.failed or not new_dev_name.data:
5570 self.LogWarning("Could not snapshot block device %s on node %s",
5571 disk.logical_id[1], src_node)
5572 snap_disks.append(False)
5574 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5575 logical_id=(vgname, new_dev_name.data),
5576 physical_id=(vgname, new_dev_name.data),
5577 iv_name=disk.iv_name)
5578 snap_disks.append(new_dev)
5581 if self.op.shutdown and instance.status == "up":
5582 result = self.rpc.call_instance_start(src_node, instance, None)
5583 if result.failed or not result.data:
5584 _ShutdownInstanceDisks(self, instance)
5585 raise errors.OpExecError("Could not start instance")
5587 # TODO: check for size
5589 cluster_name = self.cfg.GetClusterName()
5590 for idx, dev in enumerate(snap_disks):
5592 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5593 instance, cluster_name, idx)
5594 if result.failed or not result.data:
5595 self.LogWarning("Could not export block device %s from node %s to"
5596 " node %s", dev.logical_id[1], src_node,
5598 result = self.rpc.call_blockdev_remove(src_node, dev)
5599 if result.failed or not result.data:
5600 self.LogWarning("Could not remove snapshot block device %s from node"
5601 " %s", dev.logical_id[1], src_node)
5603 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
5604 if result.failed or not result.data:
5605 self.LogWarning("Could not finalize export for instance %s on node %s",
5606 instance.name, dst_node.name)
5608 nodelist = self.cfg.GetNodeList()
5609 nodelist.remove(dst_node.name)
5611 # on one-node clusters nodelist will be empty after the removal;
5612 # if we proceed, the backup would be removed because OpQueryExports
5613 # substitutes an empty list with the full cluster node list.
5615 exportlist = self.rpc.call_export_list(nodelist)
5616 for node in exportlist:
5617 if exportlist[node].failed:
5619 if instance.name in exportlist[node].data:
5620 if not self.rpc.call_export_remove(node, instance.name):
5621 self.LogWarning("Could not remove older export for instance %s"
5622 " on node %s", instance.name, node)
5625 class LURemoveExport(NoHooksLU):
5626 """Remove exports related to the named instance.
5629 _OP_REQP = ["instance_name"]
5632 def ExpandNames(self):
5633 self.needed_locks = {}
5634 # We need all nodes to be locked in order for RemoveExport to work, but we
5635 # don't need to lock the instance itself, as nothing will happen to it (and
5636 # we can remove exports also for a removed instance)
5637 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5639 def CheckPrereq(self):
5640 """Check prerequisites.
5644 def Exec(self, feedback_fn):
5645 """Remove any export.
5648 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5649 # If the instance was not found we'll try with the name that was passed in.
5650 # This will only work if it was an FQDN, though.
5652 if not instance_name:
5654 instance_name = self.op.instance_name
5656 exportlist = self.rpc.call_export_list(self.acquired_locks[
5657 locking.LEVEL_NODE])
5659 for node in exportlist:
5660 if exportlist[node].failed:
5661 self.LogWarning("Failed to query node %s, continuing" % node)
5663 if instance_name in exportlist[node].data:
5665 result = self.rpc.call_export_remove(node, instance_name)
5666 if result.failed or not result.data:
5667 logging.error("Could not remove export for instance %s"
5668 " on node %s", instance_name, node)
5670 if fqdn_warn and not found:
5671 feedback_fn("Export not found. If trying to remove an export belonging"
5672 " to a deleted instance please use its Fully Qualified"
5676 class TagsLU(NoHooksLU):
5679 This is an abstract class which is the parent of all the other tags LUs.
5683 def ExpandNames(self):
5684 self.needed_locks = {}
5685 if self.op.kind == constants.TAG_NODE:
5686 name = self.cfg.ExpandNodeName(self.op.name)
5688 raise errors.OpPrereqError("Invalid node name (%s)" %
5691 self.needed_locks[locking.LEVEL_NODE] = name
5692 elif self.op.kind == constants.TAG_INSTANCE:
5693 name = self.cfg.ExpandInstanceName(self.op.name)
5694 if name is None:
5695 raise errors.OpPrereqError("Invalid instance name (%s)" %
5696 (self.op.name,))
5697 self.op.name = name
5698 self.needed_locks[locking.LEVEL_INSTANCE] = name
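# Worked example of the lock setup above (hypothetical names): an opcode
# with kind=constants.TAG_NODE and name="node1" gets the name expanded to,
# say, "node1.example.com" and ends up with
#   self.needed_locks == {locking.LEVEL_NODE: "node1.example.com"}
# while kind=constants.TAG_CLUSTER keeps needed_locks empty, since tagging
# the cluster object requires no node or instance lock.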
5700 def CheckPrereq(self):
5701 """Check prerequisites.
5704 if self.op.kind == constants.TAG_CLUSTER:
5705 self.target = self.cfg.GetClusterInfo()
5706 elif self.op.kind == constants.TAG_NODE:
5707 self.target = self.cfg.GetNodeInfo(self.op.name)
5708 elif self.op.kind == constants.TAG_INSTANCE:
5709 self.target = self.cfg.GetInstanceInfo(self.op.name)
5711 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5715 class LUGetTags(TagsLU):
5716 """Returns the tags of a given object.
5719 _OP_REQP = ["kind", "name"]
5722 def Exec(self, feedback_fn):
5723 """Returns the tag list.
5726 return list(self.target.GetTags())
5729 class LUSearchTags(NoHooksLU):
5730 """Searches the tags for a given pattern.
5733 _OP_REQP = ["pattern"]
5736 def ExpandNames(self):
5737 self.needed_locks = {}
5739 def CheckPrereq(self):
5740 """Check prerequisites.
5742 This checks the pattern passed for validity by compiling it.
5745 try:
5746 self.re = re.compile(self.op.pattern)
5747 except re.error, err:
5748 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5749 (self.op.pattern, err))
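# For example, a pattern such as "^web" compiles fine and is accepted, while
# an unbalanced one such as "web[" raises re.error and is reported as an
# OpPrereqError before any cluster data is touched.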
5751 def Exec(self, feedback_fn):
5752 """Returns the tag list.
5756 tgts = [("/cluster", cfg.GetClusterInfo())]
5757 ilist = cfg.GetAllInstancesInfo().values()
5758 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5759 nlist = cfg.GetAllNodesInfo().values()
5760 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5761 results = []
5762 for path, target in tgts:
5763 for tag in target.GetTags():
5764 if self.re.search(tag):
5765 results.append((path, tag))
5766 return results
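# The result is a list of (path, tag) pairs, for instance (hypothetical
# names):
#   [("/cluster", "production"),
#    ("/instances/inst1.example.com", "web"),
#    ("/nodes/node1.example.com", "rack1")]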
5769 class LUAddTags(TagsLU):
5770 """Sets a tag on a given object.
5773 _OP_REQP = ["kind", "name", "tags"]
5776 def CheckPrereq(self):
5777 """Check prerequisites.
5779 This checks the type and length of the tag name and value.
5782 TagsLU.CheckPrereq(self)
5783 for tag in self.op.tags:
5784 objects.TaggableObject.ValidateTag(tag)
5786 def Exec(self, feedback_fn):
5790 try:
5791 for tag in self.op.tags:
5792 self.target.AddTag(tag)
5793 except errors.TagError, err:
5794 raise errors.OpExecError("Error while setting tag: %s" % str(err))
5795 try:
5796 self.cfg.Update(self.target)
5797 except errors.ConfigurationError:
5798 raise errors.OpRetryError("There has been a modification to the"
5799 " config file and the operation has been"
5800 " aborted. Please retry.")
5803 class LUDelTags(TagsLU):
5804 """Delete a list of tags from a given object.
5807 _OP_REQP = ["kind", "name", "tags"]
5810 def CheckPrereq(self):
5811 """Check prerequisites.
5813 This checks that we have the given tag.
5816 TagsLU.CheckPrereq(self)
5817 for tag in self.op.tags:
5818 objects.TaggableObject.ValidateTag(tag)
5819 del_tags = frozenset(self.op.tags)
5820 cur_tags = self.target.GetTags()
5821 if not del_tags <= cur_tags:
5822 diff_tags = del_tags - cur_tags
5823 diff_names = ["'%s'" % tag for tag in diff_tags]
5825 raise errors.OpPrereqError("Tag(s) %s not found" %
5826 (",".join(diff_names)))
5828 def Exec(self, feedback_fn):
5829 """Remove the tag from the object.
5832 for tag in self.op.tags:
5833 self.target.RemoveTag(tag)
5834 try:
5835 self.cfg.Update(self.target)
5836 except errors.ConfigurationError:
5837 raise errors.OpRetryError("There has been a modification to the"
5838 " config file and the operation has been"
5839 " aborted. Please retry.")
5842 class LUTestDelay(NoHooksLU):
5843 """Sleep for a specified amount of time.
5845 This LU sleeps on the master and/or nodes for a specified amount of
5846 time.
5849 _OP_REQP = ["duration", "on_master", "on_nodes"]
5852 def ExpandNames(self):
5853 """Expand names and set required locks.
5855 This expands the node list, if any.
5858 self.needed_locks = {}
5859 if self.op.on_nodes:
5860 # _GetWantedNodes can be used here, but is not always appropriate to use
5861 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5862 # more information.
5863 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5864 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5866 def CheckPrereq(self):
5867 """Check prerequisites.
5871 def Exec(self, feedback_fn):
5872 """Do the actual sleep.
5875 if self.op.on_master:
5876 if not utils.TestDelay(self.op.duration):
5877 raise errors.OpExecError("Error during master delay test")
5878 if self.op.on_nodes:
5879 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5881 raise errors.OpExecError("Complete failure from rpc call")
5882 for node, node_result in result.items():
5883 node_result.Raise()
5884 if not node_result.data:
5885 raise errors.OpExecError("Failure during rpc call to node %s,"
5886 " result: %s" % (node, node_result.data))
5889 class IAllocator(object):
5890 """IAllocator framework.
5892 An IAllocator instance has four sets of attributes:
5893 - cfg that is needed to query the cluster
5894 - input data (all members of the _KEYS class attribute are required)
5895 - four buffer attributes (in|out_data|text), that represent the
5896 input (to the external script) in text and data structure format,
5897 and the output from it, again in two formats
5898 - the result variables from the script (success, info, nodes) for
5899 easy usage
5901 """
5902 _ALLO_KEYS = [
5903 "mem_size", "disks", "disk_template",
5904 "os", "tags", "nics", "vcpus", "hypervisor",
5905 ]
5906 _RELO_KEYS = [
5907 "relocate_from",
5908 ]
5910 def __init__(self, lu, mode, name, **kwargs):
5911 self.lu = lu
5912 # init buffer variables
5913 self.in_text = self.out_text = self.in_data = self.out_data = None
5914 # init all input fields so that pylint is happy
5915 self.mode = mode
5916 self.name = name
5917 self.mem_size = self.disks = self.disk_template = None
5918 self.os = self.tags = self.nics = self.vcpus = None
5919 self.relocate_from = None
5921 self.required_nodes = None
5922 # init result fields
5923 self.success = self.info = self.nodes = None
5924 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5925 keyset = self._ALLO_KEYS
5926 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5927 keyset = self._RELO_KEYS
5929 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5930 " IAllocator" % self.mode)
5931 for key in kwargs:
5932 if key not in keyset:
5933 raise errors.ProgrammerError("Invalid input parameter '%s' to"
5934 " IAllocator" % key)
5935 setattr(self, key, kwargs[key])
5936 for key in keyset:
5937 if key not in kwargs:
5938 raise errors.ProgrammerError("Missing input parameter '%s' to"
5939 " IAllocator" % key)
5940 self._BuildInputData()
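# A hypothetical construction for the allocate case, with the keyword
# arguments matching _ALLO_KEYS (all values below are illustrative only):
#   ial = IAllocator(lu, constants.IALLOCATOR_MODE_ALLOC,
#                    "inst1.example.com",
#                    mem_size=512, vcpus=1,
#                    disks=[{"size": 1024, "mode": "w"},
#                           {"size": 512, "mode": "w"}],
#                    disk_template=constants.DT_DRBD8,
#                    os="debian-etch", tags=[],
#                    nics=[{"mac": "aa:00:00:f2:0b:01", "ip": None,
#                           "bridge": "xen-br0"}],
#                    hypervisor=constants.HT_XEN_PVM)
# Any keyword outside the keyset, or any missing one, is a ProgrammerError.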
5942 def _ComputeClusterData(self):
5943 """Compute the generic allocator input data.
5945 This is the data that is independent of the actual operation.
5948 cfg = self.lu.cfg
5949 cluster_info = cfg.GetClusterInfo()
5951 data = {
5953 "cluster_name": cfg.GetClusterName(),
5954 "cluster_tags": list(cluster_info.GetTags()),
5955 "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5956 # we don't have job IDs
5957 }
5958 iinfo = cfg.GetAllInstancesInfo().values()
5959 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
5962 node_results = {}
5963 node_list = cfg.GetNodeList()
5965 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5966 hypervisor = self.hypervisor
5967 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5968 hypervisor = cfg.GetInstanceInfo(self.name).hypervisor
5970 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5971 hypervisor)
5972 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
5973 cluster_info.enabled_hypervisors)
5974 for nname in node_list:
5975 ninfo = cfg.GetNodeInfo(nname)
5976 node_data[nname].Raise()
5977 if not isinstance(node_data[nname].data, dict):
5978 raise errors.OpExecError("Can't get data for node %s" % nname)
5979 remote_info = node_data[nname].data
5980 for attr in ['memory_total', 'memory_free', 'memory_dom0',
5981 'vg_size', 'vg_free', 'cpu_total']:
5982 if attr not in remote_info:
5983 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5984 (nname, attr))
5985 try:
5986 remote_info[attr] = int(remote_info[attr])
5987 except ValueError, err:
5988 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5989 " %s" % (nname, attr, str(err)))
5990 # compute memory used by primary instances
5991 i_p_mem = i_p_up_mem = 0
5992 for iinfo, beinfo in i_list:
5993 if iinfo.primary_node == nname:
5994 i_p_mem += beinfo[constants.BE_MEMORY]
5995 if iinfo.name not in node_iinfo[nname]:
5996 i_used_mem = 0
5997 else:
5998 i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
5999 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6000 remote_info['memory_free'] -= max(0, i_mem_diff)
6002 if iinfo.status == "up":
6003 i_p_up_mem += beinfo[constants.BE_MEMORY]
6005 # compute memory used by instances
6007 "tags": list(ninfo.GetTags()),
6008 "total_memory": remote_info['memory_total'],
6009 "reserved_memory": remote_info['memory_dom0'],
6010 "free_memory": remote_info['memory_free'],
6011 "i_pri_memory": i_p_mem,
6012 "i_pri_up_memory": i_p_up_mem,
6013 "total_disk": remote_info['vg_size'],
6014 "free_disk": remote_info['vg_free'],
6015 "primary_ip": ninfo.primary_ip,
6016 "secondary_ip": ninfo.secondary_ip,
6017 "total_cpus": remote_info['cpu_total'],
6018 "offline": ninfo.offline,
6019 }
6020 node_results[nname] = pnr
6021 data["nodes"] = node_results
6024 instance_data = {}
6025 for iinfo, beinfo in i_list:
6026 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6027 for n in iinfo.nics]
6029 "tags": list(iinfo.GetTags()),
6030 "should_run": iinfo.status == "up",
6031 "vcpus": beinfo[constants.BE_VCPUS],
6032 "memory": beinfo[constants.BE_MEMORY],
6034 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6036 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
6037 "disk_template": iinfo.disk_template,
6038 "hypervisor": iinfo.hypervisor,
6039 }
6040 instance_data[iinfo.name] = pir
6042 data["instances"] = instance_data
6046 def _AddNewInstance(self):
6047 """Add new instance data to allocator structure.
6049 This in combination with _ComputeClusterData will create the
6050 correct structure needed as input for the allocator.
6052 The checks for the completeness of the opcode must have already been
6056 data = self.in_data
6057 if len(self.disks) != 2:
6058 raise errors.OpExecError("Only two-disk configurations supported")
6060 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6062 if self.disk_template in constants.DTS_NET_MIRROR:
6063 self.required_nodes = 2
6064 else:
6065 self.required_nodes = 1
6067 request = {
6069 "disk_template": self.disk_template,
6072 "vcpus": self.vcpus,
6073 "memory": self.mem_size,
6074 "disks": self.disks,
6075 "disk_space_total": disk_space,
6077 "required_nodes": self.required_nodes,
6079 data["request"] = request
6081 def _AddRelocateInstance(self):
6082 """Add relocate instance data to allocator structure.
6084 This in combination with _ComputeClusterData will create the
6085 correct structure needed as input for the allocator.
6087 The checks for the completeness of the opcode must have already been
6091 instance = self.lu.cfg.GetInstanceInfo(self.name)
6092 if instance is None:
6093 raise errors.ProgrammerError("Unknown instance '%s' passed to"
6094 " IAllocator" % self.name)
6096 if instance.disk_template not in constants.DTS_NET_MIRROR:
6097 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6099 if len(instance.secondary_nodes) != 1:
6100 raise errors.OpPrereqError("Instance has not exactly one secondary node")
6102 self.required_nodes = 1
6103 disk_sizes = [{'size': disk.size} for disk in instance.disks]
6104 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6109 "disk_space_total": disk_space,
6110 "required_nodes": self.required_nodes,
6111 "relocate_from": self.relocate_from,
6113 self.in_data["request"] = request
6115 def _BuildInputData(self):
6116 """Build input data structures.
6119 self._ComputeClusterData()
6121 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6122 self._AddNewInstance()
6123 else:
6124 self._AddRelocateInstance()
6126 self.in_text = serializer.Dump(self.in_data)
6128 def Run(self, name, validate=True, call_fn=None):
6129 """Run an instance allocator and return the results.
6132 if call_fn is None:
6133 call_fn = self.lu.rpc.call_iallocator_runner
6136 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6137 result.Raise()
6139 if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6140 raise errors.OpExecError("Invalid result from master iallocator runner")
6142 rcode, stdout, stderr, fail = result.data
6144 if rcode == constants.IARUN_NOTFOUND:
6145 raise errors.OpExecError("Can't find allocator '%s'" % name)
6146 elif rcode == constants.IARUN_FAILURE:
6147 raise errors.OpExecError("Instance allocator call failed: %s,"
6148 " output: %s" % (fail, stdout+stderr))
6149 self.out_text = stdout
6150 if validate:
6151 self._ValidateResult()
6153 def _ValidateResult(self):
6154 """Process the allocator results.
6156 This will process and, if successful, save the result in
6157 self.out_data and the other parameters.
6160 try:
6161 rdict = serializer.Load(self.out_text)
6162 except Exception, err:
6163 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6165 if not isinstance(rdict, dict):
6166 raise errors.OpExecError("Can't parse iallocator results: not a dict")
6168 for key in "success", "info", "nodes":
6169 if key not in rdict:
6170 raise errors.OpExecError("Can't parse iallocator results:"
6171 " missing key '%s'" % key)
6172 setattr(self, key, rdict[key])
6174 if not isinstance(rdict["nodes"], list):
6175 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6177 self.out_data = rdict
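# A minimal well-formed reply from an allocator script, as validated above
# (hypothetical node names):
#   {"success": True, "info": "allocation successful",
#    "nodes": ["node2.example.com", "node3.example.com"]}
# Anything that is not a dict, lacks one of the three keys, or has a
# non-list "nodes" value is rejected with OpExecError.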
6180 class LUTestAllocator(NoHooksLU):
6181 """Run allocator tests.
6183 This LU runs the allocator tests
6186 _OP_REQP = ["direction", "mode", "name"]
6188 def CheckPrereq(self):
6189 """Check prerequisites.
6191 This checks the opcode parameters depending on the direction and mode of
6192 the test.
6194 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6195 for attr in ["name", "mem_size", "disks", "disk_template",
6196 "os", "tags", "nics", "vcpus"]:
6197 if not hasattr(self.op, attr):
6198 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6200 iname = self.cfg.ExpandInstanceName(self.op.name)
6201 if iname is not None:
6202 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6204 if not isinstance(self.op.nics, list):
6205 raise errors.OpPrereqError("Invalid parameter 'nics'")
6206 for row in self.op.nics:
6207 if (not isinstance(row, dict) or
6208 "mac" not in row or
6209 "ip" not in row or
6210 "bridge" not in row):
6211 raise errors.OpPrereqError("Invalid contents of the"
6212 " 'nics' parameter")
6213 if not isinstance(self.op.disks, list):
6214 raise errors.OpPrereqError("Invalid parameter 'disks'")
6215 if len(self.op.disks) != 2:
6216 raise errors.OpPrereqError("Only two-disk configurations supported")
6217 for row in self.op.disks:
6218 if (not isinstance(row, dict) or
6219 "size" not in row or
6220 not isinstance(row["size"], int) or
6221 "mode" not in row or
6222 row["mode"] not in ['r', 'w']):
6223 raise errors.OpPrereqError("Invalid contents of the"
6224 " 'disks' parameter")
6225 if self.op.hypervisor is None:
6226 self.op.hypervisor = self.cfg.GetHypervisorType()
6227 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6228 if not hasattr(self.op, "name"):
6229 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6230 fname = self.cfg.ExpandInstanceName(self.op.name)
6232 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6234 self.op.name = fname
6235 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6237 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6240 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6241 if not hasattr(self.op, "allocator") or self.op.allocator is None:
6242 raise errors.OpPrereqError("Missing allocator name")
6243 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6244 raise errors.OpPrereqError("Wrong allocator test '%s'" %
6247 def Exec(self, feedback_fn):
6248 """Run the allocator test.
6251 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6252 ial = IAllocator(self,
6253 mode=self.op.mode,
6254 name=self.op.name,
6255 mem_size=self.op.mem_size,
6256 disks=self.op.disks,
6257 disk_template=self.op.disk_template,
6258 os=self.op.os,
6259 tags=self.op.tags,
6260 nics=self.op.nics,
6261 vcpus=self.op.vcpus,
6262 hypervisor=self.op.hypervisor,
6263 )
6264 else:
6265 ial = IAllocator(self,
6266 mode=self.op.mode,
6267 name=self.op.name,
6268 relocate_from=list(self.relocate_from),
6269 )
6271 if self.op.direction == constants.IALLOCATOR_DIR_IN:
6272 result = ial.in_text
6273 else:
6274 ial.Run(self.op.allocator, validate=False)
6275 result = ial.out_text
6276 return result