4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
# NOTE(review): truncated excerpt -- the leading integer on each line is the
# original file's line number, and interior source lines are missing from this
# view, so the code is documented as-is without restructuring.
49 class LogicalUnit(object):
50 """Logical Unit base class.
52 Subclasses must follow these rules:
53 - implement ExpandNames
54 - implement CheckPrereq
56 - implement BuildHooksEnv
57 - redefine HPATH and HTYPE
58 - optionally redefine their run requirements:
59 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
61 Note that all commands require root permissions.
69 def __init__(self, processor, op, context, rpc):
70 """Constructor for LogicalUnit.
72 This needs to be overridden in derived classes in order to check op
78 self.cfg = context.cfg
79 self.context = context
81 # Dicts used to declare locking needs to mcpu
82 self.needed_locks = None
83 self.acquired_locks = {}
# share_locks defaults every level to 0 (exclusive); subclasses flip to 1 to share
84 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86 self.remove_locks = {}
87 # Used to force good behavior when calling helper functions
88 self.recalculate_locks = {}
91 self.LogWarning = processor.LogWarning
92 self.LogInfo = processor.LogInfo
# Validate that every parameter the subclass declared in _OP_REQP is present
94 for attr_name in self._OP_REQP:
95 attr_val = getattr(op, attr_name, None)
97 raise errors.OpPrereqError("Required parameter '%s' missing" %
# Lazily-constructed SshRunner, exposed via the `ssh` property below
102 """Returns the SshRunner object
106 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
109 ssh = property(fget=__GetSSH)
111 def CheckArguments(self):
112 """Check syntactic validity for the opcode arguments.
114 This method is for doing a simple syntactic check and ensure
115 validity of opcode parameters, without any cluster-related
116 checks. While the same can be accomplished in ExpandNames and/or
117 CheckPrereq, doing these separate is better because:
119 - ExpandNames is left as purely a lock-related function
120 - CheckPrereq is run after we have acquired locks (and possible
123 The function is allowed to change the self.op attribute so that
124 later methods can no longer worry about missing parameters.
129 def ExpandNames(self):
130 """Expand names for this LU.
132 This method is called before starting to execute the opcode, and it should
133 update all the parameters of the opcode to their canonical form (e.g. a
134 short node name must be fully expanded after this method has successfully
135 completed). This way locking, hooks, logging, etc. can work correctly.
137 LUs which implement this method must also populate the self.needed_locks
138 member, as a dict with lock levels as keys, and a list of needed lock names
141 - use an empty dict if you don't need any lock
142 - if you don't need any lock at a particular level omit that level
143 - don't put anything for the BGL level
144 - if you want all locks at a level use locking.ALL_SET as a value
146 If you need to share locks (rather than acquire them exclusively) at one
147 level you can modify self.share_locks, setting a true value (usually 1) for
148 that level. By default locks are not shared.
152 # Acquire all nodes and one instance
153 self.needed_locks = {
154 locking.LEVEL_NODE: locking.ALL_SET,
155 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
157 # Acquire just two nodes
158 self.needed_locks = {
159 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
162 self.needed_locks = {} # No, you can't leave it to the default value None
165 # The implementation of this method is mandatory only if the new LU is
166 # concurrent, so that old LUs don't need to be changed all at the same
169 self.needed_locks = {} # Exclusive LUs don't need locks.
171 raise NotImplementedError
173 def DeclareLocks(self, level):
174 """Declare LU locking needs for a level
176 While most LUs can just declare their locking needs at ExpandNames time,
177 sometimes there's the need to calculate some locks after having acquired
178 the ones before. This function is called just before acquiring locks at a
179 particular level, but after acquiring the ones at lower levels, and permits
180 such calculations. It can be used to modify self.needed_locks, and by
181 default it does nothing.
183 This function is only called if you have something already set in
184 self.needed_locks for the level.
186 @param level: Locking level which is going to be locked
187 @type level: member of ganeti.locking.LEVELS
191 def CheckPrereq(self):
192 """Check prerequisites for this LU.
194 This method should check that the prerequisites for the execution
195 of this LU are fulfilled. It can do internode communication, but
196 it should be idempotent - no cluster or system changes are
199 The method should raise errors.OpPrereqError in case something is
200 not fulfilled. Its return value is ignored.
202 This method should also update all the parameters of the opcode to
203 their canonical form if it hasn't been done by ExpandNames before.
206 raise NotImplementedError
208 def Exec(self, feedback_fn):
# Abstract: subclasses do the actual work here, using feedback_fn for progress
211 This method should implement the actual work. It should raise
212 errors.OpExecError for failures that are somewhat dealt with in
216 raise NotImplementedError
218 def BuildHooksEnv(self):
219 """Build hooks environment for this LU.
221 This method should return a three-node tuple consisting of: a dict
222 containing the environment that will be used for running the
223 specific hook for this LU, a list of node names on which the hook
224 should run before the execution, and a list of node names on which
225 the hook should run after the execution.
227 The keys of the dict must not have 'GANETI_' prefixed as this will
228 be handled in the hooks runner. Also note additional keys will be
229 added by the hooks runner. If the LU doesn't define any
230 environment, an empty dict (and not None) should be returned.
232 No nodes should be returned as an empty list (and not None).
234 Note that if the HPATH for a LU class is None, this function will
238 raise NotImplementedError
240 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241 """Notify the LU about the results of its hooks.
243 This method is called every time a hooks phase is executed, and notifies
244 the Logical Unit about the hooks' result. The LU can then use it to alter
245 its result based on the hooks. By default the method does nothing and the
246 previous result is passed back unchanged but any LU can define it if it
247 wants to use the local cluster hook-scripts somehow.
249 @param phase: one of L{constants.HOOKS_PHASE_POST} or
250 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251 @param hook_results: the results of the multi-node hooks rpc call
252 @param feedback_fn: function used send feedback back to the caller
253 @param lu_result: the previous Exec result this LU had, or None
255 @return: the new Exec result, based on the previous result
261 def _ExpandAndLockInstance(self):
262 """Helper function to expand and lock an instance.
264 Many LUs that work on an instance take its name in self.op.instance_name
265 and need to expand it and then declare the expanded name for locking. This
266 function does it, and then updates self.op.instance_name to the expanded
267 name. It also initializes needed_locks as a dict, if this hasn't been done
271 if self.needed_locks is None:
272 self.needed_locks = {}
# Guard against double-declaration of instance-level locks by the caller
274 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275 "_ExpandAndLockInstance called with instance-level locks set"
276 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277 if expanded_name is None:
278 raise errors.OpPrereqError("Instance '%s' not known" %
279 self.op.instance_name)
280 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281 self.op.instance_name = expanded_name
283 def _LockInstancesNodes(self, primary_only=False):
284 """Helper function to declare instances' nodes for locking.
286 This function should be called after locking one or more instances to lock
287 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288 with all primary or secondary nodes for instances already locked and
289 present in self.needed_locks[locking.LEVEL_INSTANCE].
291 It should be called from DeclareLocks, and for safety only works if
292 self.recalculate_locks[locking.LEVEL_NODE] is set.
294 In the future it may grow parameters to just lock some instance's nodes, or
295 to just lock primaries or secondary nodes, if needed.
297 If should be called in DeclareLocks in a way similar to::
299 if level == locking.LEVEL_NODE:
300 self._LockInstancesNodes()
302 @type primary_only: boolean
303 @param primary_only: only lock primary nodes of locked instances
306 assert locking.LEVEL_NODE in self.recalculate_locks, \
307 "_LockInstancesNodes helper function called with no nodes to recalculate"
309 # TODO: check if we're really been called with the instance locks held
311 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312 # future we might want to have different behaviors depending on the value
313 # of self.recalculate_locks[locking.LEVEL_NODE]
315 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316 instance = self.context.cfg.GetInstanceInfo(instance_name)
317 wanted_nodes.append(instance.primary_node)
# Secondary nodes are included only when primary_only is false (guard not
# visible in this truncated view -- original line 318 is missing)
319 wanted_nodes.extend(instance.secondary_nodes)
# LOCKS_REPLACE overwrites the node lock list; LOCKS_APPEND extends it
321 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
# Consume the recalculation request so helpers are not run twice
326 del self.recalculate_locks[locking.LEVEL_NODE]
# NOTE(review): truncated excerpt -- the class body (HPATH/HTYPE overrides,
# presumably) is missing from this view.
329 class NoHooksLU(LogicalUnit):
330 """Simple LU which runs no hooks.
332 This LU is intended as a parent for other LogicalUnits which will
333 run no hooks, in order to reduce duplicate code.
# NOTE(review): truncated excerpt -- the branches building `wanted` (and the
# ExpandNodeName loop header) are partially missing from this view.
340 def _GetWantedNodes(lu, nodes):
341 """Returns list of checked and expanded node names.
343 @type lu: L{LogicalUnit}
344 @param lu: the logical unit on whose behalf we execute
346 @param nodes: list of node names or None for all nodes
348 @return: the list of nodes, sorted
349 @raise errors.OpPrereqError: if the nodes parameter is of a wrong type
350 @raise errors.ProgrammerError: if called with an empty node list
352 if not isinstance(nodes, list):
353 raise errors.OpPrereqError("Invalid argument type 'nodes'")
356 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357 " non-empty list of nodes whose name is to be expanded.")
# Expand each short name via the cluster config; unknown names are fatal
361 node = lu.cfg.ExpandNodeName(name)
363 raise errors.OpPrereqError("No such node name '%s'" % name)
366 return utils.NiceSort(wanted)
# NOTE(review): truncated excerpt -- the `wanted = []` initialization and the
# else-branch header are missing from this view.
369 def _GetWantedInstances(lu, instances):
370 """Returns list of checked and expanded instance names.
372 @type lu: L{LogicalUnit}
373 @param lu: the logical unit on whose behalf we execute
374 @type instances: list
375 @param instances: list of instance names or None for all instances
377 @return: the list of instances, sorted
378 @raise errors.OpPrereqError: if the instances parameter is wrong type
379 @raise errors.OpPrereqError: if any of the passed instances is not found
382 if not isinstance(instances, list):
383 raise errors.OpPrereqError("Invalid argument type 'instances'")
# Expand each short name via the cluster config; unknown names are fatal
388 for name in instances:
389 instance = lu.cfg.ExpandInstanceName(name)
391 raise errors.OpPrereqError("No such instance name '%s'" % name)
392 wanted.append(instance)
# Fallback (when no explicit list was given): all configured instances
395 wanted = lu.cfg.GetInstanceList()
396 return utils.NiceSort(wanted)
# NOTE(review): truncated excerpt -- construction of the combined field set `f`
# and the format arguments of the raise are missing from this view.
399 def _CheckOutputFields(static, dynamic, selected):
400 """Checks whether all selected fields are valid.
402 @type static: L{utils.FieldSet}
403 @param static: static fields set
404 @type dynamic: L{utils.FieldSet}
405 @param dynamic: dynamic fields set
# `delta` is the subset of `selected` matched by neither field set
412 delta = f.NonMatching(selected)
414 raise errors.OpPrereqError("Unknown output fields selected: %s"
# NOTE(review): truncated excerpt -- the raise's format-argument line (original
# line 428) is missing from this view.
418 def _CheckBooleanOpField(op, name):
419 """Validates boolean opcode parameters.
421 This will ensure that an opcode parameter is either a boolean value,
422 or None (but that it always exists).
# getattr default guarantees the attribute exists afterwards via setattr below
425 val = getattr(op, name, None)
426 if not (val is None or isinstance(val, bool)):
427 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
429 setattr(op, name, val)
432 def _CheckNodeOnline(lu, node):
433 """Ensure that a given node is online.
435 @param lu: the LU on behalf of which we make the check
436 @param node: the node to check
437 @raise errors.OpPrereqError: if the node is offline
# Looks up the node object in the cluster config and tests its offline flag
440 if lu.cfg.GetNodeInfo(node).offline:
441 raise errors.OpPrereqError("Can't use offline node %s" % node)
# NOTE(review): truncated excerpt -- the `env = {` opener, the no-NIC else
# branch, and the `return env` are missing from this view.
444 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
445 memory, vcpus, nics):
446 """Builds instance related env variables for hooks
448 This builds the hook environment from individual variables.
451 @param name: the name of the instance
452 @type primary_node: string
453 @param primary_node: the name of the instance's primary node
454 @type secondary_nodes: list
455 @param secondary_nodes: list of secondary nodes as strings
456 @type os_type: string
457 @param os_type: the name of the instance's OS
459 @param status: the desired status of the instances
461 @param memory: the memory size of the instance
463 @param vcpus: the count of VCPUs the instance has
465 @param nics: list of tuples (ip, bridge, mac) representing
466 the NICs the instance has
468 @return: the hook environment for this instance
473 "INSTANCE_NAME": name,
474 "INSTANCE_PRIMARY": primary_node,
475 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
476 "INSTANCE_OS_TYPE": os_type,
477 "INSTANCE_STATUS": status,
478 "INSTANCE_MEMORY": memory,
479 "INSTANCE_VCPUS": vcpus,
# One INSTANCE_NIC<idx>_{IP,BRIDGE,HWADDR} triple per NIC, plus a count
483 nic_count = len(nics)
484 for idx, (ip, bridge, mac) in enumerate(nics):
487 env["INSTANCE_NIC%d_IP" % idx] = ip
488 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
489 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
493 env["INSTANCE_NIC_COUNT"] = nic_count
# NOTE(review): truncated excerpt -- the `args = {` opener and the
# `if override:` guard are missing from this view.
498 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
499 """Builds instance related env variables for hooks from an object.
501 @type lu: L{LogicalUnit}
502 @param lu: the logical unit on whose behalf we execute
503 @type instance: L{objects.Instance}
504 @param instance: the instance for which we should build the
507 @param override: dictionary with key/values that will override
510 @return: the hook environment dictionary
# Backend params (memory/vcpus) come from cluster defaults filled per instance
513 bep = lu.cfg.GetClusterInfo().FillBE(instance)
515 'name': instance.name,
516 'primary_node': instance.primary_node,
517 'secondary_nodes': instance.secondary_nodes,
518 'os_type': instance.os,
# NOTE(review): 'status' is set from instance.os, identical to 'os_type'
# above -- this looks like a copy/paste bug; it should presumably carry the
# instance's run state instead. TODO confirm against _BuildInstanceHookEnv's
# `status` parameter ("the desired status of the instances").
519 'status': instance.os,
520 'memory': bep[constants.BE_MEMORY],
521 'vcpus': bep[constants.BE_VCPUS],
522 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
# Caller-supplied overrides win over the values derived from the instance
525 args.update(override)
526 return _BuildInstanceHookEnv(**args)
# NOTE(review): truncated excerpt -- the guards around the LogInfo calls and
# the format arguments of the final message are missing from this view.
529 def _AdjustCandidatePool(lu):
530 """Adjust the candidate pool after node operations.
# MaintainCandidatePool promotes nodes as needed and returns those modified
533 mod_list = lu.cfg.MaintainCandidatePool()
535 lu.LogInfo("Promoted nodes to master candidate role: %s",
536 ", ".join(node.name for node in mod_list))
# Re-add each promoted node so the cluster context picks up its new role
537 for name in mod_list:
538 lu.context.ReaddNode(name)
539 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
541 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
# NOTE(review): truncated excerpt -- the check on the RPC `result` (between the
# call and the raise) is missing from this view.
545 def _CheckInstanceBridgesExist(lu, instance):
546 """Check that the bridges needed by an instance exist.
549 # check bridges existence on the instance's primary node
550 brlist = [nic.bridge for nic in instance.nics]
551 result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
554 raise errors.OpPrereqError("One or more target bridges %s does not"
555 " exist on destination node '%s'" %
556 (brlist, instance.primary_node))
# NOTE(review): truncated excerpt -- _OP_REQP declaration and the guards on the
# instancelist / RPC-result checks are missing from this view.
559 class LUDestroyCluster(NoHooksLU):
560 """Logical unit for destroying the cluster.
565 def CheckPrereq(self):
566 """Check prerequisites.
568 This checks whether the cluster is empty.
570 Any errors are signalled by raising errors.OpPrereqError.
573 master = self.cfg.GetMasterNode()
# The only remaining node must be the master itself
575 nodelist = self.cfg.GetNodeList()
576 if len(nodelist) != 1 or nodelist[0] != master:
577 raise errors.OpPrereqError("There are still %d node(s) in"
578 " this cluster." % (len(nodelist) - 1))
579 instancelist = self.cfg.GetInstanceList()
581 raise errors.OpPrereqError("There are still %d instance(s) in"
582 " this cluster." % len(instancelist))
584 def Exec(self, feedback_fn):
585 """Destroys the cluster.
588 master = self.cfg.GetMasterNode()
# Stop the master role on the master node (False: do not stop the daemons)
589 result = self.rpc.call_node_stop_master(master, False)
592 raise errors.OpExecError("Could not disable the master role")
# Back up the cluster SSH keys before they become unreachable
593 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
594 utils.CreateBackup(priv_key)
595 utils.CreateBackup(pub_key)
# NOTE(review): truncated excerpt -- many interior lines (guards, `bad`
# accumulators, `continue` statements, dict openers) are missing from this
# view; documented as-is without restructuring.
599 class LUVerifyCluster(LogicalUnit):
600 """Verifies the cluster status.
603 HPATH = "cluster-verify"
604 HTYPE = constants.HTYPE_CLUSTER
605 _OP_REQP = ["skip_checks"]
608 def ExpandNames(self):
# Needs every node and instance lock, but shared (verify is read-only)
609 self.needed_locks = {
610 locking.LEVEL_NODE: locking.ALL_SET,
611 locking.LEVEL_INSTANCE: locking.ALL_SET,
613 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
615 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
616 node_result, feedback_fn, master_files):
617 """Run multiple tests against a node.
621 - compares ganeti version
622 - checks vg existence and size > 20G
623 - checks config file checksum
624 - checks ssh to other nodes
626 @type nodeinfo: L{objects.Node}
627 @param nodeinfo: the node to check
628 @param file_list: required list of files
629 @param local_cksum: dictionary of local files and their checksums
630 @param node_result: the results from the node
631 @param feedback_fn: function used to accumulate results
632 @param master_files: list of files that only masters should have
637 # main result, node_result should be a non-empty dict
638 if not node_result or not isinstance(node_result, dict):
639 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
642 # compares ganeti version
643 local_version = constants.PROTOCOL_VERSION
644 remote_version = node_result.get('version', None)
645 if not remote_version:
646 feedback_fn(" - ERROR: connection to %s failed" % (node))
649 if local_version != remote_version:
650 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
651 (local_version, node, remote_version))
654 # checks vg existence and size > 20G
657 vglist = node_result.get(constants.NV_VGLIST, None)
659 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
663 vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
664 constants.MIN_VG_SIZE)
666 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
669 # checks config file checksum
671 remote_cksum = node_result.get(constants.NV_FILELIST, None)
672 if not isinstance(remote_cksum, dict):
674 feedback_fn(" - ERROR: node hasn't returned file checksum data")
676 for file_name in file_list:
677 node_is_mc = nodeinfo.master_candidate
# Files NOT in master_files must exist on every node; master-only files
# are required only on master candidates
678 must_have_file = file_name not in master_files
679 if file_name not in remote_cksum:
680 if node_is_mc or must_have_file:
682 feedback_fn(" - ERROR: file '%s' missing" % file_name)
683 elif remote_cksum[file_name] != local_cksum[file_name]:
684 if node_is_mc or must_have_file:
686 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
688 # not candidate and this is not a must-have file
690 feedback_fn(" - ERROR: non master-candidate has old/wrong file"
693 # all good, except non-master/non-must have combination
694 if not node_is_mc and not must_have_file:
695 feedback_fn(" - ERROR: file '%s' should not exist on non master"
696 " candidates" % file_name)
# SSH connectivity results reported by the node itself
700 if constants.NV_NODELIST not in node_result:
702 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
704 if node_result[constants.NV_NODELIST]:
706 for node in node_result[constants.NV_NODELIST]:
707 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
708 (node, node_result[constants.NV_NODELIST][node]))
# TCP connectivity results (inter-node network test)
710 if constants.NV_NODENETTEST not in node_result:
712 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
714 if node_result[constants.NV_NODENETTEST]:
716 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
718 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
719 (node, node_result[constants.NV_NODENETTEST][node]))
# Per-hypervisor verify results: None means OK, anything else is an error
721 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
722 if isinstance(hyp_result, dict):
723 for hv_name, hv_result in hyp_result.iteritems():
724 if hv_result is not None:
725 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
726 (hv_name, hv_result))
729 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
730 node_instance, feedback_fn, n_offline):
731 """Verify an instance.
733 This function checks to see if the required block devices are
734 available on the instance's node.
739 node_current = instanceconfig.primary_node
742 instanceconfig.MapLVsByNode(node_vol_should)
# Every LV the config says should exist must be visible on its node
744 for node in node_vol_should:
745 if node in n_offline:
746 # ignore missing volumes on offline nodes
748 for volume in node_vol_should[node]:
749 if node not in node_vol_is or volume not in node_vol_is[node]:
750 feedback_fn(" - ERROR: volume %s missing on node %s" %
# A non-down instance must actually be running on its primary node
754 if not instanceconfig.status == 'down':
755 if ((node_current not in node_instance or
756 not instance in node_instance[node_current]) and
757 node_current not in n_offline):
758 feedback_fn(" - ERROR: instance %s not running on node %s" %
759 (instance, node_current))
# The instance must not be running anywhere but its primary node
762 for node in node_instance:
763 if (not node == node_current):
764 if instance in node_instance[node]:
765 feedback_fn(" - ERROR: instance %s should not run on node %s" %
771 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
772 """Verify if there are any unknown volumes in the cluster.
774 The .os, .swap and backup volumes are ignored. All other volumes are
# Any volume present on a node but absent from the expected map is orphaned
780 for node in node_vol_is:
781 for volume in node_vol_is[node]:
782 if node not in node_vol_should or volume not in node_vol_should[node]:
783 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
788 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
789 """Verify the list of running instances.
791 This checks what instances are running but unknown to the cluster.
795 for node in node_instance:
796 for runninginstance in node_instance[node]:
797 if runninginstance not in instancelist:
798 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
799 (runninginstance, node))
803 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
804 """Verify N+1 Memory Resilience.
806 Check that if one single node dies we can still start all the instances it
812 for node, nodeinfo in node_info.iteritems():
813 # This code checks that every node which is now listed as secondary has
814 # enough memory to host all instances it is supposed to should a single
815 # other node in the cluster fail.
816 # FIXME: not ready for failover to an arbitrary node
817 # FIXME: does not support file-backed instances
818 # WARNING: we currently take into account down instances as well as up
819 # ones, considering that even if they're down someone might want to start
820 # them even in the event of a node failure.
821 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
# Sum memory needs of auto-balanced instances that would fail over here
823 for instance in instances:
824 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
825 if bep[constants.BE_AUTO_BALANCE]:
826 needed_mem += bep[constants.BE_MEMORY]
827 if nodeinfo['mfree'] < needed_mem:
828 feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
829 " failovers should node %s fail" % (node, prinode))
833 def CheckPrereq(self):
834 """Check prerequisites.
836 Transform the list of checks we're going to skip into a set and check that
837 all its members are valid.
840 self.skip_set = frozenset(self.op.skip_checks)
841 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
842 raise errors.OpPrereqError("Invalid checks to be skipped specified")
844 def BuildHooksEnv(self):
# Cluster-Verify hooks run only in the post phase, on all nodes
847 Cluster-Verify hooks just run in the post phase and their failure makes
848 the output be logged in the verify output and the verification to fail.
851 all_nodes = self.cfg.GetNodeList()
852 # TODO: populate the environment with useful information for verify hooks
854 return env, [], all_nodes
856 def Exec(self, feedback_fn):
857 """Verify integrity of cluster, performing various tests on nodes.
861 feedback_fn("* Verifying global settings")
862 for msg in self.cfg.VerifyConfig():
863 feedback_fn(" - ERROR: %s" % msg)
865 vg_name = self.cfg.GetVGName()
866 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
867 nodelist = utils.NiceSort(self.cfg.GetNodeList())
868 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
869 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
870 i_non_redundant = [] # Non redundant instances
871 i_non_a_balanced = [] # Non auto-balanced instances
872 n_offline = [] # List of offline nodes
878 # FIXME: verify OS list
# Files whose checksum is verified cluster-wide (master-only ones included)
880 master_files = [constants.CLUSTER_CONF_FILE]
882 file_names = ssconf.SimpleStore().GetFileList()
883 file_names.append(constants.SSL_CERT_FILE)
884 file_names.extend(master_files)
886 local_checksums = utils.FingerprintFiles(file_names)
# Single multi-node RPC gathering all verification data at once
888 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
889 node_verify_param = {
890 constants.NV_FILELIST: file_names,
891 constants.NV_NODELIST: nodelist,
892 constants.NV_HYPERVISOR: hypervisors,
893 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
894 node.secondary_ip) for node in nodeinfo],
895 constants.NV_LVLIST: vg_name,
896 constants.NV_INSTANCELIST: hypervisors,
897 constants.NV_VGLIST: None,
898 constants.NV_VERSION: None,
899 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
901 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
902 self.cfg.GetClusterName())
904 cluster = self.cfg.GetClusterInfo()
905 master_node = self.cfg.GetMasterNode()
906 for node_i in nodeinfo:
908 nresult = all_nvinfo[node].data
# Offline nodes are skipped entirely and recorded for later reporting
911 feedback_fn("* Skipping offline node %s" % (node,))
912 n_offline.append(node)
# ntype labels the node's role in the per-node progress message
915 if node == master_node:
917 elif node_i.master_candidate:
918 ntype = "master candidate"
921 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
923 if all_nvinfo[node].failed or not isinstance(nresult, dict):
924 feedback_fn(" - ERROR: connection to %s failed" % (node,))
928 result = self._VerifyNode(node_i, file_names, local_checksums,
929 nresult, feedback_fn, master_files)
# LV list: a string result means the node reported an LVM error
932 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
933 if isinstance(lvdata, basestring):
934 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
935 (node, lvdata.encode('string_escape')))
937 node_volume[node] = {}
938 elif not isinstance(lvdata, dict):
939 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
943 node_volume[node] = lvdata
# Instance list as reported by the node's hypervisor(s)
946 idata = nresult.get(constants.NV_INSTANCELIST, None)
947 if not isinstance(idata, list):
948 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
953 node_instance[node] = idata
# Hypervisor info: free memory / free disk, used for N+1 checks below.
# NOTE(review): this rebinds the loop-external name `nodeinfo` -- confusing
# but kept as-is in this truncated view.
956 nodeinfo = nresult.get(constants.NV_HVINFO, None)
957 if not isinstance(nodeinfo, dict):
958 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
964 "mfree": int(nodeinfo['memory_free']),
965 "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
968 # dictionary holding all instances this node is secondary for,
969 # grouped by their primary node. Each key is a cluster node, and each
970 # value is a list of instances which have the key as primary and the
971 # current node as secondary. this is handy to calculate N+1 memory
972 # availability if you can only failover from a primary to its
974 "sinst-by-pnode": {},
977 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
# Per-instance verification pass
983 for instance in instancelist:
984 feedback_fn("* Verifying instance %s" % instance)
985 inst_config = self.cfg.GetInstanceInfo(instance)
986 result = self._VerifyInstance(instance, inst_config, node_volume,
987 node_instance, feedback_fn, n_offline)
989 inst_nodes_offline = []
991 inst_config.MapLVsByNode(node_vol_should)
993 instance_cfg[instance] = inst_config
995 pnode = inst_config.primary_node
996 if pnode in node_info:
997 node_info[pnode]['pinst'].append(instance)
998 elif pnode not in n_offline:
999 feedback_fn(" - ERROR: instance %s, connection to primary node"
1000 " %s failed" % (instance, pnode))
1003 if pnode in n_offline:
1004 inst_nodes_offline.append(pnode)
1006 # If the instance is non-redundant we cannot survive losing its primary
1007 # node, so we are not N+1 compliant. On the other hand we have no disk
1008 # templates with more than one secondary so that situation is not well
1010 # FIXME: does not support file-backed instances
1011 if len(inst_config.secondary_nodes) == 0:
1012 i_non_redundant.append(instance)
1013 elif len(inst_config.secondary_nodes) > 1:
1014 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1017 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1018 i_non_a_balanced.append(instance)
# Register this instance with every secondary node's bookkeeping
1020 for snode in inst_config.secondary_nodes:
1021 if snode in node_info:
1022 node_info[snode]['sinst'].append(instance)
1023 if pnode not in node_info[snode]['sinst-by-pnode']:
1024 node_info[snode]['sinst-by-pnode'][pnode] = []
1025 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1026 elif snode not in n_offline:
1027 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1028 " %s failed" % (instance, snode))
1030 if snode in n_offline:
1031 inst_nodes_offline.append(snode)
1033 if inst_nodes_offline:
1034 # warn that the instance lives on offline nodes, and set bad=True
1035 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1036 ", ".join(inst_nodes_offline))
1039 feedback_fn("* Verifying orphan volumes")
1040 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1044 feedback_fn("* Verifying remaining instances")
1045 result = self._VerifyOrphanInstances(instancelist, node_instance,
# N+1 memory check is optional and can be skipped via opcode parameter
1049 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1050 feedback_fn("* Verifying N+1 Memory redundancy")
1051 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1054 feedback_fn("* Other Notes")
1056 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1057 % len(i_non_redundant))
1059 if i_non_a_balanced:
1060 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1061 % len(i_non_a_balanced))
1064 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1068 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1069 """Analyze the post-hooks' result
1071 This method analyses the hook result, handles it, and sends some
1072 nicely-formatted feedback back to the user.
1074 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1075 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1076 @param hooks_results: the results of the multi-node hooks rpc call
1077 @param feedback_fn: function used send feedback back to the caller
1078 @param lu_result: previous Exec result
1079 @return: the new Exec result, based on the previous result
1083 # We only really run POST phase hooks, and are only interested in
1085 if phase == constants.HOOKS_PHASE_POST:
1086 # Used to change hooks' output to proper indentation
1087 indent_re = re.compile('^', re.M)
1088 feedback_fn("* Hooks Results")
1089 if not hooks_results:
1090 feedback_fn(" - ERROR: general communication failure")
1093 for node_name in hooks_results:
1094 show_node_header = True
1095 res = hooks_results[node_name]
1096 if res.failed or res.data is False or not isinstance(res.data, list):
1098 # no need to warn or set fail return value
1100 feedback_fn(" Communication failure in hooks execution")
# res.data is a list of (script, result-code, output) tuples
1103 for script, hkr, output in res.data:
1104 if hkr == constants.HKR_FAIL:
1105 # The node header is only shown once, if there are
1106 # failing hooks on that node
1107 if show_node_header:
1108 feedback_fn(" Node %s:" % node_name)
1109 show_node_header = False
1110 feedback_fn(" ERROR: Script %s failed, output:" % script)
1111 output = indent_re.sub(' ', output)
1112 feedback_fn("%s" % output)
1118 class LUVerifyDisks(NoHooksLU):
1119 """Verifies the cluster disks status.
# NOTE(review): numbering gaps in this listing (1150-1151, 1156, 1162-1165,
# 1167-1170, 1172-1173, 1175-1177, 1183, 1185-1186) show that the
# `nv_dict`/`inst_lvs` initializers, the `continue` statements and parts of
# the per-node error handling are elided from this copy.
1125 def ExpandNames(self):
# shared (read) locks over all nodes and all instances
1126 self.needed_locks = {
1127 locking.LEVEL_NODE: locking.ALL_SET,
1128 locking.LEVEL_INSTANCE: locking.ALL_SET,
1130 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1132 def CheckPrereq(self):
1133 """Check prerequisites.
1135 This has no prerequisites.
1140 def Exec(self, feedback_fn):
1141 """Verify integrity of cluster disks.
# result is the 4-tuple (bad nodes, node LVM errors, degraded instances,
# missing LVs per instance) that is also bound to the individual names
1144 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1146 vg_name = self.cfg.GetVGName()
1147 nodes = utils.NiceSort(self.cfg.GetNodeList())
1148 instances = [self.cfg.GetInstanceInfo(name)
1149 for name in self.cfg.GetInstanceList()]
1152 for inst in instances:
# only running, network-mirrored instances are relevant for disk checks
1154 if (inst.status != "up" or
1155 inst.disk_template not in constants.DTS_NET_MIRROR):
1157 inst.MapLVsByNode(inst_lvs)
1158 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1159 for node, vol_list in inst_lvs.iteritems():
1160 for vol in vol_list:
1161 nv_dict[(node, vol)] = inst
1166 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1171 lvs = node_lvs[node]
1174 self.LogWarning("Connection to node %s failed: %s" %
# a bare string result signals an LVM-level error message from the node
1178 if isinstance(lvs, basestring):
1179 logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1180 res_nlvm[node] = lvs
1181 elif not isinstance(lvs, dict):
1182 logging.warning("Connection to node %s failed or invalid data"
1184 res_nodes.append(node)
1187 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
# pop: any LV left in nv_dict afterwards was not reported by its node
1188 inst = nv_dict.pop((node, lv_name), None)
1189 if (not lv_online and inst is not None
1190 and inst.name not in res_instances):
1191 res_instances.append(inst.name)
1193 # any leftover items in nv_dict are missing LVs, let's arrange the
1195 for key, inst in nv_dict.iteritems():
1196 if inst.name not in res_missing:
1197 res_missing[inst.name] = []
1198 res_missing[inst.name].append(key)
1203 class LURenameCluster(LogicalUnit):
1204 """Rename the cluster.
1207 HPATH = "cluster-rename"
1208 HTYPE = constants.HTYPE_CLUSTER
1211 def BuildHooksEnv(self):
# hooks run only on the master node, before and after the rename
1216 "OP_TARGET": self.cfg.GetClusterName(),
1217 "NEW_NAME": self.op.name,
1219 mn = self.cfg.GetMasterNode()
1220 return env, [mn], [mn]
1222 def CheckPrereq(self):
1223 """Verify that the passed name is a valid one.
# resolve the requested name; HostInfo raises on unresolvable names
1226 hostname = utils.HostInfo(self.op.name)
1228 new_name = hostname.name
1229 self.ip = new_ip = hostname.ip
1230 old_name = self.cfg.GetClusterName()
1231 old_ip = self.cfg.GetMasterIP()
1232 if new_name == old_name and new_ip == old_ip:
1233 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1234 " cluster has changed")
# refuse a new IP that already answers on the noded port
1235 if new_ip != old_ip:
1236 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1237 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1238 " reachable on the network. Aborting." %
1241 self.op.name = new_name
1243 def Exec(self, feedback_fn):
1244 """Rename the cluster.
1247 clustername = self.op.name
# NOTE(review): `ip` is used below (line 1259) but its assignment (the
# elided line 1248, presumably `ip = self.ip` from CheckPrereq) is missing
# from this listing - confirm against version control.
1250 # shutdown the master IP
1251 master = self.cfg.GetMasterNode()
1252 result = self.rpc.call_node_stop_master(master, False)
1253 if result.failed or not result.data:
1254 raise errors.OpExecError("Could not disable the master role")
1257 cluster = self.cfg.GetClusterInfo()
1258 cluster.cluster_name = clustername
1259 cluster.master_ip = ip
1260 self.cfg.Update(cluster)
1262 # update the known hosts file
1263 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1264 node_list = self.cfg.GetNodeList()
1266 node_list.remove(master)
# distribute the regenerated known_hosts file; failures are only logged
1269 result = self.rpc.call_upload_file(node_list,
1270 constants.SSH_KNOWN_HOSTS_FILE)
1271 for to_node, to_result in result.iteritems():
1272 if to_result.failed or not to_result.data:
1273 logging.error("Copy of file %s to node %s failed",
1274 constants.SSH_KNOWN_HOSTS_FILE, to_node)
# bring the master role back up on the (possibly new) IP; best-effort
1277 result = self.rpc.call_node_start_master(master, False)
1278 if result.failed or not result.data:
1279 self.LogWarning("Could not re-enable the master role on"
1280 " the master, please restart manually.")
1283 def _RecursiveCheckIfLVMBased(disk):
1284 """Check if the given disk or its children are lvm-based.
1286 @type disk: L{objects.Disk}
1287 @param disk: the disk to check
1289 @return: boolean indicating whether a LD_LV dev_type was found or not
# NOTE(review): numbering gap (1290-1292, 1295) - the guard around the
# children loop and the early `return True` inside it are elided from this
# listing; only the base-case return is visible below.
1293 for chdisk in disk.children:
1294 if _RecursiveCheckIfLVMBased(chdisk):
1296 return disk.dev_type == constants.LD_LV
1299 class LUSetClusterParams(LogicalUnit):
1300 """Change the parameters of the cluster.
1303 HPATH = "cluster-modify"
1304 HTYPE = constants.HTYPE_CLUSTER
1308 def CheckParameters(self):
# normalize/validate candidate_pool_size before locking
1312 if not hasattr(self.op, "candidate_pool_size"):
1313 self.op.candidate_pool_size = None
1314 if self.op.candidate_pool_size is not None:
# NOTE(review): the `try:` matching the `except ValueError` below (elided
# line 1315) is missing from this listing.
1316 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1317 except ValueError, err:
1318 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1320 if self.op.candidate_pool_size < 1:
1321 raise errors.OpPrereqError("At least one master candidate needed")
1323 def ExpandNames(self):
1324 # FIXME: in the future maybe other cluster params won't require checking on
1325 # all nodes to be modified.
1326 self.needed_locks = {
1327 locking.LEVEL_NODE: locking.ALL_SET,
1329 self.share_locks[locking.LEVEL_NODE] = 1
1331 def BuildHooksEnv(self):
# hooks run only on the master node
1336 "OP_TARGET": self.cfg.GetClusterName(),
1337 "NEW_VG_NAME": self.op.vg_name,
1339 mn = self.cfg.GetMasterNode()
1340 return env, [mn], [mn]
1342 def CheckPrereq(self):
1343 """Check prerequisites.
1345 This checks whether the given params don't conflict and
1346 if the given volume group is valid.
1349 # FIXME: This only works because there is only one parameter that can be
1350 # changed or removed.
# vg_name set-but-empty means "disable lvm storage"; refuse if any
# instance still has an lvm-based disk
1351 if self.op.vg_name is not None and not self.op.vg_name:
1352 instances = self.cfg.GetAllInstancesInfo().values()
1353 for inst in instances:
1354 for disk in inst.disks:
1355 if _RecursiveCheckIfLVMBased(disk):
1356 raise errors.OpPrereqError("Cannot disable lvm storage while"
1357 " lvm-based instances exist")
1359 node_list = self.acquired_locks[locking.LEVEL_NODE]
1361 # if vg_name not None, checks given volume group on all nodes
1363 vglist = self.rpc.call_vg_list(node_list)
1364 for node in node_list:
1365 if vglist[node].failed:
1366 # ignoring down node
1367 self.LogWarning("Node %s unreachable/error, ignoring" % node)
1369 vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1371 constants.MIN_VG_SIZE)
1373 raise errors.OpPrereqError("Error on node '%s': %s" %
1376 self.cluster = cluster = self.cfg.GetClusterInfo()
1377 # validate beparams changes
1378 if self.op.beparams:
1379 utils.CheckBEParams(self.op.beparams)
1380 self.new_beparams = cluster.FillDict(
1381 cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1383 # hypervisor list/parameters
# start from a copy of the current hvparams, then overlay the requested ones
1384 self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1385 if self.op.hvparams:
1386 if not isinstance(self.op.hvparams, dict):
1387 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1388 for hv_name, hv_dict in self.op.hvparams.items():
1389 if hv_name not in self.new_hvparams:
1390 self.new_hvparams[hv_name] = hv_dict
1392 self.new_hvparams[hv_name].update(hv_dict)
1394 if self.op.enabled_hypervisors is not None:
1395 self.hv_list = self.op.enabled_hypervisors
1397 self.hv_list = cluster.enabled_hypervisors
1399 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1400 # either the enabled list has changed, or the parameters have, validate
1401 for hv_name, hv_params in self.new_hvparams.items():
1402 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1403 (self.op.enabled_hypervisors and
1404 hv_name in self.op.enabled_hypervisors)):
1405 # either this is a new hypervisor, or its parameters have changed
1406 hv_class = hypervisor.GetHypervisor(hv_name)
1407 hv_class.CheckParameterSyntax(hv_params)
1408 _CheckHVParams(self, node_list, hv_name, hv_params)
1410 def Exec(self, feedback_fn):
1411 """Change the parameters of the cluster.
1414 if self.op.vg_name is not None:
1415 if self.op.vg_name != self.cfg.GetVGName():
1416 self.cfg.SetVGName(self.op.vg_name)
1418 feedback_fn("Cluster LVM configuration already in desired"
1419 " state, not changing")
1420 if self.op.hvparams:
1421 self.cluster.hvparams = self.new_hvparams
1422 if self.op.enabled_hypervisors is not None:
1423 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1424 if self.op.beparams:
1425 self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1426 if self.op.candidate_pool_size is not None:
1427 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1429 self.cfg.Update(self.cluster)
1431 # we want to update nodes after the cluster so that if any errors
1432 # happen, we have recorded and saved the cluster info
1433 if self.op.candidate_pool_size is not None:
1434 _AdjustCandidatePool(self)
1437 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1438 """Sleep and poll for an instance's disk to sync.
# Returns True when all disks are in sync (no cumulative degradation).
# NOTE(review): numbering gaps (1439-1440, 1442-1444, 1446, 1448, 1451-1455,
# 1460-1461, 1464-1465, 1467, 1469-1470, 1473, 1478, 1481-1482, 1486-1488,
# 1490-1491) - the polling loop header, retry counters and timing variables
# (e.g. max_time, mstat, the `done` logic) are elided from this listing.
1441 if not instance.disks:
# nothing to wait for
1445 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1447 node = instance.primary_node
1449 for dev in instance.disks:
1450 lu.cfg.SetDiskID(dev, node)
1456 cumul_degraded = False
1457 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1458 if rstats.failed or not rstats.data:
1459 lu.LogWarning("Can't get any data from node %s", node)
# too many consecutive failures: give up on contacting the node
1462 raise errors.RemoteError("Can't contact node %s for mirror data,"
1463 " aborting." % node)
1466 rstats = rstats.data
1468 for i in range(len(rstats)):
1471 lu.LogWarning("Can't compute data for node %s/%s",
1472 node, instance.disks[i].iv_name)
1474 # we ignore the ldisk parameter
1475 perc_done, est_time, is_degraded, _ = mstat
# degraded with unknown progress counts as "still not in sync"
1476 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1477 if perc_done is not None:
1479 if est_time is not None:
1480 rem_time = "%d estimated seconds remaining" % est_time
1483 rem_time = "no time estimate"
1484 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1485 (instance.disks[i].iv_name, perc_done, rem_time))
# cap the poll interval at one minute
1489 time.sleep(min(60, max_time))
1492 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1493 return not cumul_degraded
1496 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1497 """Check that mirrors are not degraded.
1499 The ldisk parameter, if True, will change the test from the
1500 is_degraded attribute (which represents overall non-ok status for
1501 the device(s)) to the ldisk (representing the local storage status).
# NOTE(review): numbering gaps (1505-1510, 1515-1516, 1518, 1521-1524) -
# the `idx` selection based on ldisk, the `result` initialization, the
# `else:` branch and the final `return result` are elided from this listing.
1504 lu.cfg.SetDiskID(dev, node)
1511 if on_primary or dev.AssembleOnSecondary():
1512 rstats = lu.rpc.call_blockdev_find(node, dev)
1513 if rstats.failed or not rstats.data:
1514 logging.warning("Node %s: disk degraded, not found or node down", node)
# rstats.data[idx] is the degradation flag; invert it into the result
1517 result = result and (not rstats.data[idx])
# recurse into child devices; children are always checked as non-ldisk
1519 for child in dev.children:
1520 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1525 class LUDiagnoseOS(NoHooksLU):
1526 """Logical unit for OS diagnose/query.
1529 _OP_REQP = ["output_fields", "names"]
1531 _FIELDS_STATIC = utils.FieldSet()
1532 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1534 def ExpandNames(self):
# selective (per-name) OS queries are not implemented
1536 raise errors.OpPrereqError("Selective OS query not supported")
1538 _CheckOutputFields(static=self._FIELDS_STATIC,
1539 dynamic=self._FIELDS_DYNAMIC,
1540 selected=self.op.output_fields)
1542 # Lock all nodes, in shared mode
1543 self.needed_locks = {}
1544 self.share_locks[locking.LEVEL_NODE] = 1
1545 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1547 def CheckPrereq(self):
1548 """Check prerequisites.
# note: a @staticmethod decorator for _DiagnoseByOS is not visible in this
# listing (numbering gap at 1550-1552) - it takes no self/cls argument
1553 def _DiagnoseByOS(node_list, rlist):
1554 """Remaps a per-node return list into an a per-os per-node dictionary
1556 @param node_list: a list with the names of all nodes
1557 @param rlist: a map with node names as keys and OS objects as values
1560 @returns: a dictionary with osnames as keys and as value another map, with
1561 nodes as keys and list of OS objects as values, eg::
1563 {"debian-etch": {"node1": [<object>,...],
1564 "node2": [<object>,]}
1569 for node_name, nr in rlist.iteritems():
# skip nodes that failed or returned no OS list
1570 if nr.failed or not nr.data:
1572 for os_obj in nr.data:
1573 if os_obj.name not in all_os:
1574 # build a list of nodes for this os containing empty lists
1575 # for each node in node_list
1576 all_os[os_obj.name] = {}
1577 for nname in node_list:
1578 all_os[os_obj.name][nname] = []
1579 all_os[os_obj.name][node_name].append(os_obj)
1582 def Exec(self, feedback_fn):
1583 """Compute the list of OSes.
1586 node_list = self.acquired_locks[locking.LEVEL_NODE]
1587 node_data = self.rpc.call_os_diagnose(node_list)
1588 if node_data == False:
1589 raise errors.OpExecError("Can't gather the list of OSes")
1590 pol = self._DiagnoseByOS(node_list, node_data)
1592 for os_name, os_data in pol.iteritems():
1594 for field in self.op.output_fields:
# an OS is "valid" only if every node reported a usable first entry
1597 elif field == "valid":
1598 val = utils.all([osl and osl[0] for osl in os_data.values()])
1599 elif field == "node_status":
1601 for node_name, nos_list in os_data.iteritems():
1602 val[node_name] = [(v.status, v.path) for v in nos_list]
1604 raise errors.ParameterError(field)
1611 class LURemoveNode(LogicalUnit):
1612 """Logical unit for removing a node.
1615 HPATH = "node-remove"
1616 HTYPE = constants.HTYPE_NODE
1617 _OP_REQP = ["node_name"]
1619 def BuildHooksEnv(self):
1622 This doesn't run on the target node in the pre phase as a failed
1623 node would then be impossible to remove.
1627 "OP_TARGET": self.op.node_name,
1628 "NODE_NAME": self.op.node_name,
# hooks run on every node except the one being removed
1630 all_nodes = self.cfg.GetNodeList()
1631 all_nodes.remove(self.op.node_name)
1632 return env, all_nodes, all_nodes
1634 def CheckPrereq(self):
1635 """Check prerequisites.
1638 - the node exists in the configuration
1639 - it does not have primary or secondary instances
1640 - it's not the master
1642 Any errors are signalled by raising errors.OpPrereqError.
1645 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
# NOTE(review): legacy Python 2 `raise Class, value` statement form below -
# works, but inconsistent with the call form used everywhere else in this
# file (compare e.g. the raises a few lines down)
1647 raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1649 instance_list = self.cfg.GetInstanceList()
1651 masternode = self.cfg.GetMasterNode()
1652 if node.name == masternode:
1653 raise errors.OpPrereqError("Node is the master node,"
1654 " you need to failover first.")
1656 for instance_name in instance_list:
1657 instance = self.cfg.GetInstanceInfo(instance_name)
1658 if node.name == instance.primary_node:
1659 raise errors.OpPrereqError("Instance %s still running on the node,"
1660 " please remove first." % instance_name)
1661 if node.name in instance.secondary_nodes:
1662 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1663 " please remove first." % instance_name)
# store the fully-expanded name back into the opcode
1664 self.op.node_name = node.name
1667 def Exec(self, feedback_fn):
1668 """Removes the node from the cluster.
1672 logging.info("Stopping the node daemon and removing configs from node %s",
# remove the node from the cluster context first, then tell it to leave
1675 self.context.RemoveNode(node.name)
1677 self.rpc.call_node_leave_cluster(node.name)
1679 # Promote nodes to master candidate as needed
1680 _AdjustCandidatePool(self)
1683 class LUQueryNodes(NoHooksLU):
1684 """Logical unit for querying nodes.
1687 _OP_REQP = ["output_fields", "names"]
# dynamic fields need a live RPC to the nodes; static ones come from config
1689 _FIELDS_DYNAMIC = utils.FieldSet(
1691 "mtotal", "mnode", "mfree",
1696 _FIELDS_STATIC = utils.FieldSet(
1697 "name", "pinst_cnt", "sinst_cnt",
1698 "pinst_list", "sinst_list",
1699 "pip", "sip", "tags",
1706 def ExpandNames(self):
1707 _CheckOutputFields(static=self._FIELDS_STATIC,
1708 dynamic=self._FIELDS_DYNAMIC,
1709 selected=self.op.output_fields)
1711 self.needed_locks = {}
1712 self.share_locks[locking.LEVEL_NODE] = 1
1715 self.wanted = _GetWantedNodes(self, self.op.names)
1717 self.wanted = locking.ALL_SET
# only lock nodes when at least one non-static field was requested
1719 self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1721 # if we don't request only static fields, we need to lock the nodes
1722 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1725 def CheckPrereq(self):
1726 """Check prerequisites.
1729 # The validation of the node list is done in the _GetWantedNodes,
1730 # if non empty, and if empty, there's no validation to do
1733 def Exec(self, feedback_fn):
1734 """Computes the list of nodes and their attributes.
1737 all_info = self.cfg.GetAllNodesInfo()
1739 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1740 elif self.wanted != locking.ALL_SET:
1741 nodenames = self.wanted
# without locking, wanted nodes may have disappeared meanwhile
1742 missing = set(nodenames).difference(all_info.keys())
1744 raise errors.OpExecError(
1745 "Some nodes were removed before retrieving their data: %s" % missing)
1747 nodenames = all_info.keys()
1749 nodenames = utils.NiceSort(nodenames)
1750 nodelist = [all_info[name] for name in nodenames]
1752 # begin data gathering
1756 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1757 self.cfg.GetHypervisorType())
1758 for name in nodenames:
1759 nodeinfo = node_data[name]
1760 if not nodeinfo.failed and nodeinfo.data:
1761 nodeinfo = nodeinfo.data
1762 fn = utils.TryConvert
1764 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1765 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1766 "mfree": fn(int, nodeinfo.get('memory_free', None)),
1767 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1768 "dfree": fn(int, nodeinfo.get('vg_free', None)),
1769 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1770 "bootid": nodeinfo.get('bootid', None),
1773 live_data[name] = {}
# NOTE(review): dict.fromkeys with a mutable {} default makes every key
# share the SAME dict object; harmless here since the values are only read
# via .get() below, but do not mutate them per-node
1775 live_data = dict.fromkeys(nodenames, {})
1777 node_to_primary = dict([(name, set()) for name in nodenames])
1778 node_to_secondary = dict([(name, set()) for name in nodenames])
# only walk the instance list if an instance-related field was requested
1780 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1781 "sinst_cnt", "sinst_list"))
1782 if inst_fields & frozenset(self.op.output_fields):
1783 instancelist = self.cfg.GetInstanceList()
1785 for instance_name in instancelist:
1786 inst = self.cfg.GetInstanceInfo(instance_name)
1787 if inst.primary_node in node_to_primary:
1788 node_to_primary[inst.primary_node].add(inst.name)
1789 for secnode in inst.secondary_nodes:
1790 if secnode in node_to_secondary:
1791 node_to_secondary[secnode].add(inst.name)
1793 master_node = self.cfg.GetMasterNode()
1795 # end data gathering
1798 for node in nodelist:
1800 for field in self.op.output_fields:
1803 elif field == "pinst_list":
1804 val = list(node_to_primary[node.name])
1805 elif field == "sinst_list":
1806 val = list(node_to_secondary[node.name])
1807 elif field == "pinst_cnt":
1808 val = len(node_to_primary[node.name])
1809 elif field == "sinst_cnt":
1810 val = len(node_to_secondary[node.name])
1811 elif field == "pip":
1812 val = node.primary_ip
1813 elif field == "sip":
1814 val = node.secondary_ip
1815 elif field == "tags":
1816 val = list(node.GetTags())
1817 elif field == "serial_no":
1818 val = node.serial_no
1819 elif field == "master_candidate":
1820 val = node.master_candidate
1821 elif field == "master":
1822 val = node.name == master_node
1823 elif field == "offline":
1825 elif self._FIELDS_DYNAMIC.Matches(field):
1826 val = live_data[node.name].get(field, None)
1828 raise errors.ParameterError(field)
1829 node_output.append(val)
1830 output.append(node_output)
1835 class LUQueryNodeVolumes(NoHooksLU):
1836 """Logical unit for getting volumes on node(s).
1839 _OP_REQP = ["nodes", "output_fields"]
1841 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1842 _FIELDS_STATIC = utils.FieldSet("node")
1844 def ExpandNames(self):
1845 _CheckOutputFields(static=self._FIELDS_STATIC,
1846 dynamic=self._FIELDS_DYNAMIC,
1847 selected=self.op.output_fields)
1849 self.needed_locks = {}
1850 self.share_locks[locking.LEVEL_NODE] = 1
# empty node list means "all nodes"
1851 if not self.op.nodes:
1852 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1854 self.needed_locks[locking.LEVEL_NODE] = \
1855 _GetWantedNodes(self, self.op.nodes)
1857 def CheckPrereq(self):
1858 """Check prerequisites.
1860 This checks that the fields required are valid output fields.
1863 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1865 def Exec(self, feedback_fn):
1866 """Computes the list of nodes and their attributes.
1869 nodenames = self.nodes
1870 volumes = self.rpc.call_node_volumes(nodenames)
# map each instance to its LVs per node, so volumes can be attributed
1872 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1873 in self.cfg.GetInstanceList()]
1875 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1878 for node in nodenames:
# skip nodes that failed to answer or returned nothing
1879 if node not in volumes or volumes[node].failed or not volumes[node].data:
1882 node_vols = volumes[node].data[:]
1883 node_vols.sort(key=lambda vol: vol['dev'])
1885 for vol in node_vols:
1887 for field in self.op.output_fields:
1890 elif field == "phys":
1894 elif field == "name":
1896 elif field == "size":
1897 val = int(float(vol['size']))
1898 elif field == "instance":
# find which instance (if any) owns this LV on this node
1900 if node not in lv_by_node[inst]:
1902 if vol['name'] in lv_by_node[inst][node]:
1908 raise errors.ParameterError(field)
# all values are stringified in this query's output
1909 node_output.append(str(val))
1911 output.append(node_output)
1916 class LUAddNode(LogicalUnit):
1917 """Logical unit for adding node to the cluster.
1921 HTYPE = constants.HTYPE_NODE
1922 _OP_REQP = ["node_name"]
1924 def BuildHooksEnv(self):
1927 This will run on all nodes before, and on all nodes + the new node after.
1931 "OP_TARGET": self.op.node_name,
1932 "NODE_NAME": self.op.node_name,
1933 "NODE_PIP": self.op.primary_ip,
1934 "NODE_SIP": self.op.secondary_ip,
1936 nodes_0 = self.cfg.GetNodeList()
1937 nodes_1 = nodes_0 + [self.op.node_name, ]
1938 return env, nodes_0, nodes_1
1940 def CheckPrereq(self):
1941 """Check prerequisites.
1944 - the new node is not already in the config
1946 - its parameters (single/dual homed) matches the cluster
1948 Any errors are signalled by raising errors.OpPrereqError.
1951 node_name = self.op.node_name
# NOTE(review): `cfg` is used below (line 1965) but its binding (elided
# line ~1952, presumably `cfg = self.cfg`) is missing from this listing.
1954 dns_data = utils.HostInfo(node_name)
1956 node = dns_data.name
1957 primary_ip = self.op.primary_ip = dns_data.ip
1958 secondary_ip = getattr(self.op, "secondary_ip", None)
1959 if secondary_ip is None:
1960 secondary_ip = primary_ip
1961 if not utils.IsValidIP(secondary_ip):
1962 raise errors.OpPrereqError("Invalid secondary IP given")
1963 self.op.secondary_ip = secondary_ip
1965 node_list = cfg.GetNodeList()
1966 if not self.op.readd and node in node_list:
1967 raise errors.OpPrereqError("Node %s is already in the configuration" %
1969 elif self.op.readd and node not in node_list:
1970 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1972 for existing_node_name in node_list:
1973 existing_node = cfg.GetNodeInfo(existing_node_name)
# on readd, the node's IPs must match what the config already has
1975 if self.op.readd and node == existing_node_name:
1976 if (existing_node.primary_ip != primary_ip or
1977 existing_node.secondary_ip != secondary_ip):
1978 raise errors.OpPrereqError("Readded node doesn't have the same IP"
1979 " address configuration as before")
1982 if (existing_node.primary_ip == primary_ip or
1983 existing_node.secondary_ip == primary_ip or
1984 existing_node.primary_ip == secondary_ip or
1985 existing_node.secondary_ip == secondary_ip):
1986 raise errors.OpPrereqError("New node ip address(es) conflict with"
1987 " existing node %s" % existing_node.name)
1989 # check that the type of the node (single versus dual homed) is the
1990 # same as for the master
1991 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1992 master_singlehomed = myself.secondary_ip == myself.primary_ip
1993 newbie_singlehomed = secondary_ip == primary_ip
1994 if master_singlehomed != newbie_singlehomed:
1995 if master_singlehomed:
1996 raise errors.OpPrereqError("The master has no private ip but the"
1997 " new node has one")
1999 raise errors.OpPrereqError("The master has a private ip but the"
2000 " new node doesn't have one")
2002 # checks reachability
2003 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2004 raise errors.OpPrereqError("Node not reachable by ping")
2006 if not newbie_singlehomed:
2007 # check reachability from my secondary ip to newbie's secondary ip
2008 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2009 source=myself.secondary_ip):
2010 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2011 " based ping to noded port")
# new nodes become master candidates while the candidate pool is not full
2013 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2014 mc_now, _ = self.cfg.GetMasterCandidateStats()
2015 master_candidate = mc_now < cp_size
2017 self.new_node = objects.Node(name=node,
2018 primary_ip=primary_ip,
2019 secondary_ip=secondary_ip,
2020 master_candidate=master_candidate,
2023 def Exec(self, feedback_fn):
2024 """Adds the new node to the cluster.
2027 new_node = self.new_node
2028 node = new_node.name
2030 # check connectivity
2031 result = self.rpc.call_version([node])[node]
2034 if constants.PROTOCOL_VERSION == result.data:
2035 logging.info("Communication to node %s fine, sw version %s match",
2038 raise errors.OpExecError("Version mismatch master version %s,"
2039 " node version %s" %
2040 (constants.PROTOCOL_VERSION, result.data))
2042 raise errors.OpExecError("Cannot get version from the new node")
# push our ssh host keys to the new node so it can join transparently
2045 logging.info("Copy ssh key to node %s", node)
2046 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2048 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2049 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2055 keyarray.append(f.read())
2059 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2061 keyarray[3], keyarray[4], keyarray[5])
2063 if result.failed or not result.data:
2064 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2066 # Add node to our /etc/hosts, and add key to known_hosts
2067 utils.AddHostToEtcHosts(new_node.name)
2069 if new_node.secondary_ip != new_node.primary_ip:
2070 result = self.rpc.call_node_has_ip_address(new_node.name,
2071 new_node.secondary_ip)
2072 if result.failed or not result.data:
2073 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2074 " you gave (%s). Please fix and re-run this"
2075 " command." % new_node.secondary_ip)
2077 node_verify_list = [self.cfg.GetMasterNode()]
2078 node_verify_param = {
2080 # TODO: do a node-net-test as well?
2083 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2084 self.cfg.GetClusterName())
2085 for verifier in node_verify_list:
2086 if result[verifier].failed or not result[verifier].data:
2087 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2088 " for remote verification" % verifier)
2089 if result[verifier].data['nodelist']:
2090 for failed in result[verifier].data['nodelist']:
# NOTE(review): the next line indexes result[verifier]['nodelist'] while
# the two lines above use result[verifier].data['nodelist'] - likely a
# missing `.data` here; confirm before fixing
2091 feedback_fn("ssh/hostname verification failed %s -> %s" %
2092 (verifier, result[verifier]['nodelist'][failed]))
2093 raise errors.OpExecError("ssh/hostname verification failed.")
2095 # Distribute updated /etc/hosts and known_hosts to all nodes,
2096 # including the node just added
2097 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2098 dist_nodes = self.cfg.GetNodeList()
2099 if not self.op.readd:
2100 dist_nodes.append(node)
2101 if myself.name in dist_nodes:
2102 dist_nodes.remove(myself.name)
2104 logging.debug("Copying hosts and known_hosts to all nodes")
2105 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2106 result = self.rpc.call_upload_file(dist_nodes, fname)
2107 for to_node, to_result in result.iteritems():
2108 if to_result.failed or not to_result.data:
2109 logging.error("Copy of file %s to node %s failed", fname, to_node)
# HVM clusters additionally need the VNC password file on the new node
2112 if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2113 to_copy.append(constants.VNC_PASSWORD_FILE)
2114 for fname in to_copy:
2115 result = self.rpc.call_upload_file([node], fname)
2116 if result[node].failed or not result[node]:
2117 logging.error("Could not copy file %s to node %s", fname, node)
2120 self.context.ReaddNode(new_node)
2122 self.context.AddNode(new_node)
2125 class LUSetNodeParams(LogicalUnit):
2126 """Modifies the parameters of a node.
2129 HPATH = "node-modify"
2130 HTYPE = constants.HTYPE_NODE
2131 _OP_REQP = ["node_name"]
2134 def CheckArguments(self):
2135 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2136 if node_name is None:
2137 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2138 self.op.node_name = node_name
# normalize the two optional boolean flags to True/False/None
2139 _CheckBooleanOpField(self.op, 'master_candidate')
2140 _CheckBooleanOpField(self.op, 'offline')
2141 if self.op.master_candidate is None and self.op.offline is None:
2142 raise errors.OpPrereqError("Please pass at least one modification")
2143 if self.op.offline == True and self.op.master_candidate == True:
2144 raise errors.OpPrereqError("Can't set the node into offline and"
2145 " master_candidate at the same time")
2147 def ExpandNames(self):
2148 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2150 def BuildHooksEnv(self):
2153 This runs on the master node.
2157 "OP_TARGET": self.op.node_name,
2158 "MASTER_CANDIDATE": str(self.op.master_candidate),
2159 "OFFLINE": str(self.op.offline),
2161 nl = [self.cfg.GetMasterNode(),
2165 def CheckPrereq(self):
2166 """Check prerequisites.
2168 This only checks the instance list against the existing names.
2171 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
# demotion path: dropping master_candidate (directly or via offlining)
2173 if ((self.op.master_candidate == False or self.op.offline == True)
2174 and node.master_candidate):
2175 # we will demote the node from master_candidate
2176 if self.op.node_name == self.cfg.GetMasterNode():
2177 raise errors.OpPrereqError("The master node has to be a"
2178 " master candidate and online")
2179 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2180 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2181 if num_candidates <= cp_size:
2182 msg = ("Not enough master candidates (desired"
2183 " %d, new value will be %d)" % (cp_size, num_candidates-1))
# NOTE(review): numbering gap (2184, 2186) - the condition choosing
# between warning and hard failure (likely a `force` flag) is elided
2185 self.LogWarning(msg)
2187 raise errors.OpPrereqError(msg)
# an offline node cannot be promoted unless it is being onlined too
2189 if (self.op.master_candidate == True and node.offline and
2190 not self.op.offline == False):
2191 raise errors.OpPrereqError("Can't set an offline node to"
2192 " master_candidate")
2196 def Exec(self, feedback_fn):
# result collects (parameter, new value) pairs for user feedback
2204 if self.op.offline is not None:
2205 node.offline = self.op.offline
2206 result.append(("offline", str(self.op.offline)))
2207 if self.op.offline == True and node.master_candidate:
# offlining implies losing master-candidate status
2208 node.master_candidate = False
2209 result.append(("master_candidate", "auto-demotion due to offline"))
2211 if self.op.master_candidate is not None:
2212 node.master_candidate = self.op.master_candidate
2213 result.append(("master_candidate", str(self.op.master_candidate)))
2214 if self.op.master_candidate == False:
# ask the node itself to drop its candidate-only data; best-effort
2215 rrc = self.rpc.call_node_demote_from_mc(node.name)
2216 if (rrc.failed or not isinstance(rrc.data, (tuple, list))
2217 or len(rrc.data) != 2):
2218 self.LogWarning("Node rpc error: %s" % rrc.error)
2219 elif not rrc.data[0]:
2220 self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
2222 # this will trigger configuration file update, if needed
2223 self.cfg.Update(node)
2224 # this will trigger job queue propagation or cleanup
2225 if self.op.node_name != self.cfg.GetMasterNode():
2226 self.context.ReaddNode(node)
2231 class LUQueryClusterInfo(NoHooksLU):
2232 """Query cluster configuration.
2238 def ExpandNames(self):
# read-only query: no locks needed
2239 self.needed_locks = {}
2241 def CheckPrereq(self):
2242 """No prerequisites needed for this LU.
2247 def Exec(self, feedback_fn):
2248 """Return cluster config.
2251 cluster = self.cfg.GetClusterInfo()
# NOTE(review): the dict binding and the final return (elided lines 2252,
# 2266-2270) are missing from this listing
2253 "software_version": constants.RELEASE_VERSION,
2254 "protocol_version": constants.PROTOCOL_VERSION,
2255 "config_version": constants.CONFIG_VERSION,
2256 "os_api_version": constants.OS_API_VERSION,
2257 "export_version": constants.EXPORT_VERSION,
2258 "architecture": (platform.architecture()[0], platform.machine()),
2259 "name": cluster.cluster_name,
2260 "master": cluster.master_node,
2261 "default_hypervisor": cluster.default_hypervisor,
2262 "enabled_hypervisors": cluster.enabled_hypervisors,
2263 "hvparams": cluster.hvparams,
2264 "beparams": cluster.beparams,
2265 "candidate_pool_size": cluster.candidate_pool_size,
2271 class LUQueryConfigValues(NoHooksLU):
2272 """Return configuration values.
# Only static fields are supported; there are no dynamic (live) fields.
2277 _FIELDS_DYNAMIC = utils.FieldSet()
2278 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2280 def ExpandNames(self):
2281 self.needed_locks = {}
# Validate the requested output fields against the declared field sets.
2283 _CheckOutputFields(static=self._FIELDS_STATIC,
2284 dynamic=self._FIELDS_DYNAMIC,
2285 selected=self.op.output_fields)
2287 def CheckPrereq(self):
2288 """No prerequisites.
2293 def Exec(self, feedback_fn):
2294 """Dump a representation of the cluster config to the standard output.
# NOTE(review): the initialization of `values` (original line ~2297) and
# the final return are missing from this listing.
2298 for field in self.op.output_fields:
2299 if field == "cluster_name":
2300 entry = self.cfg.GetClusterName()
2301 elif field == "master_node":
2302 entry = self.cfg.GetMasterNode()
# The drain flag is represented by the existence of a marker file.
2303 elif field == "drain_flag":
2304 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2306 raise errors.ParameterError(field)
2307 values.append(entry)
2311 class LUActivateInstanceDisks(NoHooksLU):
2312 """Bring up an instance's disks.
2315 _OP_REQP = ["instance_name"]
2318 def ExpandNames(self):
2319 self._ExpandAndLockInstance()
# Node locks are computed later, once the instance's nodes are known.
2320 self.needed_locks[locking.LEVEL_NODE] = []
2321 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2323 def DeclareLocks(self, level):
2324 if level == locking.LEVEL_NODE:
2325 self._LockInstancesNodes()
2327 def CheckPrereq(self):
2328 """Check prerequisites.
2330 This checks that the instance is in the cluster.
2333 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2334 assert self.instance is not None, \
2335 "Cannot retrieve locked instance %s" % self.op.instance_name
2336 _CheckNodeOnline(self, self.instance.primary_node)
2338 def Exec(self, feedback_fn):
2339 """Activate the disks.
# Delegates the actual work to _AssembleInstanceDisks; on failure raises
# OpExecError.  NOTE(review): the guard line between these two statements
# (original line ~2343) is missing from this listing.
2342 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2344 raise errors.OpExecError("Cannot activate block devices")
2349 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2350 """Prepare the block devices for an instance.
2352 This sets up the block devices on all nodes.
2354 @type lu: L{LogicalUnit}
2355 @param lu: the logical unit on whose behalf we execute
2356 @type instance: L{objects.Instance}
2357 @param instance: the instance for whose disks we assemble
2358 @type ignore_secondaries: boolean
2359 @param ignore_secondaries: if true, errors on secondary nodes
2360 won't result in an error return from the function
2361 @return: False if the operation failed, otherwise a list of
2362 (host, instance_visible_name, node_visible_name)
2363 with the mapping from node devices to instance devices
# NOTE(review): the initialization of `device_info` and `disks_ok`
# (original lines ~2364-2367) is missing from this listing.
2368 iname = instance.name
2369 # With the two passes mechanism we try to reduce the window of
2370 # opportunity for the race condition of switching DRBD to primary
2371 # before handshaking occurred, but we do not eliminate it
2373 # The proper fix would be to wait (with some limits) until the
2374 # connection has been made and drbd transitions from WFConnection
2375 # into any other network-connected state (Connected, SyncTarget,
2378 # 1st pass, assemble on all nodes in secondary mode
2379 for inst_disk in instance.disks:
# Assemble the whole device tree of each disk on every involved node.
2380 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2381 lu.cfg.SetDiskID(node_disk, node)
2382 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2383 if result.failed or not result:
2384 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2385 " (is_primary=False, pass=1)",
2386 inst_disk.iv_name, node)
2387 if not ignore_secondaries:
2390 # FIXME: race condition on drbd migration to primary
2392 # 2nd pass, do only the primary node
2393 for inst_disk in instance.disks:
2394 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
# Secondary nodes were already handled in the first pass.
2395 if node != instance.primary_node:
2397 lu.cfg.SetDiskID(node_disk, node)
2398 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2399 if result.failed or not result:
2400 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2401 " (is_primary=True, pass=2)",
2402 inst_disk.iv_name, node)
2404 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2406 # leave the disks configured for the primary node
2407 # this is a workaround that would be fixed better by
2408 # improving the logical/physical id handling
2409 for disk in instance.disks:
2410 lu.cfg.SetDiskID(disk, instance.primary_node)
2412 return disks_ok, device_info
2415 def _StartInstanceDisks(lu, instance, force):
2416 """Start the disks of an instance.
# Thin wrapper over _AssembleInstanceDisks: when `force` is set,
# secondary-node failures are ignored; on overall failure the disks are
# shut down again and OpExecError is raised.  NOTE(review): the
# `if not disks_ok:` guard (original line ~2421) is missing from this
# listing.
2419 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2420 ignore_secondaries=force)
2422 _ShutdownInstanceDisks(lu, instance)
# Only hint at --force when the caller passed an explicit False.
2423 if force is not None and not force:
2424 lu.proc.LogWarning("", hint="If the message above refers to a"
2426 " you can retry the operation using '--force'.")
2427 raise errors.OpExecError("Disk consistency error")
2430 class LUDeactivateInstanceDisks(NoHooksLU):
2431 """Shutdown an instance's disks.
2434 _OP_REQP = ["instance_name"]
2437 def ExpandNames(self):
2438 self._ExpandAndLockInstance()
# Node locks are computed later, once the instance's nodes are known.
2439 self.needed_locks[locking.LEVEL_NODE] = []
2440 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2442 def DeclareLocks(self, level):
2443 if level == locking.LEVEL_NODE:
2444 self._LockInstancesNodes()
2446 def CheckPrereq(self):
2447 """Check prerequisites.
2449 This checks that the instance is in the cluster.
2452 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2453 assert self.instance is not None, \
2454 "Cannot retrieve locked instance %s" % self.op.instance_name
2456 def Exec(self, feedback_fn):
2457 """Deactivate the disks
# Refuses to shut down disks of a running instance (see
# _SafeShutdownInstanceDisks).
2460 instance = self.instance
2461 _SafeShutdownInstanceDisks(self, instance)
2464 def _SafeShutdownInstanceDisks(lu, instance):
2465 """Shutdown block devices of an instance.
2467 This function checks if an instance is running, before calling
2468 _ShutdownInstanceDisks.
# Ask the primary node for its list of running instances of this
# hypervisor; failure to answer is treated as a fatal error.
2471 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2472 [instance.hypervisor])
2473 ins_l = ins_l[instance.primary_node]
2474 if ins_l.failed or not isinstance(ins_l.data, list):
2475 raise errors.OpExecError("Can't contact node '%s'" %
2476 instance.primary_node)
2478 if instance.name in ins_l.data:
2479 raise errors.OpExecError("Instance is running, can't shutdown"
2482 _ShutdownInstanceDisks(lu, instance)
2485 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2486 """Shutdown block devices of an instance.
2488 This does the shutdown on all nodes of the instance.
2490 If the ignore_primary is false, errors on the primary node are
# NOTE(review): the initialization of the accumulated success flag and
# the final return (original lines around 2494 and after 2503) are
# missing from this listing.
2495 for disk in instance.disks:
2496 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2497 lu.cfg.SetDiskID(top_disk, node)
2498 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2499 if result.failed or not result.data:
2500 logging.error("Could not shutdown block device %s on node %s",
# Primary-node failures matter unless the caller opted to ignore them.
2502 if not ignore_primary or node != instance.primary_node:
2507 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2508 """Checks if a node has enough free memory.
2510 This function check if a given node has the needed amount of free
2511 memory. In case the node has less memory or we cannot get the
2512 information from the node, this function raise an OpPrereqError
2515 @type lu: C{LogicalUnit}
2516 @param lu: a logical unit from which we get configuration data
2518 @param node: the node to check
2519 @type reason: C{str}
2520 @param reason: string to use in the error message
2521 @type requested: C{int}
2522 @param requested: the amount of memory in MiB to check for
2523 @type hypervisor_name: C{str}
2524 @param hypervisor_name: the hypervisor to ask for memory stats
2525 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2526 we cannot check the node
# Query the node's free memory via RPC; Raise() aborts on RPC failure.
2529 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2530 nodeinfo[node].Raise()
2531 free_mem = nodeinfo[node].data.get('memory_free')
# A non-integer answer means the node could not report its memory.
2532 if not isinstance(free_mem, int):
2533 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2534 " was '%s'" % (node, free_mem))
2535 if requested > free_mem:
2536 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2537 " needed %s MiB, available %s MiB" %
2538 (node, reason, requested, free_mem))
2541 class LUStartupInstance(LogicalUnit):
2542 """Starts an instance.
2545 HPATH = "instance-start"
2546 HTYPE = constants.HTYPE_INSTANCE
2547 _OP_REQP = ["instance_name", "force"]
2550 def ExpandNames(self):
2551 self._ExpandAndLockInstance()
2553 def BuildHooksEnv(self):
2556 This runs on master, primary and secondary nodes of the instance.
# NOTE(review): the opening of the `env` dict and the final return of
# (env, nl, nl) are missing from this listing.
2560 "FORCE": self.op.force,
2562 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2563 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2564 list(self.instance.secondary_nodes))
2567 def CheckPrereq(self):
2568 """Check prerequisites.
2570 This checks that the instance is in the cluster.
2573 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2574 assert self.instance is not None, \
2575 "Cannot retrieve locked instance %s" % self.op.instance_name
2577 _CheckNodeOnline(self, instance.primary_node)
# Filled backend parameters give the effective memory requirement.
2579 bep = self.cfg.GetClusterInfo().FillBE(instance)
2580 # check bridges existence
2581 _CheckInstanceBridgesExist(self, instance)
2583 _CheckNodeFreeMemory(self, instance.primary_node,
2584 "starting instance %s" % instance.name,
2585 bep[constants.BE_MEMORY], instance.hypervisor)
2587 def Exec(self, feedback_fn):
2588 """Start the instance.
2591 instance = self.instance
2592 force = self.op.force
2593 extra_args = getattr(self.op, "extra_args", "")
# Mark the instance up in the config before actually starting it.
2595 self.cfg.MarkInstanceUp(instance.name)
2597 node_current = instance.primary_node
2599 _StartInstanceDisks(self, instance, force)
2601 result = self.rpc.call_instance_start(node_current, instance, extra_args)
# On start failure, roll back the disk activation before aborting.
2602 if result.failed or not result.data:
2603 _ShutdownInstanceDisks(self, instance)
2604 raise errors.OpExecError("Could not start instance")
2607 class LURebootInstance(LogicalUnit):
2608 """Reboot an instance.
2611 HPATH = "instance-reboot"
2612 HTYPE = constants.HTYPE_INSTANCE
2613 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2616 def ExpandNames(self):
# Validate the reboot type before acquiring any locks.
2617 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2618 constants.INSTANCE_REBOOT_HARD,
2619 constants.INSTANCE_REBOOT_FULL]:
2620 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2621 (constants.INSTANCE_REBOOT_SOFT,
2622 constants.INSTANCE_REBOOT_HARD,
2623 constants.INSTANCE_REBOOT_FULL))
2624 self._ExpandAndLockInstance()
2626 def BuildHooksEnv(self):
2629 This runs on master, primary and secondary nodes of the instance.
# NOTE(review): the opening of the `env` dict and the final return of
# (env, nl, nl) are missing from this listing.
2633 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2635 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2636 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2637 list(self.instance.secondary_nodes))
2640 def CheckPrereq(self):
2641 """Check prerequisites.
2643 This checks that the instance is in the cluster.
2646 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2647 assert self.instance is not None, \
2648 "Cannot retrieve locked instance %s" % self.op.instance_name
2650 _CheckNodeOnline(self, instance.primary_node)
2652 # check bridges existence
2653 _CheckInstanceBridgesExist(self, instance)
2655 def Exec(self, feedback_fn):
2656 """Reboot the instance.
2659 instance = self.instance
2660 ignore_secondaries = self.op.ignore_secondaries
2661 reboot_type = self.op.reboot_type
2662 extra_args = getattr(self.op, "extra_args", "")
2664 node_current = instance.primary_node
# Soft/hard reboots are delegated to the node in one RPC call.
2666 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2667 constants.INSTANCE_REBOOT_HARD]:
2668 result = self.rpc.call_instance_reboot(node_current, instance,
2669 reboot_type, extra_args)
2670 if result.failed or not result.data:
2671 raise errors.OpExecError("Could not reboot instance")
# Full reboot: shutdown, recycle the disks, then start again.
# NOTE(review): the `else:` introducing this branch (original line
# ~2672) is missing from this listing.
2673 if not self.rpc.call_instance_shutdown(node_current, instance):
2674 raise errors.OpExecError("could not shutdown instance for full reboot")
2675 _ShutdownInstanceDisks(self, instance)
2676 _StartInstanceDisks(self, instance, ignore_secondaries)
2677 result = self.rpc.call_instance_start(node_current, instance, extra_args)
2678 if result.failed or not result.data:
2679 _ShutdownInstanceDisks(self, instance)
2680 raise errors.OpExecError("Could not start instance for full reboot")
2682 self.cfg.MarkInstanceUp(instance.name)
2685 class LUShutdownInstance(LogicalUnit):
2686 """Shutdown an instance.
2689 HPATH = "instance-stop"
2690 HTYPE = constants.HTYPE_INSTANCE
2691 _OP_REQP = ["instance_name"]
2694 def ExpandNames(self):
2695 self._ExpandAndLockInstance()
2697 def BuildHooksEnv(self):
2700 This runs on master, primary and secondary nodes of the instance.
# NOTE(review): the return of (env, nl, nl) is missing from this
# listing.
2703 env = _BuildInstanceHookEnvByObject(self, self.instance)
2704 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2705 list(self.instance.secondary_nodes))
2708 def CheckPrereq(self):
2709 """Check prerequisites.
2711 This checks that the instance is in the cluster.
2714 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2715 assert self.instance is not None, \
2716 "Cannot retrieve locked instance %s" % self.op.instance_name
2717 _CheckNodeOnline(self, self.instance.primary_node)
2719 def Exec(self, feedback_fn):
2720 """Shutdown the instance.
2723 instance = self.instance
2724 node_current = instance.primary_node
# Mark the instance down in the config before the actual shutdown.
2725 self.cfg.MarkInstanceDown(instance.name)
2726 result = self.rpc.call_instance_shutdown(node_current, instance)
# A failed shutdown is only warned about; disks are torn down anyway.
2727 if result.failed or not result.data:
2728 self.proc.LogWarning("Could not shutdown instance")
2730 _ShutdownInstanceDisks(self, instance)
2733 class LUReinstallInstance(LogicalUnit):
2734 """Reinstall an instance.
2737 HPATH = "instance-reinstall"
2738 HTYPE = constants.HTYPE_INSTANCE
2739 _OP_REQP = ["instance_name"]
2742 def ExpandNames(self):
2743 self._ExpandAndLockInstance()
2745 def BuildHooksEnv(self):
2748 This runs on master, primary and secondary nodes of the instance.
# NOTE(review): the return of (env, nl, nl) is missing from this
# listing.
2751 env = _BuildInstanceHookEnvByObject(self, self.instance)
2752 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2753 list(self.instance.secondary_nodes))
2756 def CheckPrereq(self):
2757 """Check prerequisites.
2759 This checks that the instance is in the cluster and is not running.
2762 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2763 assert instance is not None, \
2764 "Cannot retrieve locked instance %s" % self.op.instance_name
2765 _CheckNodeOnline(self, instance.primary_node)
# Reinstall needs disks and requires the instance to be marked down.
2767 if instance.disk_template == constants.DT_DISKLESS:
2768 raise errors.OpPrereqError("Instance '%s' has no disks" %
2769 self.op.instance_name)
2770 if instance.status != "down":
2771 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2772 self.op.instance_name)
# Double-check with the node that the instance is really not running.
2773 remote_info = self.rpc.call_instance_info(instance.primary_node,
2775 instance.hypervisor)
2776 if remote_info.failed or remote_info.data:
2777 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2778 (self.op.instance_name,
2779 instance.primary_node))
# Optional OS change: validate the new OS against the primary node.
2781 self.op.os_type = getattr(self.op, "os_type", None)
2782 if self.op.os_type is not None:
2784 pnode = self.cfg.GetNodeInfo(
2785 self.cfg.ExpandNodeName(instance.primary_node))
2787 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2789 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2791 if not isinstance(result.data, objects.OS):
2792 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2793 " primary node" % self.op.os_type)
2795 self.instance = instance
2797 def Exec(self, feedback_fn):
2798 """Reinstall the instance.
2801 inst = self.instance
2803 if self.op.os_type is not None:
2804 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2805 inst.os = self.op.os_type
2806 self.cfg.Update(inst)
# Disks must be active for the OS create scripts to run.
2808 _StartInstanceDisks(self, inst, None)
2810 feedback_fn("Running the instance OS create scripts...")
2811 result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2814 raise errors.OpExecError("Could not install OS for instance %s"
2816 (inst.name, inst.primary_node))
# Always deactivate the disks again afterwards (presumably in a
# try/finally in the full source -- the connecting lines are missing
# from this listing).
2818 _ShutdownInstanceDisks(self, inst)
2821 class LURenameInstance(LogicalUnit):
2822 """Rename an instance.
2825 HPATH = "instance-rename"
2826 HTYPE = constants.HTYPE_INSTANCE
2827 _OP_REQP = ["instance_name", "new_name"]
2829 def BuildHooksEnv(self):
2832 This runs on master, primary and secondary nodes of the instance.
# NOTE(review): the return of (env, nl, nl) is missing from this
# listing.
2835 env = _BuildInstanceHookEnvByObject(self, self.instance)
2836 env["INSTANCE_NEW_NAME"] = self.op.new_name
2837 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2838 list(self.instance.secondary_nodes))
2841 def CheckPrereq(self):
2842 """Check prerequisites.
2844 This checks that the instance is in the cluster and is not running.
2847 instance = self.cfg.GetInstanceInfo(
2848 self.cfg.ExpandInstanceName(self.op.instance_name))
2849 if instance is None:
2850 raise errors.OpPrereqError("Instance '%s' not known" %
2851 self.op.instance_name)
2852 _CheckNodeOnline(self, instance.primary_node)
# Renaming requires the instance to be marked down and actually stopped.
2854 if instance.status != "down":
2855 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2856 self.op.instance_name)
2857 remote_info = self.rpc.call_instance_info(instance.primary_node,
2859 instance.hypervisor)
2861 if remote_info.data:
2862 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2863 (self.op.instance_name,
2864 instance.primary_node))
2865 self.instance = instance
2867 # new name verification
2868 name_info = utils.HostInfo(self.op.new_name)
2870 self.op.new_name = new_name = name_info.name
2871 instance_list = self.cfg.GetInstanceList()
2872 if new_name in instance_list:
2873 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
# Unless ignore_ip is set, ensure the new name's IP is not in use.
2876 if not getattr(self.op, "ignore_ip", False):
2877 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2878 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2879 (name_info.ip, new_name))
2882 def Exec(self, feedback_fn):
2883 """Rename the instance.
2886 inst = self.instance
2887 old_name = inst.name
# For file-based storage, remember the old directory for renaming.
2889 if inst.disk_template == constants.DT_FILE:
2890 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2892 self.cfg.RenameInstance(inst.name, self.op.new_name)
2893 # Change the instance lock. This is definitely safe while we hold the BGL
2894 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2895 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2897 # re-read the instance from the configuration after rename
2898 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2900 if inst.disk_template == constants.DT_FILE:
2901 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2902 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2903 old_file_storage_dir,
2904 new_file_storage_dir)
2907 raise errors.OpExecError("Could not connect to node '%s' to rename"
2908 " directory '%s' to '%s' (but the instance"
2909 " has been renamed in Ganeti)" % (
2910 inst.primary_node, old_file_storage_dir,
2911 new_file_storage_dir))
2913 if not result.data[0]:
2914 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2915 " (but the instance has been renamed in"
2916 " Ganeti)" % (old_file_storage_dir,
2917 new_file_storage_dir))
# Run the OS-level rename script with the disks temporarily activated.
2919 _StartInstanceDisks(self, inst, None)
2921 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2923 if result.failed or not result.data:
2924 msg = ("Could not run OS rename script for instance %s on node %s"
2925 " (but the instance has been renamed in Ganeti)" %
2926 (inst.name, inst.primary_node))
2927 self.proc.LogWarning(msg)
2929 _ShutdownInstanceDisks(self, inst)
2932 class LURemoveInstance(LogicalUnit):
2933 """Remove an instance.
2936 HPATH = "instance-remove"
2937 HTYPE = constants.HTYPE_INSTANCE
2938 _OP_REQP = ["instance_name", "ignore_failures"]
2941 def ExpandNames(self):
2942 self._ExpandAndLockInstance()
# Node locks are computed later, once the instance's nodes are known.
2943 self.needed_locks[locking.LEVEL_NODE] = []
2944 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2946 def DeclareLocks(self, level):
2947 if level == locking.LEVEL_NODE:
2948 self._LockInstancesNodes()
2950 def BuildHooksEnv(self):
2953 This runs on master, primary and secondary nodes of the instance.
# NOTE(review): the return of (env, nl, nl) is missing from this
# listing.
2956 env = _BuildInstanceHookEnvByObject(self, self.instance)
2957 nl = [self.cfg.GetMasterNode()]
2960 def CheckPrereq(self):
2961 """Check prerequisites.
2963 This checks that the instance is in the cluster.
2966 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2967 assert self.instance is not None, \
2968 "Cannot retrieve locked instance %s" % self.op.instance_name
2970 def Exec(self, feedback_fn):
2971 """Remove the instance.
2974 instance = self.instance
2975 logging.info("Shutting down instance %s on node %s",
2976 instance.name, instance.primary_node)
2978 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
# Shutdown/disk-removal failures abort unless ignore_failures is set.
2979 if result.failed or not result.data:
2980 if self.op.ignore_failures:
2981 feedback_fn("Warning: can't shutdown instance")
2983 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2984 (instance.name, instance.primary_node))
2986 logging.info("Removing block devices for instance %s", instance.name)
2988 if not _RemoveDisks(self, instance):
2989 if self.op.ignore_failures:
2990 feedback_fn("Warning: can't remove instance's disks")
2992 raise errors.OpExecError("Can't remove instance's disks")
2994 logging.info("Removing instance %s out of cluster config", instance.name)
2996 self.cfg.RemoveInstance(instance.name)
# Drop the instance lock together with the instance itself.
2997 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3000 class LUQueryInstances(NoHooksLU):
3001 """Logical unit for querying instances.
3004 _OP_REQP = ["output_fields", "names"]
# Static fields come from the configuration; patterns like
# "(disk).(size)/([0-9]+)" allow indexed per-disk/per-NIC fields.
3006 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3007 "admin_state", "admin_ram",
3008 "disk_template", "ip", "mac", "bridge",
3009 "sda_size", "sdb_size", "vcpus", "tags",
3010 "network_port", "beparams",
3011 "(disk).(size)/([0-9]+)",
3013 "(nic).(mac|ip|bridge)/([0-9]+)",
3014 "(nic).(macs|ips|bridges)",
3015 "(disk|nic).(count)",
3016 "serial_no", "hypervisor", "hvparams",] +
3018 for name in constants.HVS_PARAMETERS] +
3020 for name in constants.BES_PARAMETERS])
# Dynamic fields require live data from the nodes.
3021 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3024 def ExpandNames(self):
3025 _CheckOutputFields(static=self._FIELDS_STATIC,
3026 dynamic=self._FIELDS_DYNAMIC,
3027 selected=self.op.output_fields)
3029 self.needed_locks = {}
3030 self.share_locks[locking.LEVEL_INSTANCE] = 1
3031 self.share_locks[locking.LEVEL_NODE] = 1
3034 self.wanted = _GetWantedInstances(self, self.op.names)
3036 self.wanted = locking.ALL_SET
# Locking is only needed when a dynamic (non-static) field is requested.
3038 self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3040 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3041 self.needed_locks[locking.LEVEL_NODE] = []
3042 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3044 def DeclareLocks(self, level):
3045 if level == locking.LEVEL_NODE and self.do_locking:
3046 self._LockInstancesNodes()
3048 def CheckPrereq(self):
3049 """Check prerequisites.
3054 def Exec(self, feedback_fn):
3055 """Computes the list of nodes and their attributes.
3058 all_info = self.cfg.GetAllInstancesInfo()
# When we locked, the acquired locks define the instance set; otherwise
# fall back to the requested names (or everything).
3060 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3061 elif self.wanted != locking.ALL_SET:
3062 instance_names = self.wanted
3063 missing = set(instance_names).difference(all_info.keys())
3065 raise errors.OpExecError(
3066 "Some instances were removed before retrieving their data: %s"
3069 instance_names = all_info.keys()
3071 instance_names = utils.NiceSort(instance_names)
3072 instance_list = [all_info[iname] for iname in instance_names]
3074 # begin data gathering
3076 nodes = frozenset([inst.primary_node for inst in instance_list])
3077 hv_list = list(set([inst.hypervisor for inst in instance_list]))
# Collect live data per node; classify nodes as offline or bad.
3083 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3085 result = node_data[name]
3087 # offline nodes will be in both lists
3088 off_nodes.append(name)
3090 bad_nodes.append(name)
3093 live_data.update(result.data)
3094 # else no instance is alive
3096 live_data = dict([(name, {}) for name in instance_names])
3098 # end data gathering
3103 for instance in instance_list:
# Filled hypervisor/backend parameters for the HV/BE field prefixes.
3105 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3106 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3107 for field in self.op.output_fields:
3108 st_match = self._FIELDS_STATIC.Matches(field)
3113 elif field == "pnode":
3114 val = instance.primary_node
3115 elif field == "snodes":
3116 val = list(instance.secondary_nodes)
3117 elif field == "admin_state":
3118 val = (instance.status != "down")
3119 elif field == "oper_state":
3120 if instance.primary_node in bad_nodes:
3123 val = bool(live_data.get(instance.name))
3124 elif field == "status":
3125 if instance.primary_node in off_nodes:
3126 val = "ERROR_nodeoffline"
3127 elif instance.primary_node in bad_nodes:
3128 val = "ERROR_nodedown"
3130 running = bool(live_data.get(instance.name))
3132 if instance.status != "down":
3137 if instance.status != "down":
3141 elif field == "oper_ram":
3142 if instance.primary_node in bad_nodes:
3144 elif instance.name in live_data:
3145 val = live_data[instance.name].get("memory", "?")
3148 elif field == "disk_template":
3149 val = instance.disk_template
3151 val = instance.nics[0].ip
3152 elif field == "bridge":
3153 val = instance.nics[0].bridge
3154 elif field == "mac":
3155 val = instance.nics[0].mac
# Legacy sda/sdb fields map to disk indices 0 and 1.
3156 elif field == "sda_size" or field == "sdb_size":
3157 idx = ord(field[2]) - ord('a')
3159 val = instance.FindDisk(idx).size
3160 except errors.OpPrereqError:
3162 elif field == "tags":
3163 val = list(instance.GetTags())
3164 elif field == "serial_no":
3165 val = instance.serial_no
3166 elif field == "network_port":
3167 val = instance.network_port
3168 elif field == "hypervisor":
3169 val = instance.hypervisor
3170 elif field == "hvparams":
3172 elif (field.startswith(HVPREFIX) and
3173 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3174 val = i_hv.get(field[len(HVPREFIX):], None)
3175 elif field == "beparams":
3177 elif (field.startswith(BEPREFIX) and
3178 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3179 val = i_be.get(field[len(BEPREFIX):], None)
3180 elif st_match and st_match.groups():
3181 # matches a variable list
3182 st_groups = st_match.groups()
3183 if st_groups and st_groups[0] == "disk":
3184 if st_groups[1] == "count":
3185 val = len(instance.disks)
3186 elif st_groups[1] == "sizes":
3187 val = [disk.size for disk in instance.disks]
3188 elif st_groups[1] == "size":
3190 val = instance.FindDisk(st_groups[2]).size
3191 except errors.OpPrereqError:
3194 assert False, "Unhandled disk parameter"
3195 elif st_groups[0] == "nic":
3196 if st_groups[1] == "count":
3197 val = len(instance.nics)
3198 elif st_groups[1] == "macs":
3199 val = [nic.mac for nic in instance.nics]
3200 elif st_groups[1] == "ips":
3201 val = [nic.ip for nic in instance.nics]
3202 elif st_groups[1] == "bridges":
3203 val = [nic.bridge for nic in instance.nics]
3206 nic_idx = int(st_groups[2])
3207 if nic_idx >= len(instance.nics):
3210 if st_groups[1] == "mac":
3211 val = instance.nics[nic_idx].mac
3212 elif st_groups[1] == "ip":
3213 val = instance.nics[nic_idx].ip
3214 elif st_groups[1] == "bridge":
3215 val = instance.nics[nic_idx].bridge
3217 assert False, "Unhandled NIC parameter"
3219 assert False, "Unhandled variable parameter"
3221 raise errors.ParameterError(field)
3228 class LUFailoverInstance(LogicalUnit):
3229 """Failover an instance.
3232 HPATH = "instance-failover"
3233 HTYPE = constants.HTYPE_INSTANCE
3234 _OP_REQP = ["instance_name", "ignore_consistency"]
3237 def ExpandNames(self):
3238 self._ExpandAndLockInstance()
# Node locks are computed later, once the instance's nodes are known.
3239 self.needed_locks[locking.LEVEL_NODE] = []
3240 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3242 def DeclareLocks(self, level):
3243 if level == locking.LEVEL_NODE:
3244 self._LockInstancesNodes()
3246 def BuildHooksEnv(self):
3249 This runs on master, primary and secondary nodes of the instance.
# NOTE(review): the opening of the `env` dict and the final return of
# (env, nl, nl) are missing from this listing.
3253 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3255 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3256 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3259 def CheckPrereq(self):
3260 """Check prerequisites.
3262 This checks that the instance is in the cluster.
3265 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3266 assert self.instance is not None, \
3267 "Cannot retrieve locked instance %s" % self.op.instance_name
3269 bep = self.cfg.GetClusterInfo().FillBE(instance)
# Failover only makes sense for network-mirrored disk templates.
3270 if instance.disk_template not in constants.DTS_NET_MIRROR:
3271 raise errors.OpPrereqError("Instance's disk layout is not"
3272 " network mirrored, cannot failover.")
3274 secondary_nodes = instance.secondary_nodes
3275 if not secondary_nodes:
3276 raise errors.ProgrammerError("no secondary node but using "
3277 "a mirrored disk template")
3279 target_node = secondary_nodes[0]
3280 _CheckNodeOnline(self, target_node)
3281 # check memory requirements on the secondary node
3282 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3283 instance.name, bep[constants.BE_MEMORY],
3284 instance.hypervisor)
3286 # check bridge existence
3287 brlist = [nic.bridge for nic in instance.nics]
3288 result = self.rpc.call_bridges_exist(target_node, brlist)
3291 raise errors.OpPrereqError("One or more target bridges %s does not"
3292 " exist on destination node '%s'" %
3293 (brlist, target_node))
3295 def Exec(self, feedback_fn):
3296 """Failover an instance.
3298 The failover is done by shutting it down on its present node and
3299 starting it on the secondary.
3302 instance = self.instance
3304 source_node = instance.primary_node
3305 target_node = instance.secondary_nodes[0]
3307 feedback_fn("* checking disk consistency between source and target")
3308 for dev in instance.disks:
3309 # for drbd, these are drbd over lvm
# Degraded target disks abort the failover for an up instance, unless
# the operator explicitly chose to ignore consistency.
3310 if not _CheckDiskConsistency(self, dev, target_node, False):
3311 if instance.status == "up" and not self.op.ignore_consistency:
3312 raise errors.OpExecError("Disk %s is degraded on target node,"
3313 " aborting failover." % dev.iv_name)
3315 feedback_fn("* shutting down instance on source node")
3316 logging.info("Shutting down instance %s on node %s",
3317 instance.name, source_node)
3319 result = self.rpc.call_instance_shutdown(source_node, instance)
3320 if result.failed or not result.data:
3321 if self.op.ignore_consistency:
3322 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3324 " anyway. Please make sure node %s is down",
3325 instance.name, source_node, source_node)
3327 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3328 (instance.name, source_node))
3330 feedback_fn("* deactivating the instance's disks on source node")
3331 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3332 raise errors.OpExecError("Can't shut down the instance's disks.")
# The secondary becomes the new primary; persist and distribute.
3334 instance.primary_node = target_node
3335 # distribute new instance config to the other nodes
3336 self.cfg.Update(instance)
3338 # Only start the instance if it's marked as up
3339 if instance.status == "up":
3340 feedback_fn("* activating the instance's disks on target node")
3341 logging.info("Starting instance %s on node %s",
3342 instance.name, target_node)
3344 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3345 ignore_secondaries=True)
3347 _ShutdownInstanceDisks(self, instance)
3348 raise errors.OpExecError("Can't activate the instance's disks")
3350 feedback_fn("* starting the instance on the target node")
3351 result = self.rpc.call_instance_start(target_node, instance, None)
3352 if result.failed or not result.data:
3353 _ShutdownInstanceDisks(self, instance)
3354 raise errors.OpExecError("Could not start instance %s on node %s." %
3355 (instance.name, target_node))
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param node: the node on which to create the devices
  @param device: the (possibly nested) disk object; children are
      created recursively, depth-first, before the device itself

  """
  # recurse into the children first: a parent device (e.g. drbd over lvm)
  # can only be created once its underlying devices exist
  for child in device.children:
    if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
      # NOTE(review): the failure 'return' appears elided in this listing
      lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, True, info)
  if new_id.failed or not new_id.data:
    # NOTE(review): the failure 'return' appears elided in this listing
    if device.physical_id is None:
      # remember the backend-assigned id the first time we create the device
      device.physical_id = new_id
def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @param force: whether to create the device even if its type does not
      normally require creation on secondaries

  """
  if device.CreateOnSecondary():
    # NOTE(review): the body of this branch (presumably forcing creation
    # for the children) appears elided in this listing -- confirm.
    for child in device.children:
      if not _CreateBlockDevOnSecondary(lu, node, instance,
                                        child, force, info):
        # NOTE(review): failure 'return' appears elided in this listing
        lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, False, info)
  if new_id.failed or not new_id.data:
    # NOTE(review): failure 'return' appears elided in this listing
    if device.physical_id is None:
      # remember the backend-assigned id the first time we create the device
      device.physical_id = new_id
3408 def _GenerateUniqueNames(lu, exts):
3409 """Generate a suitable LV name.
3411 This will generate a logical volume name for the given instance.
3416 new_id = lu.cfg.GenerateUniqueID()
3417 results.append("%s%s" % (new_id, val))
# NOTE(review): the tail of the signature (presumably the minor-number
# parameters), the logical_id continuation and the trailing 'return' appear
# elided in this listing -- confirm against the full source.
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
  """Generate a drbd8 device complete with its children.

  Builds the data LV, the metadata LV and the DRBD8 device on top of
  them, allocating a port and a shared secret from the config.

  @param names: the two LV names to use (data, meta)

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  # backing data volume, full requested size
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  # drbd metadata volume, fixed 128 MB
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                          children=[dev_data, dev_meta],
# NOTE(review): the tail of the signature (presumably a 'base_index'
# parameter), the 'disks = []' initializer and the trailing 'return' appear
# elided in this listing -- confirm against the full source.
def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
  """Generate the entire disk layout for a given template type.

  Dispatches on the disk template (diskless, plain LVM, drbd8, file)
  and builds the corresponding list of disk objects.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  if template_name == constants.DT_DISKLESS:
    # nothing to create for a diskless instance (branch body elided here)
  elif template_name == constants.DT_PLAIN:
    # plain LVM volumes cannot have secondaries
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % i
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name="disk/%d" % disk_index)
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    # drbd8 needs exactly one secondary (mirror) node
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    # one (primary, secondary) minor pair per disk
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    # two LV names per disk: data and meta
    names = _GenerateUniqueNames(lu,
                                 [".disk%d_%s" % (i, s)
                                  for i in range(disk_count)
                                  for s in ("data", "meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % disk_index,
                                      minors[idx*2], minors[idx*2+1])
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    # file-backed disks cannot have secondaries
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % disk_index,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
      disks.append(disk_dev)
  # NOTE(review): the 'else:' introducing this raise appears elided here
  raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3505 def _GetInstanceInfoText(instance):
3506 """Compute that text that should be added to the disk's metadata.
3509 return "originstname+%s" % instance.name
def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @return: the success of the creation

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    # file-backed instances need their storage directory created first
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
    if result.failed or not result.data:
      logging.error("Could not connect to node '%s'", instance.primary_node)
      # NOTE(review): failure 'return' appears elided in this listing
    if not result.data[0]:
      logging.error("Failed to create directory '%s'", file_storage_dir)
      # NOTE(review): failure 'return' appears elided in this listing

  # Note: this needs to be kept in sync with adding of disks in
  # LUSetInstanceParams
  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)
    # secondaries first, so the primary device can attach to them
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
                                        device, False, info):
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
                      device.iv_name, device, secondary_node)
    # the whole device tree on the primary node
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                    instance, device, info):
      logging.error("Failed to create volume %s on primary!", device.iv_name)
def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should remove
  @return: the success of the removal

  """
  logging.info("Removing block devices for instance %s", instance.name)

  for device in instance.disks:
    # remove the device on every node of its tree (primary + secondaries)
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      result = lu.rpc.call_blockdev_remove(node, disk)
      if result.failed or not result.data:
        # best-effort: warn and keep removing the remaining devices
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
                           " continuing anyway", device.iv_name, node)

  if instance.disk_template == constants.DT_FILE:
    # also drop the file storage directory of a file-backed instance
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
    if result.failed or not result.data:
      logging.error("Could not remove directory '%s'", file_storage_dir)
def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  @type disk_template: string
  @param disk_template: the disk template of the instance (one of the
      constants.DT_* values)
  @type disks: list of dicts
  @param disks: the disk definitions; each must have a "size" entry (MB)
  @return: the required free space in the volume group in MB, or None
      for templates that do not use the volume group (diskless, file)
  @raise errors.ProgrammerError: if the disk template is unknown

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
  for node in nodenames:
    # NOTE(review): 'info = hvinfo[node]' appears elided in this listing;
    # each per-node result is expected to be a (success, message) pair
    if not info.data or not isinstance(info.data, (tuple, list)):
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s' (%s)" % (node, info.data))
    if not info.data[0]:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % info.data[1])
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  Handles both fresh creation (INSTANCE_CREATE) and import from an
  export (INSTANCE_IMPORT); node placement can be explicit or delegated
  to an iallocator plugin.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  # opcode attributes that must be present
  _OP_REQP = ["instance_name", "disks", "disk_template",
              "wait_for_sync", "ip_check", "nics",
              "hvparams", "beparams"]

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    @raise errors.OpPrereqError: if the node is not known to the cluster

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    # NOTE(review): the 'return node_full' line appears elided in this
    # listing; callers assign the expanded name -- confirm.

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      # default to the cluster-wide hypervisor when none was requested
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                                    ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    utils.CheckBEParams(self.op.beparams)
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # NIC buildup: validate ip/mac/bridge of each requested NIC
    for nic in self.op.nics:
      # ip validity checks
      ip = nic.get("ip", None)
      if ip is None or ip.lower() == "none":
      elif ip.lower() == constants.VALUE_AUTO:
        # "auto" means: use the resolved address of the instance name
        nic_ip = hostname1.ip
        if not utils.IsValidIP(ip):
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)

      # MAC address verification
      mac = nic.get("mac", constants.VALUE_AUTO)
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        if not utils.IsValidMac(mac.lower()):
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
      # bridge verification
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))

    # disk checks/pre-build
    for disk in self.op.disks:
      mode = disk.get("mode", constants.DISK_RDWR)
      if mode not in constants.DISK_ACCESS_SET:
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
      size = disk.get("size", None)
        raise errors.OpPrereqError("Missing disk size")
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
      self.disks.append({"size": size, "mode": mode})

    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      # the allocator may pick any node, so lock them all
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
      # NOTE(review): an 'else:' line appears elided here; the explicit
      # node expansion below is the non-iallocator path -- confirm.
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_path is None:
        # default export path: the instance name, relative to EXPORT_DIR
        self.op.src_path = src_path = self.op.instance_name

      if src_node is None:
        # no source node given: search all nodes for the export
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        self.op.src_node = None
        if os.path.isabs(src_path):
          raise errors.OpPrereqError("Importing an instance from an absolute"
                                     " path requires a source node option.")
        # NOTE(review): an 'else:' line appears elided here; the lines
        # below handle an explicitly-given source node -- confirm.
        self.op.src_node = src_node = self._ExpandNode(src_node)
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
        if not os.path.isabs(src_path):
          self.op.src_path = src_path = \
            os.path.join(constants.EXPORT_DIR, src_path)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    On success stores the chosen primary (and, for two-node layouts,
    secondary) node in self.op.pnode / self.op.snode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     hypervisor=self.op.hypervisor,
    ial.Run(self.op.iallocator)

      # NOTE(review): the 'if not ial.success:' guard for this raise
      # appears elided in this listing -- confirm.
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 ", ".join(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    # NOTE(review): the 'env = {' opener appears elided in this listing
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
      "INSTANCE_ADD_MODE": self.op.mode,
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
                                     primary_node=self.op.pnode,
                                     secondary_nodes=self.secondaries,
                                     status=self.instance_status,
                                     os_type=self.op.os_type,
                                     memory=self.be_full[constants.BE_MEMORY],
                                     vcpus=self.be_full[constants.BE_VCPUS],
                                     nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
    # hook nodes: master + primary + secondaries
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # lvm-based templates need a volume group on the cluster
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      if src_node is None:
        # search all locked nodes for the export with the given name
        exp_list = self.rpc.call_export_list(
          self.acquired_locks[locking.LEVEL_NODE])
        for node in exp_list:
          if not exp_list[node].failed and src_path in exp_list[node].data:
            self.op.src_node = src_node = node
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
          # NOTE(review): guard lines appear elided around this raise
          raise errors.OpPrereqError("No export found for relative path %s" %

      _CheckNodeOnline(self, src_node)
      result = self.rpc.call_export_info(src_node, src_path)
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      export_info = result.data
      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      # Check that the new instance doesn't have less disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks))

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      # collect one dump image path (or False) per export disk
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
          # NOTE(review): the 'else:' for the False placeholder appears
          # elided in this listing -- confirm.
          disk_images.append(False)

      self.src_images = disk_images

      old_name = export_info.get(constants.INISECT_INS, 'name')
      # FIXME: int() here could throw a ValueError on broken exports
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
      if self.op.instance_name == old_name:
        # same name as the export: reuse the recorded MAC addresses
        for idx, nic in enumerate(self.nics):
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
            nic_mac_ini = 'nic%d_mac' % idx
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)

    # ip ping checks (we use the same ip that was resolved in ExpandNames)
    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
      # NOTE(review): the offline-node guard for this raise appears
      # elided in this listing -- confirm.
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %

    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)
      _CheckNodeOnline(self, self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
      for node in nodenames:
        info = nodeinfo[node]
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification on the primary node
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
    if not isinstance(result.data, objects.OS):
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    # bridge check on primary node
    bridges = [n.bridge for n in self.nics]
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
                                 " exist on destination node '%s'" %
                                 (",".join(bridges), pnode.name))

    # memory check on primary node
    _CheckNodeFreeMemory(self, self.pnode.name,
                         "creating instance %s" % self.op.instance_name,
                         self.be_full[constants.BE_MEMORY],

      # NOTE(review): the start-flag conditional selecting between these
      # two assignments appears elided in this listing -- confirm.
      self.instance_status = 'up'
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    # generate the MACs that were requested as auto/generate
    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC()

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      # this hypervisor type needs a network (console) port
      network_port = self.cfg.AllocatePort()

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
      # NOTE(review): an 'else:' line appears elided here -- confirm.
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.op.file_driver,

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self, iobj):
      # roll back any partially-created disks and free the drbd minors
      _RemoveDisks(self, iobj)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignements for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)
    # Unlock all the nodes
    if self.op.mode == constants.INSTANCE_IMPORT:
      # keep the source node locked (needed for the OS import below)
      nodes_keep = [self.op.src_node]
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
                       if node != self.op.src_node]
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
      # NOTE(review): an 'else:' line appears elided here -- confirm.
      self.context.glm.release(locking.LEVEL_NODE)
      del self.acquired_locks[locking.LEVEL_NODE]

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)

      # NOTE(review): the 'if disk_abort:' guard for this cleanup appears
      # elided in this listing -- confirm.
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        result = self.rpc.call_instance_os_add(pnode_name, iobj)
          raise errors.OpExecError("Could not add os for instance %s"
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
        import_result.Raise()
        for idx, result in enumerate(import_result.data):
          # NOTE(review): the per-disk failure guard appears elided here
          self.LogWarning("Could not import the image %s for instance"
                          " %s, disk %d, on node %s" %
                          (src_images[idx], instance, idx, pnode_name))
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"

    logging.info("Starting instance %s on node %s", instance, pnode_name)
    feedback_fn("* starting instance...")
    result = self.rpc.call_instance_start(pnode_name, iobj, None)
      raise errors.OpExecError("Could not start instance")
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def ExpandNames(self):
    # only the instance lock is needed; primary node is read, not modified
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    @return: the ssh command line the caller should run on the master
        node to reach the instance's console

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]

    if instance.name not in node_insts.data:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    # the hypervisor knows how to spawn a console for its own instances
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    console_cmd = hyper.GetShellCommandForConsole(instance)

    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4246 class LUReplaceDisks(LogicalUnit):
4247 """Replace the disks of an instance.
4250 HPATH = "mirrors-replace"
4251 HTYPE = constants.HTYPE_INSTANCE
4252 _OP_REQP = ["instance_name", "mode", "disks"]
  def ExpandNames(self):
    """Compute the locks needed for a disk replacement.

    Either an iallocator or an explicit new secondary may be given
    (but not both); with neither, only the instance's own nodes are
    needed.

    """
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      # the allocator may pick any node as the new secondary
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      # NOTE(review): an 'else:' line appears elided here; the two lines
      # below are the no-iallocator, no-remote-node path (instance nodes
      # only, declared in DeclareLocks) -- confirm against the full source.
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4279 def DeclareLocks(self, level):
4280 # If we're not already locking all nodes in the set we have to declare the
4281 # instance's primary/secondary nodes.
4282 if (level == locking.LEVEL_NODE and
4283 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4284 self._LockInstancesNodes()
4286 def _RunAllocator(self):
4287 """Compute a new secondary node using an IAllocator.
4290 ial = IAllocator(self,
4291 mode=constants.IALLOCATOR_MODE_RELOC,
4292 name=self.op.instance_name,
4293 relocate_from=[self.sec_node])
4295 ial.Run(self.op.iallocator)
4298 raise errors.OpPrereqError("Can't compute nodes using"
4299 " iallocator '%s': %s" % (self.op.iallocator,
4301 if len(ial.nodes) != ial.required_nodes:
4302 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4303 " of nodes (%s), required %s" %
4304 (len(ial.nodes), ial.required_nodes))
4305 self.op.remote_node = ial.nodes[0]
4306 self.LogInfo("Selected new secondary for the instance: %s",
4307 self.op.remote_node)
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    # NOTE(review): the 'env = {' opener appears elided in this listing
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    # hook nodes: master + primary (+ the new secondary, if any)
    # NOTE(review): the 'nl = [' opener appears elided in this listing
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    # disk replacement only makes sense for mirrored templates
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      # lets the allocator pick self.op.remote_node
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
      # NOTE(review): an 'else:' line appears elided here; the None
      # assignment below is the no-remote-node path -- confirm.
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        # primary replacement: work on the primary, other node is secondary
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
        _CheckNodeOnline(self, self.tgt_node)
        _CheckNodeOnline(self, self.oth_node)
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        # secondary replacement: work on the secondary, other is primary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
        _CheckNodeOnline(self, self.oth_node)
        if self.new_node is not None:
          _CheckNodeOnline(self, self.new_node)
          # NOTE(review): an 'else:' line appears elided before the next
          # check (tgt_node only needs to be online when it is kept)
          _CheckNodeOnline(self, self.tgt_node)
        # NOTE(review): an 'else:' line appears elided before this raise
        raise errors.ProgrammerError("Unhandled disk replace mode")

    if not self.op.disks:
      # default: replace all the instance's disks
      self.op.disks = range(len(instance.disks))

    for disk_idx in self.op.disks:
      # raises if any requested disk index does not exist
      instance.FindDisk(disk_idx)
# Same-node disk replacement for DRBD8: allocate fresh LVs on the target
# node, swap them under the DRBD device via detach/rename/attach, resync,
# then delete the old LVs. Six user-visible steps via LogStep.
# NOTE(review): excerpt is elided — assignments like "cfg = ...",
# "steps_total = ...", "iv_names = {}", "rlist = []", "size = ..." and
# several if/else lines are missing from this view.
4409 def _ExecD8DiskOnly(self, feedback_fn):
4410 """Replace a disk on the primary or secondary for dbrd8.
4412 The algorithm for replace is quite complicated:
4414 1. for each disk to be replaced:
4416 1. create new LVs on the target node with unique names
4417 1. detach old LVs from the drbd device
4418 1. rename old LVs to name_replaced.<time_t>
4419 1. rename new LVs to old LVs
4420 1. attach the new LVs (with the old names now) to the drbd device
4422 1. wait for sync across all devices
4424 1. for each modified disk:
4426 1. remove old LVs (which have the name name_replaces.<time_t>)
4428 Failures are not very well handled.
4432 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4433 instance = self.instance
4435 vgname = self.cfg.GetVGName()
# Node roles were resolved in CheckPrereq.
4438 tgt_node = self.tgt_node
4439 oth_node = self.oth_node
4441 # Step: check device activation
4442 self.proc.LogStep(1, steps_total, "check device existence")
4443 info("checking volume groups")
4444 my_vg = cfg.GetVGName()
4445 results = self.rpc.call_vg_list([oth_node, tgt_node])
4447 raise errors.OpExecError("Can't list volume groups on the nodes")
# Both nodes must expose the cluster volume group.
4448 for node in oth_node, tgt_node:
4450 if res.failed or not res.data or my_vg not in res.data:
4451 raise errors.OpExecError("Volume group '%s' not found on %s" %
# Every selected disk must be findable on both nodes before we touch it.
4453 for idx, dev in enumerate(instance.disks):
4454 if idx not in self.op.disks:
4456 for node in tgt_node, oth_node:
4457 info("checking disk/%d on %s" % (idx, node))
4458 cfg.SetDiskID(dev, node)
4459 if not self.rpc.call_blockdev_find(node, dev):
4460 raise errors.OpExecError("Can't find disk/%d on node %s" %
4463 # Step: check other node consistency
4464 self.proc.LogStep(2, steps_total, "check peer consistency")
4465 for idx, dev in enumerate(instance.disks):
4466 if idx not in self.op.disks:
# The peer copy must be healthy; it is our only good replica during
# the swap.
4468 info("checking disk/%d consistency on %s" % (idx, oth_node))
4469 if not _CheckDiskConsistency(self, dev, oth_node,
4470 oth_node==instance.primary_node):
4471 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4472 " to replace disks on this node (%s)" %
4473 (oth_node, tgt_node))
4475 # Step: create new storage
4476 self.proc.LogStep(3, steps_total, "allocate new storage")
4477 for idx, dev in enumerate(instance.disks):
4478 if idx not in self.op.disks:
4481 cfg.SetDiskID(dev, tgt_node)
# A DRBD8 disk is backed by a data LV plus a (128 MiB) meta LV.
4482 lv_names = [".disk%d_%s" % (idx, suf)
4483 for suf in ["data", "meta"]]
4484 names = _GenerateUniqueNames(self, lv_names)
4485 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4486 logical_id=(vgname, names[0]))
4487 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4488 logical_id=(vgname, names[1]))
4489 new_lvs = [lv_data, lv_meta]
4490 old_lvs = dev.children
# Remember the old/new LV pairing per device for the later steps.
4491 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4492 info("creating new local storage on %s for %s" %
4493 (tgt_node, dev.iv_name))
4494 # since we *always* want to create this LV, we use the
4495 # _Create...OnPrimary (which forces the creation), even if we
4496 # are talking about the secondary node
4497 for new_lv in new_lvs:
4498 if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4499 _GetInstanceInfoText(instance)):
4500 raise errors.OpExecError("Failed to create new LV named '%s' on"
4502 (new_lv.logical_id[1], tgt_node))
4504 # Step: for each lv, detach+rename*2+attach
4505 self.proc.LogStep(4, steps_total, "change drbd configuration")
4506 for dev, old_lvs, new_lvs in iv_names.itervalues():
4507 info("detaching %s drbd from local storage" % dev.iv_name)
4508 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4511 raise errors.OpExecError("Can't detach drbd from local storage on node"
4512 " %s for device %s" % (tgt_node, dev.iv_name))
4514 #cfg.Update(instance)
4516 # ok, we created the new LVs, so now we know we have the needed
4517 # storage; as such, we proceed on the target node to rename
4518 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4519 # using the assumption that logical_id == physical_id (which in
4520 # turn is the unique_id on that node)
4522 # FIXME(iustin): use a better name for the replaced LVs
4523 temp_suffix = int(time.time())
4524 ren_fn = lambda d, suff: (d.physical_id[0],
4525 d.physical_id[1] + "_replaced-%s" % suff)
4526 # build the rename list based on what LVs exist on the node
# Only rename LVs the node actually reports; missing ones are skipped.
4528 for to_ren in old_lvs:
4529 find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4530 if not find_res.failed and find_res.data is not None: # device exists
4531 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4533 info("renaming the old LVs on the target node")
4534 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4537 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4538 # now we rename the new LVs to the old LVs
4539 info("renaming the new LVs on the target node")
4540 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4541 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4544 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
# Keep the in-memory Disk objects in sync with the on-node renames.
4546 for old, new in zip(old_lvs, new_lvs):
4547 new.logical_id = old.logical_id
4548 cfg.SetDiskID(new, tgt_node)
4550 for disk in old_lvs:
4551 disk.logical_id = ren_fn(disk, temp_suffix)
4552 cfg.SetDiskID(disk, tgt_node)
4554 # now that the new lvs have the old name, we can add them to the device
4555 info("adding new mirror component on %s" % tgt_node)
4556 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4557 if result.failed or not result.data:
# Attach failed: best-effort rollback by removing the new LVs.
4558 for new_lv in new_lvs:
4559 result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4560 if result.failed or not result.data:
4561 warning("Can't rollback device %s", hint="manually cleanup unused"
4563 raise errors.OpExecError("Can't add local storage to drbd")
# Persist the new children in the cluster config.
4565 dev.children = new_lvs
4566 cfg.Update(instance)
4568 # Step: wait for sync
4570 # this can fail as the old devices are degraded and _WaitForSync
4571 # does a combined result over all disks, so we don't check its
4573 self.proc.LogStep(5, steps_total, "sync devices")
4574 _WaitForSync(self, instance, unlock=True)
4576 # so check manually all the devices
4577 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4578 cfg.SetDiskID(dev, instance.primary_node)
4579 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
# result.data[5] presumably flags a degraded DRBD device — confirm
# against the blockdev_find RPC contract.
4580 if result.failed or result.data[5]:
4581 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4583 # Step: remove old storage
4584 self.proc.LogStep(6, steps_total, "removing old storage")
4585 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4586 info("remove logical volumes for %s" % name)
# Removal failures are non-fatal: the data already lives on the new LVs.
4588 cfg.SetDiskID(lv, tgt_node)
4589 result = self.rpc.call_blockdev_remove(tgt_node, lv)
4590 if result.failed or not result.data:
4591 warning("Can't remove old LV", hint="manually remove unused LVs")
# Secondary-node replacement for DRBD8: build new backing LVs and DRBD
# devices on the new secondary, detach the primary's DRBDs from the
# network, repoint the config at the new peer, re-attach and resync, then
# tear down storage on the old secondary.
# NOTE(review): excerpt is elided — "cfg = ...", "steps_total = ...",
# "iv_names = {}", counters like the failed-detach tally, and several
# if/else lines are missing from this view.
4594 def _ExecD8Secondary(self, feedback_fn):
4595 """Replace the secondary node for drbd8.
4597 The algorithm for replace is quite complicated:
4598 - for all disks of the instance:
4599 - create new LVs on the new node with same names
4600 - shutdown the drbd device on the old secondary
4601 - disconnect the drbd network on the primary
4602 - create the drbd device on the new secondary
4603 - network attach the drbd on the primary, using an artifice:
4604 the drbd code for Attach() will connect to the network if it
4605 finds a device which is connected to the good local disks but
4607 - wait for sync across all devices
4608 - remove all disks from the old secondary
4610 Failures are not very well handled.
4614 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4615 instance = self.instance
# tgt_node holds the *old* secondary in this mode (set in CheckPrereq).
4619 old_node = self.tgt_node
4620 new_node = self.new_node
4621 pri_node = instance.primary_node
4623 # Step: check device activation
4624 self.proc.LogStep(1, steps_total, "check device existence")
4625 info("checking volume groups")
4626 my_vg = cfg.GetVGName()
4627 results = self.rpc.call_vg_list([pri_node, new_node])
4628 for node in pri_node, new_node:
4630 if res.failed or not res.data or my_vg not in res.data:
4631 raise errors.OpExecError("Volume group '%s' not found on %s" %
# Selected disks must be visible on the primary before we proceed.
4633 for idx, dev in enumerate(instance.disks):
4634 if idx not in self.op.disks:
4636 info("checking disk/%d on %s" % (idx, pri_node))
4637 cfg.SetDiskID(dev, pri_node)
4638 result = self.rpc.call_blockdev_find(pri_node, dev)
4641 raise errors.OpExecError("Can't find disk/%d on node %s" %
4644 # Step: check other node consistency
4645 self.proc.LogStep(2, steps_total, "check peer consistency")
4646 for idx, dev in enumerate(instance.disks):
4647 if idx not in self.op.disks:
# ldisk=True: require the primary's *local* disk to be consistent,
# since the secondary is being thrown away.
4649 info("checking disk/%d consistency on %s" % (idx, pri_node))
4650 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4651 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4652 " unsafe to replace the secondary" %
4655 # Step: create new storage
4656 self.proc.LogStep(3, steps_total, "allocate new storage")
4657 for idx, dev in enumerate(instance.disks):
4658 info("adding new local storage on %s for disk/%d" %
4660 # since we *always* want to create this LV, we use the
4661 # _Create...OnPrimary (which forces the creation), even if we
4662 # are talking about the secondary node
4663 for new_lv in dev.children:
4664 if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4665 _GetInstanceInfoText(instance)):
4666 raise errors.OpExecError("Failed to create new LV named '%s' on"
4668 (new_lv.logical_id[1], new_node))
4670 # Step 4: dbrd minors and drbd setups changes
4671 # after this, we must manually remove the drbd minors on both the
4672 # error and the success paths
# One minor per instance disk is reserved on the new node.
4673 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4675 logging.debug("Allocated minors %s" % (minors,))
4676 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4677 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4679 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4680 # create new devices on new_node
# The DRBD logical_id is ordered (nodeA, nodeB, port, minorA, minorB,
# ...); substitute new_node for old_node on whichever side it sits.
4681 if pri_node == dev.logical_id[0]:
4682 new_logical_id = (pri_node, new_node,
4683 dev.logical_id[2], dev.logical_id[3], new_minor,
4686 new_logical_id = (new_node, pri_node,
4687 dev.logical_id[2], new_minor, dev.logical_id[4],
4689 iv_names[idx] = (dev, dev.children, new_logical_id)
4690 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4692 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4693 logical_id=new_logical_id,
4694 children=dev.children)
4695 if not _CreateBlockDevOnSecondary(self, new_node, instance,
4697 _GetInstanceInfoText(instance)):
# Failure: release the reserved minors before aborting.
4698 self.cfg.ReleaseDRBDMinors(instance.name)
4699 raise errors.OpExecError("Failed to create new DRBD on"
4700 " node '%s'" % new_node)
4702 for idx, dev in enumerate(instance.disks):
4703 # we have new devices, shutdown the drbd on the old secondary
4704 info("shutting down drbd for disk/%d on old node" % idx)
4705 cfg.SetDiskID(dev, old_node)
4706 result = self.rpc.call_blockdev_shutdown(old_node, dev)
4707 if result.failed or not result.data:
4708 warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4709 hint="Please cleanup this device manually as soon as possible")
4711 info("detaching primary drbds from the network (=> standalone)")
4713 for idx, dev in enumerate(instance.disks):
4714 cfg.SetDiskID(dev, pri_node)
4715 # set the network part of the physical (unique in bdev terms) id
4716 # to None, meaning detach from network
4717 dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4718 # and 'find' the device, which will 'fix' it to match the
4720 result = self.rpc.call_blockdev_find(pri_node, dev)
4721 if not result.failed and result.data:
# (elided else:) record/report the failed detach for this disk.
4724 warning("Failed to detach drbd disk/%d from network, unusual case" %
4728 # no detaches succeeded (very unlikely)
4729 self.cfg.ReleaseDRBDMinors(instance.name)
4730 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4732 # if we managed to detach at least one, we update all the disks of
4733 # the instance to point to the new secondary
4734 info("updating instance configuration")
4735 for dev, _, new_logical_id in iv_names.itervalues():
4736 dev.logical_id = new_logical_id
4737 cfg.SetDiskID(dev, pri_node)
4738 cfg.Update(instance)
4739 # we can remove now the temp minors as now the new values are
4740 # written to the config file (and therefore stable)
4741 self.cfg.ReleaseDRBDMinors(instance.name)
4743 # and now perform the drbd attach
4744 info("attaching primary drbds to new secondary (standalone => connected)")
4745 for idx, dev in enumerate(instance.disks):
4746 info("attaching primary drbd for disk/%d to new secondary node" % idx)
4747 # since the attach is smart, it's enough to 'find' the device,
4748 # it will automatically activate the network, if the physical_id
4750 cfg.SetDiskID(dev, pri_node)
4751 logging.debug("Disk to attach: %s", dev)
4752 result = self.rpc.call_blockdev_find(pri_node, dev)
4753 if result.failed or not result.data:
4754 warning("can't attach drbd disk/%d to new secondary!" % idx,
4755 "please do a gnt-instance info to see the status of disks")
4757 # this can fail as the old devices are degraded and _WaitForSync
4758 # does a combined result over all disks, so we don't check its
4760 self.proc.LogStep(5, steps_total, "sync devices")
4761 _WaitForSync(self, instance, unlock=True)
4763 # so check manually all the devices
4764 for idx, (dev, old_lvs, _) in iv_names.iteritems():
4765 cfg.SetDiskID(dev, pri_node)
4766 result = self.rpc.call_blockdev_find(pri_node, dev)
4769 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4771 self.proc.LogStep(6, steps_total, "removing old storage")
4772 for idx, (dev, old_lvs, _) in iv_names.iteritems():
4773 info("remove logical volumes for disk/%d" % idx)
# Cleanup on the *old* secondary is best-effort only.
4775 cfg.SetDiskID(lv, old_node)
4776 result = self.rpc.call_blockdev_remove(old_node, lv)
4777 if result.failed or not result.data:
4778 warning("Can't remove LV on old secondary",
4779 hint="Cleanup stale volumes by hand")
# Dispatcher for disk replacement: picks the same-node or new-secondary
# DRBD8 path, temporarily activating disks for a down instance.
# NOTE(review): excerpt is elided — the else: lines and the final
# "return ret" are not visible here.
4781 def Exec(self, feedback_fn):
4782 """Execute disk replacement.
4784 This dispatches the disk replacement to the appropriate handler.
4787 instance = self.instance
4789 # Activate the instance disks if we're replacing them on a down instance
4790 if instance.status == "down":
4791 _StartInstanceDisks(self, instance, True)
# remote_node set => replace the secondary; otherwise same-node replace.
4793 if instance.disk_template == constants.DT_DRBD8:
4794 if self.op.remote_node is None:
4795 fn = self._ExecD8DiskOnly
4797 fn = self._ExecD8Secondary
4799 raise errors.ProgrammerError("Unhandled disk replacement case")
4801 ret = fn(feedback_fn)
4803 # Deactivate the instance disks if we're replacing them on a down instance
4804 if instance.status == "down":
4805 _SafeShutdownInstanceDisks(self, instance)
# LU that grows one disk of an instance by op.amount MiB on all of the
# instance's nodes, after verifying enough free VG space everywhere.
# NOTE(review): excerpt is elided — class attributes (HPATH, REQ_BGL),
# several "env = {" / "nl = [" openers, return statements and some error
# string continuations are not visible here.
4810 class LUGrowDisk(LogicalUnit):
4811 """Grow a disk of an instance.
4815 HTYPE = constants.HTYPE_INSTANCE
4816 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4819 def ExpandNames(self):
4820 self._ExpandAndLockInstance()
# Node locks are computed later from the locked instance.
4821 self.needed_locks[locking.LEVEL_NODE] = []
4822 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4824 def DeclareLocks(self, level):
4825 if level == locking.LEVEL_NODE:
4826 self._LockInstancesNodes()
4828 def BuildHooksEnv(self):
4831 This runs on the master, the primary and all the secondaries.
4835 "DISK": self.op.disk,
4836 "AMOUNT": self.op.amount,
4838 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4840 self.cfg.GetMasterNode(),
4841 self.instance.primary_node,
4845 def CheckPrereq(self):
4846 """Check prerequisites.
4848 This checks that the instance is in the cluster.
4851 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4852 assert instance is not None, \
4853 "Cannot retrieve locked instance %s" % self.op.instance_name
# All involved nodes must be online since all of them grow the disk.
4854 _CheckNodeOnline(self, instance.primary_node)
4855 for node in instance.secondary_nodes:
4856 _CheckNodeOnline(self, node)
4859 self.instance = instance
4861 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4862 raise errors.OpPrereqError("Instance's disk layout does not support"
4865 self.disk = instance.FindDisk(self.op.disk)
# Verify every node has at least op.amount MiB free in the VG.
4867 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4868 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4869 instance.hypervisor)
4870 for node in nodenames:
4871 info = nodeinfo[node]
4872 if info.failed or not info.data:
4873 raise errors.OpPrereqError("Cannot get current information"
4874 " from node '%s'" % node)
4875 vg_free = info.data.get('vg_free', None)
4876 if not isinstance(vg_free, int):
4877 raise errors.OpPrereqError("Can't compute free disk space on"
4879 if self.op.amount > vg_free:
4880 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4881 " %d MiB available, %d MiB required" %
4882 (node, vg_free, self.op.amount))
4884 def Exec(self, feedback_fn):
4885 """Execute disk grow.
4888 instance = self.instance
# Grow on secondaries first, primary last (order of the tuple below).
4890 for node in (instance.secondary_nodes + (instance.primary_node,)):
4891 self.cfg.SetDiskID(disk, node)
4892 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
# blockdev_grow is expected to return a (status, message) pair.
4894 if (not result.data or not isinstance(result.data, (list, tuple)) or
4895 len(result.data) != 2):
4896 raise errors.OpExecError("Grow request failed to node %s" % node)
4897 elif not result.data[0]:
4898 raise errors.OpExecError("Grow request failed to node %s: %s" %
4899 (node, result.data[1]))
# Record the new size in the config only after all nodes succeeded.
4900 disk.RecordGrow(self.op.amount)
4901 self.cfg.Update(instance)
4902 if self.op.wait_for_sync:
4903 disk_abort = not _WaitForSync(self, instance)
4905 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4906 " status.\nPlease check the instance.")
# No-hooks LU returning detailed runtime/config data for a set of
# instances (all instances when op.instances is empty), including a
# recursive per-disk status tree.
# NOTE(review): excerpt is elided — REQ_BGL, several else: branches,
# dict/list openers ("idict = {", "result = {") and returns are missing
# from this view.
4909 class LUQueryInstanceData(NoHooksLU):
4910 """Query runtime instance data.
4913 _OP_REQP = ["instances", "static"]
4916 def ExpandNames(self):
4917 self.needed_locks = {}
# Read-only LU: share all locks.
4918 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4920 if not isinstance(self.op.instances, list):
4921 raise errors.OpPrereqError("Invalid argument type 'instances'")
4923 if self.op.instances:
4924 self.wanted_names = []
4925 for name in self.op.instances:
4926 full_name = self.cfg.ExpandInstanceName(name)
4927 if full_name is None:
4928 raise errors.OpPrereqError("Instance '%s' not known" %
4929 self.op.instance_name)
4930 self.wanted_names.append(full_name)
4931 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
# (elided else:) empty list means "all instances".
4933 self.wanted_names = None
4934 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4936 self.needed_locks[locking.LEVEL_NODE] = []
4937 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4939 def DeclareLocks(self, level):
4940 if level == locking.LEVEL_NODE:
4941 self._LockInstancesNodes()
4943 def CheckPrereq(self):
4944 """Check prerequisites.
4946 This only checks the optional instance list against the existing names.
# With ALL_SET, the actually-acquired locks give us the name list.
4949 if self.wanted_names is None:
4950 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4952 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4953 in self.wanted_names]
4956 def _ComputeDiskStatus(self, instance, snode, dev):
4957 """Compute block device status.
# Recursively builds a dict describing dev and its children; with
# op.static the live pstatus/sstatus RPC probes are skipped.
4960 static = self.op.static
4962 self.cfg.SetDiskID(dev, instance.primary_node)
4963 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4965 dev_pstatus = dev_pstatus.data
4969 if dev.dev_type in constants.LDS_DRBD:
4970 # we change the snode then (otherwise we use the one passed in)
4971 if dev.logical_id[0] == instance.primary_node:
4972 snode = dev.logical_id[1]
4974 snode = dev.logical_id[0]
4976 if snode and not static:
4977 self.cfg.SetDiskID(dev, snode)
4978 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4980 dev_sstatus = dev_sstatus.data
# Children are computed with the same (possibly updated) snode.
4985 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4986 for child in dev.children]
4991 "iv_name": dev.iv_name,
4992 "dev_type": dev.dev_type,
4993 "logical_id": dev.logical_id,
4994 "physical_id": dev.physical_id,
4995 "pstatus": dev_pstatus,
4996 "sstatus": dev_sstatus,
4997 "children": dev_children,
5003 def Exec(self, feedback_fn):
5004 """Gather and return data"""
5007 cluster = self.cfg.GetClusterInfo()
5009 for instance in self.wanted_instances:
# Live run state via hypervisor query, unless static output requested.
5010 if not self.op.static:
5011 remote_info = self.rpc.call_instance_info(instance.primary_node,
5013 instance.hypervisor)
5015 remote_info = remote_info.data
5016 if remote_info and "state" in remote_info:
5019 remote_state = "down"
5022 if instance.status == "down":
5023 config_state = "down"
5027 disks = [self._ComputeDiskStatus(instance, None, device)
5028 for device in instance.disks]
5031 "name": instance.name,
5032 "config_state": config_state,
5033 "run_state": remote_state,
5034 "pnode": instance.primary_node,
5035 "snodes": instance.secondary_nodes,
5037 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5039 "hypervisor": instance.hypervisor,
5040 "network_port": instance.network_port,
# Both raw instance params and the cluster-filled effective values.
5041 "hv_instance": instance.hvparams,
5042 "hv_actual": cluster.FillHV(instance),
5043 "be_instance": instance.beparams,
5044 "be_actual": cluster.FillBE(instance),
5047 result[instance.name] = idict
5052 class LUSetInstanceParams(LogicalUnit):
5053 """Modifies an instances's parameters.
5056 HPATH = "instance-modify"
5057 HTYPE = constants.HTYPE_INSTANCE
5058 _OP_REQP = ["instance_name"]
# Syntactic validation of the modify-instance opcode: normalize missing
# attributes, validate disk add/remove/modify and NIC add/remove/modify
# descriptors, and allow at most one add/remove per category per call.
# NOTE(review): excerpt is elided — defaults for nics/disks, the
# add/remove counters ("disk_addremove", "nic_addremove") and several
# continue/else lines are missing from this view.
5061 def CheckArguments(self):
5062 if not hasattr(self.op, 'nics'):
5064 if not hasattr(self.op, 'disks'):
5066 if not hasattr(self.op, 'beparams'):
5067 self.op.beparams = {}
5068 if not hasattr(self.op, 'hvparams'):
5069 self.op.hvparams = {}
5070 self.op.force = getattr(self.op, "force", False)
# An opcode with nothing to change is rejected early.
5071 if not (self.op.nics or self.op.disks or
5072 self.op.hvparams or self.op.beparams):
5073 raise errors.OpPrereqError("No changes submitted")
5075 utils.CheckBEParams(self.op.beparams)
# Disk operations: each entry is (op, param-dict) where op is
# DDM_ADD, DDM_REMOVE or an int index to modify.
5079 for disk_op, disk_dict in self.op.disks:
5080 if disk_op == constants.DDM_REMOVE:
5083 elif disk_op == constants.DDM_ADD:
5086 if not isinstance(disk_op, int):
5087 raise errors.OpPrereqError("Invalid disk index")
5088 if disk_op == constants.DDM_ADD:
5089 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5090 if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
5091 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5092 size = disk_dict.get('size', None)
5094 raise errors.OpPrereqError("Required disk parameter size missing")
# (elided: int(size) conversion) size must parse as an integer.
5097 except ValueError, err:
5098 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5100 disk_dict['size'] = size
5102 # modification of disk
5103 if 'size' in disk_dict:
5104 raise errors.OpPrereqError("Disk size change not possible, use"
5107 if disk_addremove > 1:
5108 raise errors.OpPrereqError("Only one disk add or remove operation"
5109 " supported at a time")
# NIC operations, same (op, param-dict) structure as disks.
5113 for nic_op, nic_dict in self.op.nics:
5114 if nic_op == constants.DDM_REMOVE:
5117 elif nic_op == constants.DDM_ADD:
5120 if not isinstance(nic_op, int):
5121 raise errors.OpPrereqError("Invalid nic index")
5123 # nic_dict should be a dict
# The string "none" (any case) clears the IP.
5124 nic_ip = nic_dict.get('ip', None)
5125 if nic_ip is not None:
5126 if nic_ip.lower() == "none":
5127 nic_dict['ip'] = None
5129 if not utils.IsValidIP(nic_ip):
5130 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5131 # we can only check None bridges and assign the default one
5132 nic_bridge = nic_dict.get('bridge', None)
5133 if nic_bridge is None:
5134 nic_dict['bridge'] = self.cfg.GetDefBridge()
5135 # but we can validate MACs
5136 nic_mac = nic_dict.get('mac', None)
5137 if nic_mac is not None:
5138 if self.cfg.IsMacInUse(nic_mac):
5139 raise errors.OpPrereqError("MAC address %s already in use"
5140 " in cluster" % nic_mac)
# 'auto'/'generate' sentinels skip the syntax check.
5141 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5142 if not utils.IsValidMac(nic_mac):
5143 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5144 if nic_addremove > 1:
5145 raise errors.OpPrereqError("Only one NIC add or remove operation"
5146 " supported at a time")
# Lock the target instance; node locks are recalculated from it later.
5148 def ExpandNames(self):
5149 self._ExpandAndLockInstance()
5150 self.needed_locks[locking.LEVEL_NODE] = []
5151 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
# At node level, lock exactly the nodes of the locked instance.
5153 def DeclareLocks(self, level):
5154 if level == locking.LEVEL_NODE:
5155 self._LockInstancesNodes()
# Hook environment for instance modification: overrides memory/vcpus
# with the *new* effective backend values computed in CheckPrereq.
# NOTE(review): excerpt is elided — "args = {}" and the final
# "return env, nl, nl" are not visible here.
5157 def BuildHooksEnv(self):
5160 This runs on the master, primary and secondaries.
5164 if constants.BE_MEMORY in self.be_new:
5165 args['memory'] = self.be_new[constants.BE_MEMORY]
5166 if constants.BE_VCPUS in self.be_new:
5167 args['vcpus'] = self.be_new[constants.BE_VCPUS]
5168 # FIXME: readd disk/nic changes
5169 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5170 nl = [self.cfg.GetMasterNode(),
5171 self.instance.primary_node] + list(self.instance.secondary_nodes)
# Semantic validation of the modify-instance opcode against the live
# cluster: compute the new effective hv/be parameter dicts, check memory
# availability (unless forced), and validate NIC and disk operations
# against the instance's current state. Non-fatal findings accumulate in
# self.warn for Exec to report.
# NOTE(review): excerpt is elided — "self.warn = []", "nodelist = [pnode]",
# several else:/del branches, the secondary-node info RPC feeding the
# auto-balance check, and some error string continuations are missing
# from this view.
5174 def CheckPrereq(self):
5175 """Check prerequisites.
5177 This only checks the instance list against the existing names.
5180 force = self.force = self.op.force
5182 # checking the new params on the primary/secondary nodes
5184 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5185 assert self.instance is not None, \
5186 "Cannot retrieve locked instance %s" % self.op.instance_name
5187 pnode = self.instance.primary_node
5189 nodelist.extend(instance.secondary_nodes)
5191 # hvparams processing
# Build the new per-instance hv dict: VALUE_DEFAULT drops a key
# (fall back to cluster default), VALUE_NONE stores an explicit None.
5192 if self.op.hvparams:
5193 i_hvdict = copy.deepcopy(instance.hvparams)
5194 for key, val in self.op.hvparams.iteritems():
5195 if val == constants.VALUE_DEFAULT:
5200 elif val == constants.VALUE_NONE:
5201 i_hvdict[key] = None
5204 cluster = self.cfg.GetClusterInfo()
# hv_new = cluster defaults overlaid with the instance dict; syntax
# is checked by the hypervisor and validated on all involved nodes.
5205 hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5208 hypervisor.GetHypervisor(
5209 instance.hypervisor).CheckParameterSyntax(hv_new)
5210 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5211 self.hv_new = hv_new # the new actual values
5212 self.hv_inst = i_hvdict # the new dict (without defaults)
5214 self.hv_new = self.hv_inst = {}
5216 # beparams processing
5217 if self.op.beparams:
5218 i_bedict = copy.deepcopy(instance.beparams)
5219 for key, val in self.op.beparams.iteritems():
5220 if val == constants.VALUE_DEFAULT:
5227 cluster = self.cfg.GetClusterInfo()
5228 be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5230 self.be_new = be_new # the new actual values
5231 self.be_inst = i_bedict # the new dict (without defaults)
5233 self.be_new = self.be_inst = {}
# Memory check: skipped with --force; with auto_balance the
# secondaries must also be able to host the instance.
5237 if constants.BE_MEMORY in self.op.beparams and not self.force:
5238 mem_check_list = [pnode]
5239 if be_new[constants.BE_AUTO_BALANCE]:
5240 # either we changed auto_balance to yes or it was from before
5241 mem_check_list.extend(instance.secondary_nodes)
5242 instance_info = self.rpc.call_instance_info(pnode, instance.name,
5243 instance.hypervisor)
5244 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5245 instance.hypervisor)
5246 if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5247 # Assume the primary node is unreachable and go ahead
5248 self.warn.append("Can't get info from primary node %s" % pnode)
5250 if not instance_info.failed and instance_info.data:
# A running instance's current memory offsets the requirement.
5251 current_mem = instance_info.data['memory']
5253 # Assume instance not running
5254 # (there is a slight race condition here, but it's not very probable,
5255 # and we have no other way to check)
5257 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5258 nodeinfo[pnode].data['memory_free'])
5260 raise errors.OpPrereqError("This change will prevent the instance"
5261 " from starting, due to %d MB of memory"
5262 " missing on its primary node" % miss_mem)
5264 if be_new[constants.BE_AUTO_BALANCE]:
# NOTE(review): iterating secondary_nodes with .iteritems() looks
# inconsistent with it being a node-name sequence elsewhere in this
# file — presumably an elided line rebinds this to an RPC result
# dict; confirm against the full source.
5265 for node, nres in instance.secondary_nodes.iteritems():
5266 if nres.failed or not isinstance(nres.data, dict):
5267 self.warn.append("Can't get info from secondary node %s" % node)
5268 elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5269 self.warn.append("Not enough memory to failover instance to"
5270 " secondary node %s" % node)
# NIC operation checks against the current NIC list.
5273 for nic_op, nic_dict in self.op.nics:
5274 if nic_op == constants.DDM_REMOVE:
5275 if not instance.nics:
5276 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5278 if nic_op != constants.DDM_ADD:
5280 if nic_op < 0 or nic_op >= len(instance.nics):
5281 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5283 (nic_op, len(instance.nics)))
5284 nic_bridge = nic_dict.get('bridge', None)
5285 if nic_bridge is not None:
5286 if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5287 msg = ("Bridge '%s' doesn't exist on one of"
5288 " the instance nodes" % nic_bridge)
# A missing bridge is a warning under --force, fatal otherwise.
5290 self.warn.append(msg)
5292 raise errors.OpPrereqError(msg)
# Disk operation checks.
5295 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5296 raise errors.OpPrereqError("Disk operations not supported for"
5297 " diskless instances")
5298 for disk_op, disk_dict in self.op.disks:
5299 if disk_op == constants.DDM_REMOVE:
5300 if len(instance.disks) == 1:
5301 raise errors.OpPrereqError("Cannot remove the last disk of"
# Disk removal requires the instance to be stopped.
5303 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5304 ins_l = ins_l[pnode]
5305 if not type(ins_l) is list:
5306 raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5307 if instance.name in ins_l:
5308 raise errors.OpPrereqError("Instance is running, can't remove"
# NOTE(review): len(instance.nics) in this MAX_DISKS comparison looks
# like a copy/paste slip for len(instance.disks) — worth confirming
# against the full source before relying on it.
5311 if (disk_op == constants.DDM_ADD and
5312 len(instance.nics) >= constants.MAX_DISKS):
5313 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5314 " add more" % constants.MAX_DISKS)
5315 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5317 if disk_op < 0 or disk_op >= len(instance.disks):
5318 raise errors.OpPrereqError("Invalid disk index %s, valid values"
5320 (disk_op, len(instance.disks)))
5324 def Exec(self, feedback_fn):
5325 """Modifies an instance.
5327 All parameters take effect only at the next restart of the instance.
# NOTE(review): this listing is elided (source line numbers show gaps);
# e.g. the initialisation of the `result` change-list is not visible here.
5330 # Process here the warnings from CheckPrereq, as we don't have a
5331 # feedback_fn there.
5332 for warn in self.warn:
5333 feedback_fn("WARNING: %s" % warn)
5336 instance = self.instance
# Disk modifications: each entry is an (operation, parameter-dict) pair.
5338 for disk_op, disk_dict in self.op.disks:
5339 if disk_op == constants.DDM_REMOVE:
5340 # remove the last disk
5341 device = instance.disks.pop()
5342 device_idx = len(instance.disks)
# Tear down the removed disk's block devices on every node hosting a
# part of it; failures are only logged, the operation continues.
5343 for node, disk in device.ComputeNodeTree(instance.primary_node):
5344 self.cfg.SetDiskID(disk, node)
5345 result = self.rpc.call_blockdev_remove(node, disk)
5346 if result.failed or not result.data:
5347 self.proc.LogWarning("Could not remove disk/%d on node %s,"
5348 " continuing anyway", device_idx, node)
5349 result.append(("disk/%d" % device_idx, "remove"))
5350 elif disk_op == constants.DDM_ADD:
# For file-based instances, derive the new volume's directory from the
# first existing disk; otherwise no file parameters are needed.
5352 if instance.disk_template == constants.DT_FILE:
5353 file_driver, file_path = instance.disks[0].logical_id
5354 file_path = os.path.dirname(file_path)
5356 file_driver = file_path = None
5357 disk_idx_base = len(instance.disks)
5358 new_disk = _GenerateDiskTemplate(self,
5359 instance.disk_template,
5360 instance, instance.primary_node,
5361 instance.secondary_nodes,
5366 new_disk.mode = disk_dict['mode']
5367 instance.disks.append(new_disk)
5368 info = _GetInstanceInfoText(instance)
5370 logging.info("Creating volume %s for instance %s",
5371 new_disk.iv_name, instance.name)
5372 # Note: this needs to be kept in sync with _CreateDisks
# Create the device on the secondaries first, then on the primary;
# creation failures are warned about but not fatal here.
5374 for secondary_node in instance.secondary_nodes:
5375 if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
5376 new_disk, False, info):
5377 self.LogWarning("Failed to create volume %s (%s) on"
5378 " secondary node %s!",
5379 new_disk.iv_name, new_disk, secondary_node)
5381 if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5382 instance, new_disk, info):
5383 self.LogWarning("Failed to create volume %s on primary!",
5385 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5386 (new_disk.size, new_disk.mode)))
5388 # change a given disk
5389 instance.disks[disk_op].mode = disk_dict['mode']
5390 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
# NIC modifications, same (operation, parameter-dict) structure as disks.
5392 for nic_op, nic_dict in self.op.nics:
5393 if nic_op == constants.DDM_REMOVE:
5394 # remove the last nic
5395 del instance.nics[-1]
5396 result.append(("nic.%d" % len(instance.nics), "remove"))
5397 elif nic_op == constants.DDM_ADD:
# A missing or auto/generate MAC is replaced by a freshly generated one.
5399 if 'mac' not in nic_dict:
5400 mac = constants.VALUE_GENERATE
5402 mac = nic_dict['mac']
5403 if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5404 mac = self.cfg.GenerateMAC()
5405 new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5406 bridge=nic_dict.get('bridge', None))
5407 instance.nics.append(new_nic)
5408 result.append(("nic.%d" % (len(instance.nics) - 1),
5409 "add:mac=%s,ip=%s,bridge=%s" %
5410 (new_nic.mac, new_nic.ip, new_nic.bridge)))
5412 # change a given nic
5413 for key in 'mac', 'ip', 'bridge':
5415 setattr(instance.nics[nic_op], key, nic_dict[key])
5416 result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
# Hypervisor parameters; hv_new is presumably prepared in CheckPrereq
# (not visible in this listing) -- TODO confirm.
5419 if self.op.hvparams:
5420 instance.hvparams = self.hv_new
5421 for key, val in self.op.hvparams.iteritems():
5422 result.append(("hv/%s" % key, val))
# Backend parameters; be_inst is presumably prepared in CheckPrereq
# (not visible in this listing) -- TODO confirm.
5425 if self.op.beparams:
5426 instance.beparams = self.be_inst
5427 for key, val in self.op.beparams.iteritems():
5428 result.append(("be/%s" % key, val))
# Persist the modified instance object into the cluster configuration.
5430 self.cfg.Update(instance)
# Query which instance exports exist on a set of nodes.
5435 class LUQueryExports(NoHooksLU):
5436 """Query the exports list
5439 _OP_REQP = ['nodes']
5442 def ExpandNames(self):
# Share the node locks; with no explicit node list, lock all nodes.
5443 self.needed_locks = {}
5444 self.share_locks[locking.LEVEL_NODE] = 1
5445 if not self.op.nodes:
5446 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5448 self.needed_locks[locking.LEVEL_NODE] = \
5449 _GetWantedNodes(self, self.op.nodes)
5451 def CheckPrereq(self):
5452 """Check prerequisites.
# The node list is whatever was actually acquired in ExpandNames.
5455 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5457 def Exec(self, feedback_fn):
5458 """Compute the list of all the exported system images.
5461 @return: a dictionary with the structure node->(export-list)
5462 where export-list is a list of the instances exported on
# A node whose RPC failed maps to False instead of an export list.
# NOTE(review): the initialisation of `result` is elided in this listing.
5466 rpcresult = self.rpc.call_export_list(self.nodes)
5468 for node in rpcresult:
5469 if rpcresult[node].failed:
5470 result[node] = False
5472 result[node] = rpcresult[node].data
# Export an instance: snapshot its disks, copy the snapshots to the
# target node, finalize the export, and clean up older exports.
5477 class LUExportInstance(LogicalUnit):
5478 """Export an instance to an image in the cluster.
5481 HPATH = "instance-export"
5482 HTYPE = constants.HTYPE_INSTANCE
5483 _OP_REQP = ["instance_name", "target_node", "shutdown"]
5486 def ExpandNames(self):
5487 self._ExpandAndLockInstance()
5488 # FIXME: lock only instance primary and destination node
5490 # Sad but true, for now we have do lock all nodes, as we don't know where
5491 # the previous export might be, and and in this LU we search for it and
5492 # remove it from its current node. In the future we could fix this by:
5493 # - making a tasklet to search (share-lock all), then create the new one,
5494 # then one to remove, after
5495 # - removing the removal operation altoghether
5496 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5498 def DeclareLocks(self, level):
5499 """Last minute lock declaration."""
5500 # All nodes are locked anyway, so nothing to do here.
# Hook environment: export target/shutdown flag plus the standard
# per-instance variables.
5502 def BuildHooksEnv(self):
5505 This will run on the master, primary node and target node.
5509 "EXPORT_NODE": self.op.target_node,
5510 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5512 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5513 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5514 self.op.target_node]
5517 def CheckPrereq(self):
5518 """Check prerequisites.
5520 This checks that the instance and node names are valid.
5523 instance_name = self.op.instance_name
5524 self.instance = self.cfg.GetInstanceInfo(instance_name)
5525 assert self.instance is not None, \
5526 "Cannot retrieve locked instance %s" % self.op.instance_name
5527 _CheckNodeOnline(self, self.instance.primary_node)
# Resolve the destination node; it must exist and be online.
5529 self.dst_node = self.cfg.GetNodeInfo(
5530 self.cfg.ExpandNodeName(self.op.target_node))
5532 if self.dst_node is None:
5533 # This is wrong node name, not a non-locked node
5534 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5535 _CheckNodeOnline(self, self.op.target_node)
5537 # instance disk type verification
5538 for disk in self.instance.disks:
5539 if disk.dev_type == constants.LD_FILE:
5540 raise errors.OpPrereqError("Export not supported for instances with"
5541 " file-based disks")
5543 def Exec(self, feedback_fn):
5544 """Export an instance to an image in the cluster.
5547 instance = self.instance
5548 dst_node = self.dst_node
5549 src_node = instance.primary_node
# Optionally shut the instance down (disks stay active) before
# snapshotting, for a consistent image.
5550 if self.op.shutdown:
5551 # shutdown the instance, but not the disks
5552 result = self.rpc.call_instance_shutdown(src_node, instance)
5555 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5556 (instance.name, src_node))
5558 vgname = self.cfg.GetVGName()
# Snapshot each disk on the source node; a failed snapshot is recorded
# as False so snap_disks indexes stay aligned with instance.disks.
5563 for disk in instance.disks:
5564 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5565 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5566 if new_dev_name.failed or not new_dev_name.data:
5567 self.LogWarning("Could not snapshot block device %s on node %s",
5568 disk.logical_id[1], src_node)
5569 snap_disks.append(False)
5571 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5572 logical_id=(vgname, new_dev_name.data),
5573 physical_id=(vgname, new_dev_name.data),
5574 iv_name=disk.iv_name)
5575 snap_disks.append(new_dev)
# Restart the instance if we shut it down above and it was running.
5578 if self.op.shutdown and instance.status == "up":
5579 result = self.rpc.call_instance_start(src_node, instance, None)
5580 if result.failed or not result.data:
5581 _ShutdownInstanceDisks(self, instance)
5582 raise errors.OpExecError("Could not start instance")
5584 # TODO: check for size
5586 cluster_name = self.cfg.GetClusterName()
# Copy each snapshot to the destination node, then remove the snapshot
# device from the source node; failures only produce warnings.
5587 for idx, dev in enumerate(snap_disks):
5589 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5590 instance, cluster_name, idx)
5591 if result.failed or not result.data:
5592 self.LogWarning("Could not export block device %s from node %s to"
5593 " node %s", dev.logical_id[1], src_node,
5595 result = self.rpc.call_blockdev_remove(src_node, dev)
5596 if result.failed or not result.data:
5597 self.LogWarning("Could not remove snapshot block device %s from node"
5598 " %s", dev.logical_id[1], src_node)
# Write the export metadata on the destination node.
5600 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
5601 if result.failed or not result.data:
5602 self.LogWarning("Could not finalize export for instance %s on node %s",
5603 instance.name, dst_node.name)
# Remove any older export of this instance from all the other nodes.
5605 nodelist = self.cfg.GetNodeList()
5606 nodelist.remove(dst_node.name)
5608 # on one-node clusters nodelist will be empty after the removal
5609 # if we proceed the backup would be removed because OpQueryExports
5610 # substitutes an empty list with the full cluster node list.
5612 exportlist = self.rpc.call_export_list(nodelist)
5613 for node in exportlist:
5614 if exportlist[node].failed:
5616 if instance.name in exportlist[node].data:
5617 if not self.rpc.call_export_remove(node, instance.name):
5618 self.LogWarning("Could not remove older export for instance %s"
5619 " on node %s", instance.name, node)
# Remove all exports belonging to a (possibly already deleted) instance.
5622 class LURemoveExport(NoHooksLU):
5623 """Remove exports related to the named instance.
5626 _OP_REQP = ["instance_name"]
5629 def ExpandNames(self):
5630 self.needed_locks = {}
5631 # We need all nodes to be locked in order for RemoveExport to work, but we
5632 # don't need to lock the instance itself, as nothing will happen to it (and
5633 # we can remove exports also for a removed instance)
5634 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5636 def CheckPrereq(self):
5637 """Check prerequisites.
5641 def Exec(self, feedback_fn):
5642 """Remove any export.
# Try to resolve the instance name via the config; fall back to the raw
# name so exports of already-removed instances can be cleaned up too
# (that only works if the name passed in was an FQDN).
5645 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5646 # If the instance was not found we'll try with the name that was passed in.
5647 # This will only work if it was an FQDN, though.
5649 if not instance_name:
5651 instance_name = self.op.instance_name
5653 exportlist = self.rpc.call_export_list(self.acquired_locks[
5654 locking.LEVEL_NODE])
# Walk every locked node and delete the matching export if present;
# removal failures are logged but do not abort.
5656 for node in exportlist:
5657 if exportlist[node].failed:
5658 self.LogWarning("Failed to query node %s, continuing" % node)
5660 if instance_name in exportlist[node].data:
5662 result = self.rpc.call_export_remove(node, instance_name)
5663 if result.failed or not result.data:
5664 logging.error("Could not remove export for instance %s"
5665 " on node %s", instance_name, node)
# NOTE(review): fqdn_warn/found are initialised in lines elided from
# this listing -- confirm against the full source.
5667 if fqdn_warn and not found:
5668 feedback_fn("Export not found. If trying to remove an export belonging"
5669 " to a deleted instance please use its Fully Qualified"
# Common base for all tag LUs: resolves the tag target and its locks.
5673 class TagsLU(NoHooksLU):
5676 This is an abstract class which is the parent of all the other tags LUs.
5680 def ExpandNames(self):
# Resolve the target name and lock the corresponding node or instance;
# cluster-level tags need no extra lock.
5681 self.needed_locks = {}
5682 if self.op.kind == constants.TAG_NODE:
5683 name = self.cfg.ExpandNodeName(self.op.name)
5685 raise errors.OpPrereqError("Invalid node name (%s)" %
5688 self.needed_locks[locking.LEVEL_NODE] = name
5689 elif self.op.kind == constants.TAG_INSTANCE:
5690 name = self.cfg.ExpandInstanceName(self.op.name)
5692 raise errors.OpPrereqError("Invalid instance name (%s)" %
5695 self.needed_locks[locking.LEVEL_INSTANCE] = name
5697 def CheckPrereq(self):
5698 """Check prerequisites.
# self.target becomes the taggable object (cluster, node or instance)
# that the concrete tag LUs operate on; unknown kinds are rejected.
5701 if self.op.kind == constants.TAG_CLUSTER:
5702 self.target = self.cfg.GetClusterInfo()
5703 elif self.op.kind == constants.TAG_NODE:
5704 self.target = self.cfg.GetNodeInfo(self.op.name)
5705 elif self.op.kind == constants.TAG_INSTANCE:
5706 self.target = self.cfg.GetInstanceInfo(self.op.name)
5708 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
# Read-only tag query; target resolution is inherited from TagsLU.
5712 class LUGetTags(TagsLU):
5713 """Returns the tags of a given object.
5716 _OP_REQP = ["kind", "name"]
5719 def Exec(self, feedback_fn):
5720 """Returns the tag list.
# self.target was set by TagsLU.CheckPrereq.
5723 return list(self.target.GetTags())
# Regex search over all cluster, instance and node tags.
5726 class LUSearchTags(NoHooksLU):
5727 """Searches the tags for a given pattern.
5730 _OP_REQP = ["pattern"]
5733 def ExpandNames(self):
# No locks needed: the search only reads configuration data.
5734 self.needed_locks = {}
5736 def CheckPrereq(self):
5737 """Check prerequisites.
5739 This checks the pattern passed for validity by compiling it.
5743 self.re = re.compile(self.op.pattern)
5744 except re.error, err:
5745 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5746 (self.op.pattern, err))
5748 def Exec(self, feedback_fn):
5749 """Returns the tag list.
# Build (path, object) targets for the cluster, every instance and
# every node, then collect (path, tag) pairs for matching tags.
5753 tgts = [("/cluster", cfg.GetClusterInfo())]
5754 ilist = cfg.GetAllInstancesInfo().values()
5755 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5756 nlist = cfg.GetAllNodesInfo().values()
5757 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5759 for path, target in tgts:
5760 for tag in target.GetTags():
5761 if self.re.search(tag):
5762 results.append((path, tag))
# Add one or more validated tags to a cluster/node/instance object.
5766 class LUAddTags(TagsLU):
5767 """Sets a tag on a given object.
5770 _OP_REQP = ["kind", "name", "tags"]
5773 def CheckPrereq(self):
5774 """Check prerequisites.
5776 This checks the type and length of the tag name and value.
5779 TagsLU.CheckPrereq(self)
5780 for tag in self.op.tags:
5781 objects.TaggableObject.ValidateTag(tag)
5783 def Exec(self, feedback_fn):
# Add each tag to the target, then persist the object; a concurrent
# config modification is reported as OpRetryError so callers can retry.
5788 for tag in self.op.tags:
5789 self.target.AddTag(tag)
5790 except errors.TagError, err:
5791 raise errors.OpExecError("Error while setting tag: %s" % str(err))
5793 self.cfg.Update(self.target)
5794 except errors.ConfigurationError:
5795 raise errors.OpRetryError("There has been a modification to the"
5796 " config file and the operation has been"
5797 " aborted. Please retry.")
# Delete a set of tags; all of them must currently exist on the target.
5800 class LUDelTags(TagsLU):
5801 """Delete a list of tags from a given object.
5804 _OP_REQP = ["kind", "name", "tags"]
5807 def CheckPrereq(self):
5808 """Check prerequisites.
5810 This checks that we have the given tag.
5813 TagsLU.CheckPrereq(self)
5814 for tag in self.op.tags:
5815 objects.TaggableObject.ValidateTag(tag)
# Every requested tag must be present; report the missing ones.
5816 del_tags = frozenset(self.op.tags)
5817 cur_tags = self.target.GetTags()
5818 if not del_tags <= cur_tags:
5819 diff_tags = del_tags - cur_tags
5820 diff_names = ["'%s'" % tag for tag in diff_tags]
5822 raise errors.OpPrereqError("Tag(s) %s not found" %
5823 (",".join(diff_names)))
5825 def Exec(self, feedback_fn):
5826 """Remove the tag from the object.
5829 for tag in self.op.tags:
5830 self.target.RemoveTag(tag)
# Persist; a concurrent config modification becomes OpRetryError.
5832 self.cfg.Update(self.target)
5833 except errors.ConfigurationError:
5834 raise errors.OpRetryError("There has been a modification to the"
5835 " config file and the operation has been"
5836 " aborted. Please retry.")
# Test LU: sleep on the master and/or a set of nodes for `duration`.
5839 class LUTestDelay(NoHooksLU):
5840 """Sleep for a specified amount of time.
5842 This LU sleeps on the master and/or nodes for a specified amount of
5846 _OP_REQP = ["duration", "on_master", "on_nodes"]
5849 def ExpandNames(self):
5850 """Expand names and set required locks.
5852 This expands the node list, if any.
5855 self.needed_locks = {}
5856 if self.op.on_nodes:
5857 # _GetWantedNodes can be used here, but is not always appropriate to use
5858 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5860 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5861 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5863 def CheckPrereq(self):
5864 """Check prerequisites.
5868 def Exec(self, feedback_fn):
5869 """Do the actual sleep.
# Delay on the master first, then fan the delay out to the requested
# nodes via RPC; any failure aborts with OpExecError.
5872 if self.op.on_master:
5873 if not utils.TestDelay(self.op.duration):
5874 raise errors.OpExecError("Error during master delay test")
5875 if self.op.on_nodes:
5876 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5878 raise errors.OpExecError("Complete failure from rpc call")
5879 for node, node_result in result.items():
5881 if not node_result.data:
5882 raise errors.OpExecError("Failure during rpc call to node %s,"
5883 " result: %s" % (node, node_result.data))
# Driver for external instance-allocator scripts: builds the JSON-like
# request, runs the script on the master via RPC, and validates the reply.
5886 class IAllocator(object):
5887 """IAllocator framework.
5889 An IAllocator instance has three sets of attributes:
5890 - cfg that is needed to query the cluster
5891 - input data (all members of the _KEYS class attribute are required)
5892 - four buffer attributes (in|out_data|text), that represent the
5893 input (to the external script) in text and data structure format,
5894 and the output from it, again in two formats
5895 - the result variables from the script (success, info, nodes) for
# Required input keys for the allocation mode (listing partially elided).
5900 "mem_size", "disks", "disk_template",
5901 "os", "tags", "nics", "vcpus", "hypervisor",
5907 def __init__(self, lu, mode, name, **kwargs):
5909 # init buffer variables
5910 self.in_text = self.out_text = self.in_data = self.out_data = None
5911 # init all input fields so that pylint is happy
5914 self.mem_size = self.disks = self.disk_template = None
5915 self.os = self.tags = self.nics = self.vcpus = None
5916 self.hypervisor = None
5917 self.relocate_from = None
5919 self.required_nodes = None
5920 # init result fields
5921 self.success = self.info = self.nodes = None
# Select which key set the caller must supply, based on the mode.
5922 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5923 keyset = self._ALLO_KEYS
5924 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5925 keyset = self._RELO_KEYS
5927 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5928 " IAllocator" % self.mode)
# Reject unknown parameters and store accepted ones as attributes.
5930 if key not in keyset:
5931 raise errors.ProgrammerError("Invalid input parameter '%s' to"
5932 " IAllocator" % key)
5933 setattr(self, key, kwargs[key])
# Every key of the selected set must have been provided by the caller.
5935 if key not in kwargs:
5936 raise errors.ProgrammerError("Missing input parameter '%s' to"
5937 " IAllocator" % key)
5938 self._BuildInputData()
5940 def _ComputeClusterData(self):
5941 """Compute the generic allocator input data.
5943 This is the data that is independent of the actual operation.
5947 cluster_info = cfg.GetClusterInfo()
5951 "cluster_name": cfg.GetClusterName(),
5952 "cluster_tags": list(cluster_info.GetTags()),
5953 "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5954 # we don't have job IDs
5956 iinfo = cfg.GetAllInstancesInfo().values()
5957 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
5961 node_list = cfg.GetNodeList()
# Hypervisor to query: given directly when allocating, taken from the
# existing instance when relocating.
5963 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5964 hypervisor_name = self.hypervisor
5965 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5966 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
5968 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5970 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
5971 cluster_info.enabled_hypervisors)
# Build the per-node section of the allocator input.
5972 for nname in node_list:
5973 ninfo = cfg.GetNodeInfo(nname)
5974 node_data[nname].Raise()
5975 if not isinstance(node_data[nname].data, dict):
5976 raise errors.OpExecError("Can't get data for node %s" % nname)
5977 remote_info = node_data[nname].data
# Memory/disk/cpu attributes must all be present and integral.
5978 for attr in ['memory_total', 'memory_free', 'memory_dom0',
5979 'vg_size', 'vg_free', 'cpu_total']:
5980 if attr not in remote_info:
5981 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5984 remote_info[attr] = int(remote_info[attr])
5985 except ValueError, err:
5986 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5987 " %s" % (nname, attr, str(err)))
5988 # compute memory used by primary instances
5989 i_p_mem = i_p_up_mem = 0
5990 for iinfo, beinfo in i_list:
5991 if iinfo.primary_node == nname:
5992 i_p_mem += beinfo[constants.BE_MEMORY]
5993 if iinfo.name not in node_iinfo[nname]:
# Adjust free memory by the gap between an instance's configured
# memory and what it is actually using on this node.
5996 i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
5997 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
5998 remote_info['memory_free'] -= max(0, i_mem_diff)
6000 if iinfo.status == "up":
6001 i_p_up_mem += beinfo[constants.BE_MEMORY]
6003 # compute memory used by instances
6005 "tags": list(ninfo.GetTags()),
6006 "total_memory": remote_info['memory_total'],
6007 "reserved_memory": remote_info['memory_dom0'],
6008 "free_memory": remote_info['memory_free'],
6009 "i_pri_memory": i_p_mem,
6010 "i_pri_up_memory": i_p_up_mem,
6011 "total_disk": remote_info['vg_size'],
6012 "free_disk": remote_info['vg_free'],
6013 "primary_ip": ninfo.primary_ip,
6014 "secondary_ip": ninfo.secondary_ip,
6015 "total_cpus": remote_info['cpu_total'],
6016 "offline": ninfo.offline,
6018 node_results[nname] = pnr
6019 data["nodes"] = node_results
# Build the per-instance section of the allocator input.
6023 for iinfo, beinfo in i_list:
6024 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6025 for n in iinfo.nics]
6027 "tags": list(iinfo.GetTags()),
6028 "should_run": iinfo.status == "up",
6029 "vcpus": beinfo[constants.BE_VCPUS],
6030 "memory": beinfo[constants.BE_MEMORY],
6032 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6034 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
6035 "disk_template": iinfo.disk_template,
6036 "hypervisor": iinfo.hypervisor,
6038 instance_data[iinfo.name] = pir
6040 data["instances"] = instance_data
6044 def _AddNewInstance(self):
6045 """Add new instance data to allocator structure.
6047 This in combination with _AllocatorGetClusterData will create the
6048 correct structure needed as input for the allocator.
6050 The checks for the completeness of the opcode must have already been
6055 if len(self.disks) != 2:
6056 raise errors.OpExecError("Only two-disk configurations supported")
6058 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
# Mirrored disk templates need two nodes; all others need one.
6060 if self.disk_template in constants.DTS_NET_MIRROR:
6061 self.required_nodes = 2
6063 self.required_nodes = 1
6067 "disk_template": self.disk_template,
6070 "vcpus": self.vcpus,
6071 "memory": self.mem_size,
6072 "disks": self.disks,
6073 "disk_space_total": disk_space,
6075 "required_nodes": self.required_nodes,
6077 data["request"] = request
6079 def _AddRelocateInstance(self):
6080 """Add relocate instance data to allocator structure.
6082 This in combination with _IAllocatorGetClusterData will create the
6083 correct structure needed as input for the allocator.
6085 The checks for the completeness of the opcode must have already been
6089 instance = self.lu.cfg.GetInstanceInfo(self.name)
6090 if instance is None:
6091 raise errors.ProgrammerError("Unknown instance '%s' passed to"
6092 " IAllocator" % self.name)
# Relocation only applies to mirrored templates with exactly one
# secondary node.
6094 if instance.disk_template not in constants.DTS_NET_MIRROR:
6095 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6097 if len(instance.secondary_nodes) != 1:
6098 raise errors.OpPrereqError("Instance has not exactly one secondary node")
6100 self.required_nodes = 1
6101 disk_sizes = [{'size': disk.size} for disk in instance.disks]
6102 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6107 "disk_space_total": disk_space,
6108 "required_nodes": self.required_nodes,
6109 "relocate_from": self.relocate_from,
6111 self.in_data["request"] = request
6113 def _BuildInputData(self):
6114 """Build input data structures.
6117 self._ComputeClusterData()
6119 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6120 self._AddNewInstance()
6122 self._AddRelocateInstance()
# Serialize the request text handed to the external allocator script.
6124 self.in_text = serializer.Dump(self.in_data)
6126 def Run(self, name, validate=True, call_fn=None):
6127 """Run an instance allocator and return the results.
# call_fn defaults to the iallocator-runner RPC; it can be overridden
# (e.g. for testing).
6131 call_fn = self.lu.rpc.call_iallocator_runner
6134 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
# The runner returns a 4-tuple: (rcode, stdout, stderr, fail reason).
6137 if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6138 raise errors.OpExecError("Invalid result from master iallocator runner")
6140 rcode, stdout, stderr, fail = result.data
6142 if rcode == constants.IARUN_NOTFOUND:
6143 raise errors.OpExecError("Can't find allocator '%s'" % name)
6144 elif rcode == constants.IARUN_FAILURE:
6145 raise errors.OpExecError("Instance allocator call failed: %s,"
6146 " output: %s" % (fail, stdout+stderr))
6147 self.out_text = stdout
6149 self._ValidateResult()
6151 def _ValidateResult(self):
6152 """Process the allocator results.
6154 This will process and if successful save the result in
6155 self.out_data and the other parameters.
6159 rdict = serializer.Load(self.out_text)
6160 except Exception, err:
6161 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6163 if not isinstance(rdict, dict):
6164 raise errors.OpExecError("Can't parse iallocator results: not a dict")
# The allocator answer must contain at least these three keys, which
# are copied onto the object (self.success/info/nodes).
6166 for key in "success", "info", "nodes":
6167 if key not in rdict:
6168 raise errors.OpExecError("Can't parse iallocator results:"
6169 " missing key '%s'" % key)
6170 setattr(self, key, rdict[key])
6172 if not isinstance(rdict["nodes"], list):
6173 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6175 self.out_data = rdict
6178 class LUTestAllocator(NoHooksLU):
6179 """Run allocator tests.
6181 This LU runs the allocator tests
6184 _OP_REQP = ["direction", "mode", "name"]
6186 def CheckPrereq(self):
6187 """Check prerequisites.
6189 This checks the opcode parameters depending on the director and mode test.
6192 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6193 for attr in ["name", "mem_size", "disks", "disk_template",
6194 "os", "tags", "nics", "vcpus"]:
6195 if not hasattr(self.op, attr):
6196 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6198 iname = self.cfg.ExpandInstanceName(self.op.name)
6199 if iname is not None:
6200 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6202 if not isinstance(self.op.nics, list):
6203 raise errors.OpPrereqError("Invalid parameter 'nics'")
6204 for row in self.op.nics:
6205 if (not isinstance(row, dict) or
6208 "bridge" not in row):
6209 raise errors.OpPrereqError("Invalid contents of the"
6210 " 'nics' parameter")
6211 if not isinstance(self.op.disks, list):
6212 raise errors.OpPrereqError("Invalid parameter 'disks'")
6213 if len(self.op.disks) != 2:
6214 raise errors.OpPrereqError("Only two-disk configurations supported")
6215 for row in self.op.disks:
6216 if (not isinstance(row, dict) or
6217 "size" not in row or
6218 not isinstance(row["size"], int) or
6219 "mode" not in row or
6220 row["mode"] not in ['r', 'w']):
6221 raise errors.OpPrereqError("Invalid contents of the"
6222 " 'disks' parameter")
6223 if self.op.hypervisor is None:
6224 self.op.hypervisor = self.cfg.GetHypervisorType()
6225 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6226 if not hasattr(self.op, "name"):
6227 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6228 fname = self.cfg.ExpandInstanceName(self.op.name)
6230 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6232 self.op.name = fname
6233 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6235 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6238 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6239 if not hasattr(self.op, "allocator") or self.op.allocator is None:
6240 raise errors.OpPrereqError("Missing allocator name")
6241 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6242 raise errors.OpPrereqError("Wrong allocator test '%s'" %
6245 def Exec(self, feedback_fn):
6246 """Run the allocator test.
6249 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6250 ial = IAllocator(self,
6253 mem_size=self.op.mem_size,
6254 disks=self.op.disks,
6255 disk_template=self.op.disk_template,
6259 vcpus=self.op.vcpus,
6260 hypervisor=self.op.hypervisor,
6263 ial = IAllocator(self,
6266 relocate_from=list(self.relocate_from),
6269 if self.op.direction == constants.IALLOCATOR_DIR_IN:
6270 result = ial.in_text
6272 ial.Run(self.op.allocator, validate=False)
6273 result = ial.out_text