# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import logging
import re
import time

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # Shortcuts to the processor's logging helpers
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

  def __GetSSH(self):
    """Returns the SshRunner object, creating it lazily on first use.

    """
    if self.__ssh is None:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
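
  # A minimal sketch (not part of the original module) of what a subclass
  # might do here; purely syntactic normalization, using the skip_checks
  # parameter that LUVerifyCluster below actually declares:
  #
  #   def CheckArguments(self):
  #     if not hasattr(self.op, "skip_checks"):
  #       self.op.skip_checks = []
  #     if not isinstance(self.op.skip_checks, list):
  #       raise errors.OpPrereqError("Invalid 'skip_checks' parameter")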

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {}  # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """
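
  # Illustrative sketch: an instance LU that must lock its instance's nodes
  # would typically implement DeclareLocks exactly as the docstring of
  # _LockInstancesNodes below suggests:
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()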

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this
    will be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been
    done before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name
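
  # Typical usage (illustrative, combining the helpers in this class): an
  # instance LU's ExpandNames could look like:
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE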

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instances' nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we really have been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
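
  # The two recalculation modes used above, summarized (illustrative):
  #
  #   constants.LOCKS_REPLACE: needed_locks[LEVEL_NODE] becomes exactly the
  #     locked instances' nodes
  #   constants.LOCKS_APPEND: the instances' nodes are appended to whatever
  #     node locks were already declared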


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: a non-empty list of node names to check and expand
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is of the wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is of the wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set
  @param selected: the list of output fields to check

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))
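

# Example usage (mirroring how the query LUs below call this helper from
# their ExpandNames):
#
#   _CheckOutputFields(static=utils.FieldSet("node"),
#                      dynamic=utils.FieldSet("phys", "vg", "name", "size",
#                                             "instance"),
#                      selected=self.op.output_fields)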


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @param status: the desired status of the instance
  @param memory: the memory size of the instance
  @param vcpus: the count of VCPUs the instance has
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @rtype: dict
  @return: the hook environment for this instance

  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    }

  nic_count = len(nics)
  for idx, (ip, bridge, mac) in enumerate(nics):
    env["INSTANCE_NIC%d_IP" % idx] = ip
    env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
    env["INSTANCE_NIC%d_HWADDR" % idx] = mac

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
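

# Illustrative example (hypothetical values): for a single-NIC instance,
#
#   _BuildInstanceHookEnv("inst1.example.com", "node1", ["node2"], "debian",
#                         "up", 512, 1,
#                         [("198.51.100.10", "xen-br0", "aa:00:00:54:6f:01")])
#
# returns a dict containing, among others, INSTANCE_NIC_COUNT=1 and
# INSTANCE_NIC0_IP="198.51.100.10".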


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
  if result.failed or not result.data:
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have

    """
    node = nodeinfo.name
    bad = False

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn(" - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    vglist = node_result.get(constants.NV_VGLIST, None)
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn(" - ERROR: non master-candidate has old/wrong file"
                        " '%s'" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn(" - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to other nodes
    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
          bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if ((node_current not in node_instance or
           not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad
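
  # Worked example (hypothetical numbers): if node2 is secondary for
  # instances A (512 MB) and B (1024 MB), both auto-balanced and with node1
  # as primary, node2 needs at least 1536 MB of free memory; otherwise a
  # failure of node1 could not be absorbed and the check above reports an
  # error.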

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: nodelist,
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo],
      constants.NV_LVLIST: vg_name,
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VGLIST: None,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    for node_i in nodeinfo:
      node = node_i.name
      nresult = all_nvinfo[node].data

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      if all_nvinfo[node].failed or not isinstance(nresult, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if isinstance(lvdata, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, lvdata.encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info (note: this reuses and shadows the outer nodeinfo list,
      # which is no longer needed at this point)
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
          }
      except ValueError:
        feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn(" - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn(" - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn(" - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn(" - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

    return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]
      if lvs.failed:
        self.LogWarning("Connection to node %s failed: %s" %
                        (node, lvs.data))
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed",
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)
    finally:
      result = self.rpc.call_node_start_master(master, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
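

# Illustrative example (hypothetical objects): a mirrored disk whose children
# are logical volumes is LVM-based, so disabling LVM storage must be refused
# while such disks exist:
#
#   lv_a = objects.Disk(dev_type=constants.LD_LV)
#   lv_b = objects.Disk(dev_type=constants.LD_LV)
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, children=[lv_a, lv_b])
#   _RecursiveCheckIfLVMBased(drbd)  # -> True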


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckParameters(self):
    """Check and normalize the opcode parameters.

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        if vglist[node].failed:
          # ignoring down node
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate beparams changes
    if self.op.beparams:
      utils.CheckBEParams(self.op.beparams)
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size

    self.cfg.Update(self.cluster)

    # we want to update nodes after the cluster so that if any errors
    # happen, we have recorded and saved the cluster info
    if self.op.candidate_pool_size is not None:
      _AdjustCandidatePool(self)
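
  # Illustrative invocation (hypothetical values): growing only the candidate
  # pool would be submitted as an opcode handled by this LU, e.g.:
  #
  #   op = opcodes.OpSetClusterParams(candidate_pool_size=3)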


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if rstats.failed or not rstats.data:
      lu.LogWarning("Can't get any data from node %s", node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.data
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6  # the ldisk status field of the blockdev_find result
  else:
    idx = 5  # the is_degraded field of the blockdev_find result

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if rstats.failed or not rstats.data:
      logging.warning("Node %s: disk degraded, not found or node down", node)
      result = False
    else:
      result = result and (not rstats.data[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if nr.failed or not nr.data:
        continue
      for os_obj in nr.data:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
      - the node exists in the configuration
      - it does not have primary or secondary instances
      - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal",
    )

  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no",
    "master_candidate",
    "master",
    "offline",
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in _GetWantedNodes,
    # if non-empty, and if empty, there's no validation to do

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_locking:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.failed and nodeinfo.data:
          nodeinfo = nodeinfo.data
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field == "master_candidate":
          val = node.master_candidate
        elif field == "master":
          val = node.name == master_node
        elif field == "offline":
          val = node.offline
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or volumes[node].failed or not volumes[node].data:
        continue

      node_vols = volumes[node].data[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                break
            else:
              inst = None
            val = inst and inst.name
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
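
  # Example Exec result (illustrative values) for
  # output_fields=["node", "name", "size"]:
  #
  #   [["node1.example.com", "lv-disk0", "10240"], ...]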


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1
1941 def CheckPrereq(self):
1942 """Check prerequisites.
1945 - the new node is not already in the config
1947 - its parameters (single/dual homed) matches the cluster
1949 Any errors are signalled by raising errors.OpPrereqError.
1952 node_name = self.op.node_name
1955 dns_data = utils.HostInfo(node_name)
1957 node = dns_data.name
1958 primary_ip = self.op.primary_ip = dns_data.ip
1959 secondary_ip = getattr(self.op, "secondary_ip", None)
1960 if secondary_ip is None:
1961 secondary_ip = primary_ip
1962 if not utils.IsValidIP(secondary_ip):
1963 raise errors.OpPrereqError("Invalid secondary IP given")
1964 self.op.secondary_ip = secondary_ip
1966 node_list = cfg.GetNodeList()
1967 if not self.op.readd and node in node_list:
1968 raise errors.OpPrereqError("Node %s is already in the configuration" %
1970 elif self.op.readd and node not in node_list:
1971 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1973 for existing_node_name in node_list:
1974 existing_node = cfg.GetNodeInfo(existing_node_name)
1976 if self.op.readd and node == existing_node_name:
1977 if (existing_node.primary_ip != primary_ip or
1978 existing_node.secondary_ip != secondary_ip):
1979 raise errors.OpPrereqError("Readded node doesn't have the same IP"
1980 " address configuration as before")
1983 if (existing_node.primary_ip == primary_ip or
1984 existing_node.secondary_ip == primary_ip or
1985 existing_node.primary_ip == secondary_ip or
1986 existing_node.secondary_ip == secondary_ip):
1987 raise errors.OpPrereqError("New node ip address(es) conflict with"
1988 " existing node %s" % existing_node.name)
1990 # check that the type of the node (single versus dual homed) is the
1991 # same as for the master
1992 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1993 master_singlehomed = myself.secondary_ip == myself.primary_ip
1994 newbie_singlehomed = secondary_ip == primary_ip
1995 if master_singlehomed != newbie_singlehomed:
1996 if master_singlehomed:
1997 raise errors.OpPrereqError("The master has no private ip but the"
1998 " new node has one")
2000 raise errors.OpPrereqError("The master has a private ip but the"
2001 " new node doesn't have one")
2003 # checks reachablity
2004 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2005 raise errors.OpPrereqError("Node not reachable by ping")
2007 if not newbie_singlehomed:
2008 # check reachability from my secondary ip to newbie's secondary ip
2009 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2010 source=myself.secondary_ip):
2011 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2012 " based ping to noded port")
2014 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2015 mc_now, _ = self.cfg.GetMasterCandidateStats()
2016 master_candidate = mc_now < cp_size
2018 self.new_node = objects.Node(name=node,
2019 primary_ip=primary_ip,
2020 secondary_ip=secondary_ip,
2021 master_candidate=master_candidate,
2022 offline=False)
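# A quick worked example of the candidate-pool arithmetic above, with
# illustrative values (not from a real cluster):
#
#   cp_size, mc_now = 10, 3
#   master_candidate = mc_now < cp_size    # True -> joins as a candidate
#
# Once the pool is full (mc_now >= cp_size), later nodes are added as
# plain, non-candidate nodes.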
2024 def Exec(self, feedback_fn):
2025 """Adds the new node to the cluster.
2028 new_node = self.new_node
2029 node = new_node.name
2031 # check connectivity
2032 result = self.rpc.call_version([node])[node]
2033 if result.data:
2035   if constants.PROTOCOL_VERSION == result.data:
2036     logging.info("Communication to node %s fine, sw version %s match",
2037                  node, result.data)
2038   else:
2039     raise errors.OpExecError("Version mismatch master version %s,"
2040                              " node version %s" %
2041                              (constants.PROTOCOL_VERSION, result.data))
2042 else:
2043   raise errors.OpExecError("Cannot get version from the new node")
2046 logging.info("Copy ssh key to node %s", node)
2047 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2049 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2050             constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2051             priv_key, pub_key]
2053 keyarray = []
2054 for i in keyfiles:
2055   f = open(i, 'r')
2056   keyarray.append(f.read())
2057   f.close()
2060 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2061 keyarray[2],
2062 keyarray[3], keyarray[4], keyarray[5])
2064 if result.failed or not result.data:
2065 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2067 # Add node to our /etc/hosts, and add key to known_hosts
2068 utils.AddHostToEtcHosts(new_node.name)
2070 if new_node.secondary_ip != new_node.primary_ip:
2071 result = self.rpc.call_node_has_ip_address(new_node.name,
2072 new_node.secondary_ip)
2073 if result.failed or not result.data:
2074 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2075 " you gave (%s). Please fix and re-run this"
2076 " command." % new_node.secondary_ip)
2078 node_verify_list = [self.cfg.GetMasterNode()]
2079 node_verify_param = {
2080   'nodelist': [node],
2081   # TODO: do a node-net-test as well?
2082 }
2084 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2085 self.cfg.GetClusterName())
2086 for verifier in node_verify_list:
2087 if result[verifier].failed or not result[verifier].data:
2088 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2089 " for remote verification" % verifier)
2090 if result[verifier].data['nodelist']:
2091 for failed in result[verifier].data['nodelist']:
2092 feedback_fn("ssh/hostname verification failed %s -> %s" %
2093 (verifier, result[verifier].data['nodelist'][failed]))
2094 raise errors.OpExecError("ssh/hostname verification failed.")
2096 # Distribute updated /etc/hosts and known_hosts to all nodes,
2097 # including the node just added
2098 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2099 dist_nodes = self.cfg.GetNodeList()
2100 if not self.op.readd:
2101 dist_nodes.append(node)
2102 if myself.name in dist_nodes:
2103 dist_nodes.remove(myself.name)
2105 logging.debug("Copying hosts and known_hosts to all nodes")
2106 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2107 result = self.rpc.call_upload_file(dist_nodes, fname)
2108 for to_node, to_result in result.iteritems():
2109 if to_result.failed or not to_result.data:
2110 logging.error("Copy of file %s to node %s failed", fname, to_node)
2112 to_copy = []
2113 if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2114 to_copy.append(constants.VNC_PASSWORD_FILE)
2115 for fname in to_copy:
2116 result = self.rpc.call_upload_file([node], fname)
2117 if result[node].failed or not result[node].data:
2118 logging.error("Could not copy file %s to node %s", fname, node)
2120 if self.op.readd:
2121   self.context.ReaddNode(new_node)
2122 else:
2123   self.context.AddNode(new_node)
2126 class LUSetNodeParams(LogicalUnit):
2127 """Modifies the parameters of a node.
2130 HPATH = "node-modify"
2131 HTYPE = constants.HTYPE_NODE
2132 _OP_REQP = ["node_name"]
2135 def CheckArguments(self):
2136 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2137 if node_name is None:
2138 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2139 self.op.node_name = node_name
2140 _CheckBooleanOpField(self.op, 'master_candidate')
2141 _CheckBooleanOpField(self.op, 'offline')
2142 if self.op.master_candidate is None and self.op.offline is None:
2143 raise errors.OpPrereqError("Please pass at least one modification")
2144 if self.op.offline == True and self.op.master_candidate == True:
2145 raise errors.OpPrereqError("Can't set the node into offline and"
2146 " master_candidate at the same time")
2148 def ExpandNames(self):
2149 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2151 def BuildHooksEnv(self):
2154 This runs on the master node.
2158 "OP_TARGET": self.op.node_name,
2159 "MASTER_CANDIDATE": str(self.op.master_candidate),
2160 "OFFLINE": str(self.op.offline),
2162 nl = [self.cfg.GetMasterNode(),
2166 def CheckPrereq(self):
2167 """Check prerequisites.
2169 This only checks the instance list against the existing names.
2172 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2174 if ((self.op.master_candidate == False or self.op.offline == True)
2175 and node.master_candidate):
2176 # we will demote the node from master_candidate
2177 if self.op.node_name == self.cfg.GetMasterNode():
2178 raise errors.OpPrereqError("The master node has to be a"
2179 " master candidate and online")
2180 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2181 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2182 if num_candidates <= cp_size:
2183 msg = ("Not enough master candidates (desired"
2184 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2185 if self.op.force:
2186   self.LogWarning(msg)
2187 else:
2188   raise errors.OpPrereqError(msg)
2190 if (self.op.master_candidate == True and node.offline and
2191 not self.op.offline == False):
2192 raise errors.OpPrereqError("Can't set an offline node to"
2193 " master_candidate")
2197 def Exec(self, feedback_fn):
2198   """Modifies a node.
2199
2200   """
2201   node = self.node
2203   result = []
2205 if self.op.offline is not None:
2206 node.offline = self.op.offline
2207 result.append(("offline", str(self.op.offline)))
2208 if self.op.offline == True and node.master_candidate:
2209 node.master_candidate = False
2210 result.append(("master_candidate", "auto-demotion due to offline"))
2212 if self.op.master_candidate is not None:
2213 node.master_candidate = self.op.master_candidate
2214 result.append(("master_candidate", str(self.op.master_candidate)))
2215 if self.op.master_candidate == False:
2216 rrc = self.rpc.call_node_demote_from_mc(node.name)
2217 if (rrc.failed or not isinstance(rrc.data, (tuple, list))
2218 or len(rrc.data) != 2):
2219 self.LogWarning("Node rpc error: %s" % rrc.error)
2220 elif not rrc.data[0]:
2221 self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
2223 # this will trigger configuration file update, if needed
2224 self.cfg.Update(node)
2225 # this will trigger job queue propagation or cleanup
2226 if self.op.node_name != self.cfg.GetMasterNode():
2227 self.context.ReaddNode(node)
2229 return result
2232 class LUQueryClusterInfo(NoHooksLU):
2233 """Query cluster configuration.
2239 def ExpandNames(self):
2240 self.needed_locks = {}
2242 def CheckPrereq(self):
2243 """No prerequsites needed for this LU.
2248 def Exec(self, feedback_fn):
2249 """Return cluster config.
2252 cluster = self.cfg.GetClusterInfo()
2253 result = {
2254 "software_version": constants.RELEASE_VERSION,
2255 "protocol_version": constants.PROTOCOL_VERSION,
2256 "config_version": constants.CONFIG_VERSION,
2257 "os_api_version": constants.OS_API_VERSION,
2258 "export_version": constants.EXPORT_VERSION,
2259 "architecture": (platform.architecture()[0], platform.machine()),
2260 "name": cluster.cluster_name,
2261 "master": cluster.master_node,
2262 "default_hypervisor": cluster.default_hypervisor,
2263 "enabled_hypervisors": cluster.enabled_hypervisors,
2264 "hvparams": cluster.hvparams,
2265 "beparams": cluster.beparams,
2266 "candidate_pool_size": cluster.candidate_pool_size,
2272 class LUQueryConfigValues(NoHooksLU):
2273 """Return configuration values.
2278 _FIELDS_DYNAMIC = utils.FieldSet()
2279 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2281 def ExpandNames(self):
2282 self.needed_locks = {}
2284 _CheckOutputFields(static=self._FIELDS_STATIC,
2285 dynamic=self._FIELDS_DYNAMIC,
2286 selected=self.op.output_fields)
2288 def CheckPrereq(self):
2289 """No prerequisites.
2294 def Exec(self, feedback_fn):
2295 """Dump a representation of the cluster config to the standard output.
2298 values = []
2299 for field in self.op.output_fields:
2300 if field == "cluster_name":
2301 entry = self.cfg.GetClusterName()
2302 elif field == "master_node":
2303 entry = self.cfg.GetMasterNode()
2304 elif field == "drain_flag":
2305 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2306 else:
2307   raise errors.ParameterError(field)
2308 values.append(entry)
2309 return values
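# A hedged usage sketch (assuming the corresponding opcode exposes an
# output_fields attribute, as Exec above expects):
#
#   op = opcodes.OpQueryConfigValues(output_fields=["cluster_name",
#                                                   "master_node"])
#
# Exec then returns the values in the same order as requested, e.g.
# ["cluster1.example.com", "node1.example.com"].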
2312 class LUActivateInstanceDisks(NoHooksLU):
2313 """Bring up an instance's disks.
2316 _OP_REQP = ["instance_name"]
2319 def ExpandNames(self):
2320 self._ExpandAndLockInstance()
2321 self.needed_locks[locking.LEVEL_NODE] = []
2322 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2324 def DeclareLocks(self, level):
2325 if level == locking.LEVEL_NODE:
2326 self._LockInstancesNodes()
2328 def CheckPrereq(self):
2329 """Check prerequisites.
2331 This checks that the instance is in the cluster.
2334 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2335 assert self.instance is not None, \
2336 "Cannot retrieve locked instance %s" % self.op.instance_name
2337 _CheckNodeOnline(self, self.instance.primary_node)
2339 def Exec(self, feedback_fn):
2340 """Activate the disks.
2343 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2344 if not disks_ok:
2345   raise errors.OpExecError("Cannot activate block devices")
2347 return disks_info
2350 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2351 """Prepare the block devices for an instance.
2353 This sets up the block devices on all nodes.
2355 @type lu: L{LogicalUnit}
2356 @param lu: the logical unit on whose behalf we execute
2357 @type instance: L{objects.Instance}
2358 @param instance: the instance for whose disks we assemble
2359 @type ignore_secondaries: boolean
2360 @param ignore_secondaries: if true, errors on secondary nodes
2361 won't result in an error return from the function
2362 @return: False if the operation failed, otherwise a list of
2363 (host, instance_visible_name, node_visible_name)
2364 with the mapping from node devices to instance devices
2366 """
2367 device_info = []
2368 disks_ok = True
2369 iname = instance.name
2370 # With the two passes mechanism we try to reduce the window of
2371 # opportunity for the race condition of switching DRBD to primary
2372 # before handshaking occurred, but we do not eliminate it
2374 # The proper fix would be to wait (with some limits) until the
2375 # connection has been made and drbd transitions from WFConnection
2376 # into any other network-connected state (Connected, SyncTarget,
2379 # 1st pass, assemble on all nodes in secondary mode
2380 for inst_disk in instance.disks:
2381 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2382 lu.cfg.SetDiskID(node_disk, node)
2383 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2384 if result.failed or not result:
2385 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2386 " (is_primary=False, pass=1)",
2387 inst_disk.iv_name, node)
2388 if not ignore_secondaries:
2389   disks_ok = False
2391 # FIXME: race condition on drbd migration to primary
2393 # 2nd pass, do only the primary node
2394 for inst_disk in instance.disks:
2395 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2396 if node != instance.primary_node:
2398 lu.cfg.SetDiskID(node_disk, node)
2399 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2400 if result.failed or not result:
2401 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2402 " (is_primary=True, pass=2)",
2403 inst_disk.iv_name, node)
2404 disks_ok = False
2405 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2407 # leave the disks configured for the primary node
2408 # this is a workaround that would be fixed better by
2409 # improving the logical/physical id handling
2410 for disk in instance.disks:
2411 lu.cfg.SetDiskID(disk, instance.primary_node)
2413 return disks_ok, device_info
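# Illustrative only: the shape of a successful return value. For an
# instance with a single disk, a caller might see something like:
#
#   disks_ok, device_info = _AssembleInstanceDisks(lu, instance)
#   # disks_ok    -> True
#   # device_info -> [(<primary node name>, "disk/0",
#   #                  <result of the primary-pass blockdev_assemble call>)]
#
# Both _StartInstanceDisks below and LUActivateInstanceDisks above rely
# on this contract.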
2416 def _StartInstanceDisks(lu, instance, force):
2417 """Start the disks of an instance.
2420 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2421 ignore_secondaries=force)
2422 if not disks_ok:
2423   _ShutdownInstanceDisks(lu, instance)
2424 if force is not None and not force:
2425 lu.proc.LogWarning("", hint="If the message above refers to a"
2427 " you can retry the operation using '--force'.")
2428 raise errors.OpExecError("Disk consistency error")
2431 class LUDeactivateInstanceDisks(NoHooksLU):
2432 """Shutdown an instance's disks.
2435 _OP_REQP = ["instance_name"]
2438 def ExpandNames(self):
2439 self._ExpandAndLockInstance()
2440 self.needed_locks[locking.LEVEL_NODE] = []
2441 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2443 def DeclareLocks(self, level):
2444 if level == locking.LEVEL_NODE:
2445 self._LockInstancesNodes()
2447 def CheckPrereq(self):
2448 """Check prerequisites.
2450 This checks that the instance is in the cluster.
2453 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2454 assert self.instance is not None, \
2455 "Cannot retrieve locked instance %s" % self.op.instance_name
2457 def Exec(self, feedback_fn):
2458 """Deactivate the disks
2461 instance = self.instance
2462 _SafeShutdownInstanceDisks(self, instance)
2465 def _SafeShutdownInstanceDisks(lu, instance):
2466 """Shutdown block devices of an instance.
2468 This function checks if an instance is running, before calling
2469 _ShutdownInstanceDisks.
2472 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2473 [instance.hypervisor])
2474 ins_l = ins_l[instance.primary_node]
2475 if ins_l.failed or not isinstance(ins_l.data, list):
2476 raise errors.OpExecError("Can't contact node '%s'" %
2477 instance.primary_node)
2479 if instance.name in ins_l.data:
2480 raise errors.OpExecError("Instance is running, can't shutdown"
2481                          " block devices.")
2483 _ShutdownInstanceDisks(lu, instance)
2486 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2487 """Shutdown block devices of an instance.
2489 This does the shutdown on all nodes of the instance.
2491 If the ignore_primary is false, errors on the primary node are
2492 taken into account and the shutdown is reported as failed.
2493
2494 """
2495 result = True
2496 for disk in instance.disks:
2497 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2498 lu.cfg.SetDiskID(top_disk, node)
2499 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2500 if result.failed or not result.data:
2501 logging.error("Could not shutdown block device %s on node %s",
2502               disk.iv_name, node)
2503 if not ignore_primary or node != instance.primary_node:
2504   result = False
2506 return result
2508 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2509 """Checks if a node has enough free memory.
2511 This function checks if a given node has the needed amount of free
2512 memory. In case the node has less memory or we cannot get the
2513 information from the node, this function raises an OpPrereqError
2514 exception.
2516 @type lu: C{LogicalUnit}
2517 @param lu: a logical unit from which we get configuration data
2519 @param node: the node to check
2520 @type reason: C{str}
2521 @param reason: string to use in the error message
2522 @type requested: C{int}
2523 @param requested: the amount of memory in MiB to check for
2524 @type hypervisor_name: C{str}
2525 @param hypervisor_name: the hypervisor to ask for memory stats
2526 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2527 we cannot check the node
2530 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2531 nodeinfo[node].Raise()
2532 free_mem = nodeinfo[node].data.get('memory_free')
2533 if not isinstance(free_mem, int):
2534 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2535 " was '%s'" % (node, free_mem))
2536 if requested > free_mem:
2537 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2538 " needed %s MiB, available %s MiB" %
2539 (node, reason, requested, free_mem))
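# A hedged usage sketch mirroring the call sites below (e.g. in
# LUStartupInstance and LUFailoverInstance): before starting an instance,
# the LU checks the primary node against the instance's memory setting:
#
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)
#
# With requested=2048 and memory_free=1536 this raises OpPrereqError
# ("needed 2048 MiB, available 1536 MiB").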
2542 class LUStartupInstance(LogicalUnit):
2543 """Starts an instance.
2546 HPATH = "instance-start"
2547 HTYPE = constants.HTYPE_INSTANCE
2548 _OP_REQP = ["instance_name", "force"]
2551 def ExpandNames(self):
2552 self._ExpandAndLockInstance()
2554 def BuildHooksEnv(self):
2557 This runs on master, primary and secondary nodes of the instance.
2561 "FORCE": self.op.force,
2563 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2564 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2565 list(self.instance.secondary_nodes))
2568 def CheckPrereq(self):
2569 """Check prerequisites.
2571 This checks that the instance is in the cluster.
2574 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2575 assert self.instance is not None, \
2576 "Cannot retrieve locked instance %s" % self.op.instance_name
2578 _CheckNodeOnline(self, instance.primary_node)
2580 bep = self.cfg.GetClusterInfo().FillBE(instance)
2581 # check bridges existence
2582 _CheckInstanceBridgesExist(self, instance)
2584 _CheckNodeFreeMemory(self, instance.primary_node,
2585 "starting instance %s" % instance.name,
2586 bep[constants.BE_MEMORY], instance.hypervisor)
2588 def Exec(self, feedback_fn):
2589 """Start the instance.
2592 instance = self.instance
2593 force = self.op.force
2594 extra_args = getattr(self.op, "extra_args", "")
2596 self.cfg.MarkInstanceUp(instance.name)
2598 node_current = instance.primary_node
2600 _StartInstanceDisks(self, instance, force)
2602 result = self.rpc.call_instance_start(node_current, instance, extra_args)
2603 if result.failed or not result.data:
2604 _ShutdownInstanceDisks(self, instance)
2605 raise errors.OpExecError("Could not start instance")
2608 class LURebootInstance(LogicalUnit):
2609 """Reboot an instance.
2612 HPATH = "instance-reboot"
2613 HTYPE = constants.HTYPE_INSTANCE
2614 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2617 def ExpandNames(self):
2618 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2619 constants.INSTANCE_REBOOT_HARD,
2620 constants.INSTANCE_REBOOT_FULL]:
2621 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2622 (constants.INSTANCE_REBOOT_SOFT,
2623 constants.INSTANCE_REBOOT_HARD,
2624 constants.INSTANCE_REBOOT_FULL))
2625 self._ExpandAndLockInstance()
2627 def BuildHooksEnv(self):
2630 This runs on master, primary and secondary nodes of the instance.
2634 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2636 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2637 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2638 list(self.instance.secondary_nodes))
2641 def CheckPrereq(self):
2642 """Check prerequisites.
2644 This checks that the instance is in the cluster.
2647 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2648 assert self.instance is not None, \
2649 "Cannot retrieve locked instance %s" % self.op.instance_name
2651 _CheckNodeOnline(self, instance.primary_node)
2653 # check bridges existence
2654 _CheckInstanceBridgesExist(self, instance)
2656 def Exec(self, feedback_fn):
2657 """Reboot the instance.
2660 instance = self.instance
2661 ignore_secondaries = self.op.ignore_secondaries
2662 reboot_type = self.op.reboot_type
2663 extra_args = getattr(self.op, "extra_args", "")
2665 node_current = instance.primary_node
2667 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2668 constants.INSTANCE_REBOOT_HARD]:
2669 result = self.rpc.call_instance_reboot(node_current, instance,
2670 reboot_type, extra_args)
2671 if result.failed or not result.data:
2672 raise errors.OpExecError("Could not reboot instance")
2673 else:
2674   if not self.rpc.call_instance_shutdown(node_current, instance):
2675 raise errors.OpExecError("could not shutdown instance for full reboot")
2676 _ShutdownInstanceDisks(self, instance)
2677 _StartInstanceDisks(self, instance, ignore_secondaries)
2678 result = self.rpc.call_instance_start(node_current, instance, extra_args)
2679 if result.failed or not result.data:
2680 _ShutdownInstanceDisks(self, instance)
2681 raise errors.OpExecError("Could not start instance for full reboot")
2683 self.cfg.MarkInstanceUp(instance.name)
2686 class LUShutdownInstance(LogicalUnit):
2687 """Shutdown an instance.
2690 HPATH = "instance-stop"
2691 HTYPE = constants.HTYPE_INSTANCE
2692 _OP_REQP = ["instance_name"]
2695 def ExpandNames(self):
2696 self._ExpandAndLockInstance()
2698 def BuildHooksEnv(self):
2701 This runs on master, primary and secondary nodes of the instance.
2704 env = _BuildInstanceHookEnvByObject(self, self.instance)
2705 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2706 list(self.instance.secondary_nodes))
2709 def CheckPrereq(self):
2710 """Check prerequisites.
2712 This checks that the instance is in the cluster.
2715 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2716 assert self.instance is not None, \
2717 "Cannot retrieve locked instance %s" % self.op.instance_name
2718 _CheckNodeOnline(self, self.instance.primary_node)
2720 def Exec(self, feedback_fn):
2721 """Shutdown the instance.
2724 instance = self.instance
2725 node_current = instance.primary_node
2726 self.cfg.MarkInstanceDown(instance.name)
2727 result = self.rpc.call_instance_shutdown(node_current, instance)
2728 if result.failed or not result.data:
2729 self.proc.LogWarning("Could not shutdown instance")
2731 _ShutdownInstanceDisks(self, instance)
2734 class LUReinstallInstance(LogicalUnit):
2735 """Reinstall an instance.
2738 HPATH = "instance-reinstall"
2739 HTYPE = constants.HTYPE_INSTANCE
2740 _OP_REQP = ["instance_name"]
2743 def ExpandNames(self):
2744 self._ExpandAndLockInstance()
2746 def BuildHooksEnv(self):
2749 This runs on master, primary and secondary nodes of the instance.
2752 env = _BuildInstanceHookEnvByObject(self, self.instance)
2753 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2754 list(self.instance.secondary_nodes))
2757 def CheckPrereq(self):
2758 """Check prerequisites.
2760 This checks that the instance is in the cluster and is not running.
2763 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2764 assert instance is not None, \
2765 "Cannot retrieve locked instance %s" % self.op.instance_name
2766 _CheckNodeOnline(self, instance.primary_node)
2768 if instance.disk_template == constants.DT_DISKLESS:
2769 raise errors.OpPrereqError("Instance '%s' has no disks" %
2770 self.op.instance_name)
2771 if instance.status != "down":
2772 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2773 self.op.instance_name)
2774 remote_info = self.rpc.call_instance_info(instance.primary_node,
2775 instance.name,
2776 instance.hypervisor)
2777 if remote_info.failed or remote_info.data:
2778 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2779 (self.op.instance_name,
2780 instance.primary_node))
2782 self.op.os_type = getattr(self.op, "os_type", None)
2783 if self.op.os_type is not None:
2785 pnode = self.cfg.GetNodeInfo(
2786 self.cfg.ExpandNodeName(instance.primary_node))
2787 if pnode is None:
2788   raise errors.OpPrereqError("Primary node '%s' is unknown" %
2789                              instance.primary_node)
2790 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2792 if not isinstance(result.data, objects.OS):
2793 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2794 " primary node" % self.op.os_type)
2796 self.instance = instance
2798 def Exec(self, feedback_fn):
2799 """Reinstall the instance.
2802 inst = self.instance
2804 if self.op.os_type is not None:
2805 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2806 inst.os = self.op.os_type
2807 self.cfg.Update(inst)
2809 _StartInstanceDisks(self, inst, None)
2810 try:
2811   feedback_fn("Running the instance OS create scripts...")
2812   result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2813   if result.failed or not result.data:
2815     raise errors.OpExecError("Could not install OS for instance %s"
2816                              " on node %s" %
2817                              (inst.name, inst.primary_node))
2818 finally:
2819   _ShutdownInstanceDisks(self, inst)
2822 class LURenameInstance(LogicalUnit):
2823 """Rename an instance.
2826 HPATH = "instance-rename"
2827 HTYPE = constants.HTYPE_INSTANCE
2828 _OP_REQP = ["instance_name", "new_name"]
2830 def BuildHooksEnv(self):
2833 This runs on master, primary and secondary nodes of the instance.
2836 env = _BuildInstanceHookEnvByObject(self, self.instance)
2837 env["INSTANCE_NEW_NAME"] = self.op.new_name
2838 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2839 list(self.instance.secondary_nodes))
2842 def CheckPrereq(self):
2843 """Check prerequisites.
2845 This checks that the instance is in the cluster and is not running.
2848 instance = self.cfg.GetInstanceInfo(
2849 self.cfg.ExpandInstanceName(self.op.instance_name))
2850 if instance is None:
2851 raise errors.OpPrereqError("Instance '%s' not known" %
2852 self.op.instance_name)
2853 _CheckNodeOnline(self, instance.primary_node)
2855 if instance.status != "down":
2856 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2857 self.op.instance_name)
2858 remote_info = self.rpc.call_instance_info(instance.primary_node,
2859 instance.name,
2860 instance.hypervisor)
2862 if remote_info.data:
2863 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2864 (self.op.instance_name,
2865 instance.primary_node))
2866 self.instance = instance
2868 # new name verification
2869 name_info = utils.HostInfo(self.op.new_name)
2871 self.op.new_name = new_name = name_info.name
2872 instance_list = self.cfg.GetInstanceList()
2873 if new_name in instance_list:
2874 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2875                            new_name)
2877 if not getattr(self.op, "ignore_ip", False):
2878 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2879 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2880 (name_info.ip, new_name))
2883 def Exec(self, feedback_fn):
2884 """Reinstall the instance.
2887 inst = self.instance
2888 old_name = inst.name
2890 if inst.disk_template == constants.DT_FILE:
2891 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2893 self.cfg.RenameInstance(inst.name, self.op.new_name)
2894 # Change the instance lock. This is definitely safe while we hold the BGL
2895 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2896 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2898 # re-read the instance from the configuration after rename
2899 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2901 if inst.disk_template == constants.DT_FILE:
2902 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2903 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2904 old_file_storage_dir,
2905 new_file_storage_dir)
2907 if result.failed or not result.data:
2908   raise errors.OpExecError("Could not connect to node '%s' to rename"
2909 " directory '%s' to '%s' (but the instance"
2910 " has been renamed in Ganeti)" % (
2911 inst.primary_node, old_file_storage_dir,
2912 new_file_storage_dir))
2914 if not result.data[0]:
2915 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2916 " (but the instance has been renamed in"
2917 " Ganeti)" % (old_file_storage_dir,
2918 new_file_storage_dir))
2920 _StartInstanceDisks(self, inst, None)
2921 try:
2922 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2923                                            old_name)
2924 if result.failed or not result.data:
2925 msg = ("Could not run OS rename script for instance %s on node %s"
2926 " (but the instance has been renamed in Ganeti)" %
2927 (inst.name, inst.primary_node))
2928 self.proc.LogWarning(msg)
2929 finally:
2930   _ShutdownInstanceDisks(self, inst)
2933 class LURemoveInstance(LogicalUnit):
2934 """Remove an instance.
2937 HPATH = "instance-remove"
2938 HTYPE = constants.HTYPE_INSTANCE
2939 _OP_REQP = ["instance_name", "ignore_failures"]
2942 def ExpandNames(self):
2943 self._ExpandAndLockInstance()
2944 self.needed_locks[locking.LEVEL_NODE] = []
2945 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2947 def DeclareLocks(self, level):
2948 if level == locking.LEVEL_NODE:
2949 self._LockInstancesNodes()
2951 def BuildHooksEnv(self):
2954 This runs on master, primary and secondary nodes of the instance.
2957 env = _BuildInstanceHookEnvByObject(self, self.instance)
2958 nl = [self.cfg.GetMasterNode()]
2961 def CheckPrereq(self):
2962 """Check prerequisites.
2964 This checks that the instance is in the cluster.
2967 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2968 assert self.instance is not None, \
2969 "Cannot retrieve locked instance %s" % self.op.instance_name
2971 def Exec(self, feedback_fn):
2972 """Remove the instance.
2975 instance = self.instance
2976 logging.info("Shutting down instance %s on node %s",
2977 instance.name, instance.primary_node)
2979 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
2980 if result.failed or not result.data:
2981 if self.op.ignore_failures:
2982 feedback_fn("Warning: can't shutdown instance")
2983 else:
2984   raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2985 (instance.name, instance.primary_node))
2987 logging.info("Removing block devices for instance %s", instance.name)
2989 if not _RemoveDisks(self, instance):
2990 if self.op.ignore_failures:
2991 feedback_fn("Warning: can't remove instance's disks")
2992 else:
2993   raise errors.OpExecError("Can't remove instance's disks")
2995 logging.info("Removing instance %s out of cluster config", instance.name)
2997 self.cfg.RemoveInstance(instance.name)
2998 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3001 class LUQueryInstances(NoHooksLU):
3002 """Logical unit for querying instances.
3005 _OP_REQP = ["output_fields", "names"]
3007 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3008 "admin_state", "admin_ram",
3009 "disk_template", "ip", "mac", "bridge",
3010 "sda_size", "sdb_size", "vcpus", "tags",
3011 "network_port", "beparams",
3012 "(disk).(size)/([0-9]+)",
3014 "(nic).(mac|ip|bridge)/([0-9]+)",
3015 "(nic).(macs|ips|bridges)",
3016 "(disk|nic).(count)",
3017 "serial_no", "hypervisor", "hvparams",] +
3018 ["hv/%s" % name
3019 for name in constants.HVS_PARAMETERS] +
3020 ["be/%s" % name
3021 for name in constants.BES_PARAMETERS])
3022 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3025 def ExpandNames(self):
3026 _CheckOutputFields(static=self._FIELDS_STATIC,
3027 dynamic=self._FIELDS_DYNAMIC,
3028 selected=self.op.output_fields)
3030 self.needed_locks = {}
3031 self.share_locks[locking.LEVEL_INSTANCE] = 1
3032 self.share_locks[locking.LEVEL_NODE] = 1
3034 if self.op.names:
3035   self.wanted = _GetWantedInstances(self, self.op.names)
3036 else:
3037   self.wanted = locking.ALL_SET
3039 self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3040 if self.do_locking:
3041   self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3042 self.needed_locks[locking.LEVEL_NODE] = []
3043 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3045 def DeclareLocks(self, level):
3046 if level == locking.LEVEL_NODE and self.do_locking:
3047 self._LockInstancesNodes()
3049 def CheckPrereq(self):
3050 """Check prerequisites.
3055 def Exec(self, feedback_fn):
3056 """Computes the list of nodes and their attributes.
3059 all_info = self.cfg.GetAllInstancesInfo()
3060 if self.do_locking:
3061   instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3062 elif self.wanted != locking.ALL_SET:
3063 instance_names = self.wanted
3064 missing = set(instance_names).difference(all_info.keys())
3065 if missing:
3066   raise errors.OpExecError(
3067     "Some instances were removed before retrieving their data: %s"
3068     % missing)
3069 else:
3070   instance_names = all_info.keys()
3072 instance_names = utils.NiceSort(instance_names)
3073 instance_list = [all_info[iname] for iname in instance_names]
3075 # begin data gathering
3077 nodes = frozenset([inst.primary_node for inst in instance_list])
3078 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3080 bad_nodes = []
3081 off_nodes = []
3082 if self.do_locking:
3083   live_data = {}
3084   node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3085   for name in nodes:
3086     result = node_data[name]
3087     if result.offline:
3088       # offline nodes will be in both lists
3089       off_nodes.append(name)
3090     if result.failed:
3091       bad_nodes.append(name)
3092     elif result.data:
3094       live_data.update(result.data)
3095       # else no instance is alive
3096 else:
3097   live_data = dict([(name, {}) for name in instance_names])
3099 # end data gathering
3102 output = []
3104 for instance in instance_list:
3105   iout = []
3106 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3107 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3108 for field in self.op.output_fields:
3109 st_match = self._FIELDS_STATIC.Matches(field)
3114 elif field == "pnode":
3115 val = instance.primary_node
3116 elif field == "snodes":
3117 val = list(instance.secondary_nodes)
3118 elif field == "admin_state":
3119 val = (instance.status != "down")
3120 elif field == "oper_state":
3121 if instance.primary_node in bad_nodes:
3122 val = None
3123 else:
3124   val = bool(live_data.get(instance.name))
3125 elif field == "status":
3126 if instance.primary_node in off_nodes:
3127 val = "ERROR_nodeoffline"
3128 elif instance.primary_node in bad_nodes:
3129 val = "ERROR_nodedown"
3130 else:
3131   running = bool(live_data.get(instance.name))
3132   if running:
3133     if instance.status != "down":
3134       val = "running"
3135     else:
3136       val = "ERROR_up"
3137   else:
3138     if instance.status != "down":
3139       val = "ERROR_down"
3140     else:
3141       val = "ADMIN_down"
3142 elif field == "oper_ram":
3143 if instance.primary_node in bad_nodes:
3144 val = None
3145 elif instance.name in live_data:
3146   val = live_data[instance.name].get("memory", "?")
3147 else:
3148   val = "-"
3149 elif field == "disk_template":
3150 val = instance.disk_template
3151 elif field == "ip":
3152   val = instance.nics[0].ip
3153 elif field == "bridge":
3154 val = instance.nics[0].bridge
3155 elif field == "mac":
3156 val = instance.nics[0].mac
3157 elif field == "sda_size" or field == "sdb_size":
3158 idx = ord(field[2]) - ord('a')
3159 try:
3160   val = instance.FindDisk(idx).size
3161 except errors.OpPrereqError:
3162   val = None
3163 elif field == "tags":
3164 val = list(instance.GetTags())
3165 elif field == "serial_no":
3166 val = instance.serial_no
3167 elif field == "network_port":
3168 val = instance.network_port
3169 elif field == "hypervisor":
3170 val = instance.hypervisor
3171 elif field == "hvparams":
3173 elif (field.startswith(HVPREFIX) and
3174 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3175 val = i_hv.get(field[len(HVPREFIX):], None)
3176 elif field == "beparams":
3178 elif (field.startswith(BEPREFIX) and
3179 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3180 val = i_be.get(field[len(BEPREFIX):], None)
3181 elif st_match and st_match.groups():
3182 # matches a variable list
3183 st_groups = st_match.groups()
3184 if st_groups and st_groups[0] == "disk":
3185 if st_groups[1] == "count":
3186 val = len(instance.disks)
3187 elif st_groups[1] == "sizes":
3188 val = [disk.size for disk in instance.disks]
3189 elif st_groups[1] == "size":
3190 try:
3191   val = instance.FindDisk(st_groups[2]).size
3192 except errors.OpPrereqError:
3193   val = None
3194 else:
3195 assert False, "Unhandled disk parameter"
3196 elif st_groups[0] == "nic":
3197 if st_groups[1] == "count":
3198 val = len(instance.nics)
3199 elif st_groups[1] == "macs":
3200 val = [nic.mac for nic in instance.nics]
3201 elif st_groups[1] == "ips":
3202 val = [nic.ip for nic in instance.nics]
3203 elif st_groups[1] == "bridges":
3204 val = [nic.bridge for nic in instance.nics]
3206 else:
3207   nic_idx = int(st_groups[2])
3208   if nic_idx >= len(instance.nics):
3209     val = None
3210   else:
3211 if st_groups[1] == "mac":
3212 val = instance.nics[nic_idx].mac
3213 elif st_groups[1] == "ip":
3214 val = instance.nics[nic_idx].ip
3215 elif st_groups[1] == "bridge":
3216 val = instance.nics[nic_idx].bridge
3218 assert False, "Unhandled NIC parameter"
3220 assert False, "Unhandled variable parameter"
3221 else:
3222   raise errors.ParameterError(field)
3223 iout.append(str(val))
3224 output.append(iout)
3226 return output
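# The variable-field handling above relies on utils.FieldSet matching
# patterns such as "(disk).(size)/([0-9]+)" against a field like
# "disk.size/0". A minimal sketch of that style of matching (the real
# utils.FieldSet API may differ):
#
#   import re
#
#   def _MatchField(patterns, field):
#     for pat in patterns:
#       m = re.match("^%s$" % pat, field)
#       if m:
#         return m
#     return None
#
#   m = _MatchField(["(disk).(size)/([0-9]+)"], "disk.size/0")
#   # m.groups() == ("disk", "size", "0"); the last group is the disk
#   # index consumed via st_groups[2] above.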
3229 class LUFailoverInstance(LogicalUnit):
3230 """Failover an instance.
3233 HPATH = "instance-failover"
3234 HTYPE = constants.HTYPE_INSTANCE
3235 _OP_REQP = ["instance_name", "ignore_consistency"]
3238 def ExpandNames(self):
3239 self._ExpandAndLockInstance()
3240 self.needed_locks[locking.LEVEL_NODE] = []
3241 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3243 def DeclareLocks(self, level):
3244 if level == locking.LEVEL_NODE:
3245 self._LockInstancesNodes()
3247 def BuildHooksEnv(self):
3250 This runs on master, primary and secondary nodes of the instance.
3254 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3256 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3257 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3260 def CheckPrereq(self):
3261 """Check prerequisites.
3263 This checks that the instance is in the cluster.
3266 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3267 assert self.instance is not None, \
3268 "Cannot retrieve locked instance %s" % self.op.instance_name
3270 bep = self.cfg.GetClusterInfo().FillBE(instance)
3271 if instance.disk_template not in constants.DTS_NET_MIRROR:
3272 raise errors.OpPrereqError("Instance's disk layout is not"
3273 " network mirrored, cannot failover.")
3275 secondary_nodes = instance.secondary_nodes
3276 if not secondary_nodes:
3277 raise errors.ProgrammerError("no secondary node but using "
3278 "a mirrored disk template")
3280 target_node = secondary_nodes[0]
3281 _CheckNodeOnline(self, target_node)
3282 # check memory requirements on the secondary node
3283 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3284 instance.name, bep[constants.BE_MEMORY],
3285 instance.hypervisor)
3287 # check bridge existence
3288 brlist = [nic.bridge for nic in instance.nics]
3289 result = self.rpc.call_bridges_exist(target_node, brlist)
3290 result.Raise()
3291 if not result.data:
3292   raise errors.OpPrereqError("One or more target bridges %s do not"
3293 " exist on destination node '%s'" %
3294 (brlist, target_node))
3296 def Exec(self, feedback_fn):
3297 """Failover an instance.
3299 The failover is done by shutting it down on its present node and
3300 starting it on the secondary.
3303 instance = self.instance
3305 source_node = instance.primary_node
3306 target_node = instance.secondary_nodes[0]
3308 feedback_fn("* checking disk consistency between source and target")
3309 for dev in instance.disks:
3310 # for drbd, these are drbd over lvm
3311 if not _CheckDiskConsistency(self, dev, target_node, False):
3312 if instance.status == "up" and not self.op.ignore_consistency:
3313 raise errors.OpExecError("Disk %s is degraded on target node,"
3314 " aborting failover." % dev.iv_name)
3316 feedback_fn("* shutting down instance on source node")
3317 logging.info("Shutting down instance %s on node %s",
3318 instance.name, source_node)
3320 result = self.rpc.call_instance_shutdown(source_node, instance)
3321 if result.failed or not result.data:
3322 if self.op.ignore_consistency:
3323 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3325 " anyway. Please make sure node %s is down",
3326 instance.name, source_node, source_node)
3327 else:
3328   raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3329 (instance.name, source_node))
3331 feedback_fn("* deactivating the instance's disks on source node")
3332 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3333 raise errors.OpExecError("Can't shut down the instance's disks.")
3335 instance.primary_node = target_node
3336 # distribute new instance config to the other nodes
3337 self.cfg.Update(instance)
3339 # Only start the instance if it's marked as up
3340 if instance.status == "up":
3341 feedback_fn("* activating the instance's disks on target node")
3342 logging.info("Starting instance %s on node %s",
3343 instance.name, target_node)
3345 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3346 ignore_secondaries=True)
3347 if not disks_ok:
3348   _ShutdownInstanceDisks(self, instance)
3349 raise errors.OpExecError("Can't activate the instance's disks")
3351 feedback_fn("* starting the instance on the target node")
3352 result = self.rpc.call_instance_start(target_node, instance, None)
3353 if result.failed or not result.data:
3354 _ShutdownInstanceDisks(self, instance)
3355 raise errors.OpExecError("Could not start instance %s on node %s." %
3356 (instance.name, target_node))
3359 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3360 """Create a tree of block devices on the primary node.
3362 This always creates all devices.
3366 for child in device.children:
3367 if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3368   return False
3370 lu.cfg.SetDiskID(device, node)
3371 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3372 instance.name, True, info)
3373 if new_id.failed or not new_id.data:
3374   return False
3375 if device.physical_id is None:
3376 device.physical_id = new_id
3378 return True
3380 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3381 """Create a tree of block devices on a secondary node.
3383 If this device type has to be created on secondaries, create it and
3384 all its children.
3386 If not, just recurse to children keeping the same 'force' value.
3389 if device.CreateOnSecondary():
3390   force = True
3391 if device.children:
3392   for child in device.children:
3393     if not _CreateBlockDevOnSecondary(lu, node, instance,
3394                                       child, force, info):
3395       return False
3397 if not force:
3398   return True
3399 lu.cfg.SetDiskID(device, node)
3400 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3401 instance.name, False, info)
3402 if new_id.failed or not new_id.data:
3403   return False
3404 if device.physical_id is None:
3405 device.physical_id = new_id
3407 return True
3409 def _GenerateUniqueNames(lu, exts):
3410 """Generate a suitable LV name.
3412 This will generate a logical volume name for the given instance.
3415 results = []
3416 for val in exts:
3417   new_id = lu.cfg.GenerateUniqueID()
3418   results.append("%s%s" % (new_id, val))
3419 return results
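# Illustrative example (the unique IDs are made up): generating the LV
# names for one DRBD disk would look like
#
#   _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
#   -> ["3581bbf9-...disk0_data", "a9c2d1e0-...disk0_meta"]
#
# i.e. each extension gets its own freshly generated unique prefix.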
3422 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3423                          p_minor, s_minor):
3424 """Generate a drbd8 device complete with its children.
3427 port = lu.cfg.AllocatePort()
3428 vgname = lu.cfg.GetVGName()
3429 shared_secret = lu.cfg.GenerateDRBDSecret()
3430 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3431 logical_id=(vgname, names[0]))
3432 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3433 logical_id=(vgname, names[1]))
3434 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3435 logical_id=(primary, secondary, port,
3436             p_minor, s_minor,
3437             shared_secret),
3438 children=[dev_data, dev_meta],
3439 iv_name=iv_name)
3440 return drbd_dev
3443 def _GenerateDiskTemplate(lu, template_name,
3444 instance_name, primary_node,
3445 secondary_nodes, disk_info,
3446 file_storage_dir, file_driver,
3447 base_index):
3448 """Generate the entire disk layout for a given template type.
3451 #TODO: compute space requirements
3453 vgname = lu.cfg.GetVGName()
3454 disk_count = len(disk_info)
3455 disks = []
3456 if template_name == constants.DT_DISKLESS:
3457   pass
3458 elif template_name == constants.DT_PLAIN:
3459 if len(secondary_nodes) != 0:
3460 raise errors.ProgrammerError("Wrong template configuration")
3462 names = _GenerateUniqueNames(lu, [".disk%d" % i
3463 for i in range(disk_count)])
3464 for idx, disk in enumerate(disk_info):
3465 disk_index = idx + base_index
3466 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3467 logical_id=(vgname, names[idx]),
3468 iv_name="disk/%d" % disk_index)
3469 disks.append(disk_dev)
3470 elif template_name == constants.DT_DRBD8:
3471 if len(secondary_nodes) != 1:
3472 raise errors.ProgrammerError("Wrong template configuration")
3473 remote_node = secondary_nodes[0]
3474 minors = lu.cfg.AllocateDRBDMinor(
3475 [primary_node, remote_node] * len(disk_info), instance_name)
3477 names = _GenerateUniqueNames(lu,
3478 [".disk%d_%s" % (i, s)
3479 for i in range(disk_count)
3480 for s in ("data", "meta")
3481 ])
3482 for idx, disk in enumerate(disk_info):
3483 disk_index = idx + base_index
3484 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3485 disk["size"], names[idx*2:idx*2+2],
3486 "disk/%d" % disk_index,
3487 minors[idx*2], minors[idx*2+1])
3488 disks.append(disk_dev)
3489 elif template_name == constants.DT_FILE:
3490 if len(secondary_nodes) != 0:
3491 raise errors.ProgrammerError("Wrong template configuration")
3493 for idx, disk in enumerate(disk_info):
3494 disk_index = idx + base_index
3495 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3496 iv_name="disk/%d" % disk_index,
3497 logical_id=(file_driver,
3498 "%s/disk%d" % (file_storage_dir,
3500 disks.append(disk_dev)
3501 else:
3502   raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3504 return disks
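# A worked example of the DRBD8 index arithmetic above: for two disks and
# base_index 0, disk_count == 2, so
#
#   names  == [<id0>.disk0_data, <id1>.disk0_meta,
#              <id2>.disk1_data, <id3>.disk1_meta]
#   minors == [p0, s0, p1, s1]          # allocated pairwise per node
#
# and disk idx takes names[idx*2:idx*2+2] plus minors[idx*2] and
# minors[idx*2+1], i.e. disk 1 gets names[2:4], minors[2] and minors[3].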
3506 def _GetInstanceInfoText(instance):
3507 Compute the text that should be added to the disk's metadata.
3510 return "originstname+%s" % instance.name
3513 def _CreateDisks(lu, instance):
3514 """Create all disks for an instance.
3516 This abstracts away some work from AddInstance.
3518 @type lu: L{LogicalUnit}
3519 @param lu: the logical unit on whose behalf we execute
3520 @type instance: L{objects.Instance}
3521 @param instance: the instance whose disks we should create
3523 @return: the success of the creation
3526 info = _GetInstanceInfoText(instance)
3528 if instance.disk_template == constants.DT_FILE:
3529 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3530 result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3531                                              file_storage_dir)
3533 if result.failed or not result.data:
3534 logging.error("Could not connect to node '%s'", instance.primary_node)
3535 return False
3537 if not result.data[0]:
3538 logging.error("Failed to create directory '%s'", file_storage_dir)
3539 return False
3541 # Note: this needs to be kept in sync with adding of disks in
3542 # LUSetInstanceParams
3543 for device in instance.disks:
3544 logging.info("Creating volume %s for instance %s",
3545 device.iv_name, instance.name)
3547 for secondary_node in instance.secondary_nodes:
3548 if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3549 device, False, info):
3550 logging.error("Failed to create volume %s (%s) on secondary node %s!",
3551 device.iv_name, device, secondary_node)
3552 return False
3554 if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3555 instance, device, info):
3556 logging.error("Failed to create volume %s on primary!", device.iv_name)
3557 return False
3559 return True
3562 def _RemoveDisks(lu, instance):
3563 """Remove all disks for an instance.
3565 This abstracts away some work from `AddInstance()` and
3566 `RemoveInstance()`. Note that in case some of the devices couldn't
3567 be removed, the removal will continue with the other ones (compare
3568 with `_CreateDisks()`).
3570 @type lu: L{LogicalUnit}
3571 @param lu: the logical unit on whose behalf we execute
3572 @type instance: L{objects.Instance}
3573 @param instance: the instance whose disks we should remove
3575 @return: the success of the removal
3578 logging.info("Removing block devices for instance %s", instance.name)
3579 result = True
3581 for device in instance.disks:
3582 for node, disk in device.ComputeNodeTree(instance.primary_node):
3583 lu.cfg.SetDiskID(disk, node)
3584 result = lu.rpc.call_blockdev_remove(node, disk)
3585 if result.failed or not result.data:
3586 lu.proc.LogWarning("Could not remove block device %s on node %s,"
3587 " continuing anyway", device.iv_name, node)
3590 if instance.disk_template == constants.DT_FILE:
3591 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3592 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3593                                              file_storage_dir)
3594 if result.failed or not result.data:
3595 logging.error("Could not remove directory '%s'", file_storage_dir)
3596 result = False
3598 return result
3601 def _ComputeDiskSize(disk_template, disks):
3602 """Compute disk size requirements in the volume group
3605 # Required free disk space as a function of disk and swap space
3606 req_size_dict = {
3607 constants.DT_DISKLESS: None,
3608 constants.DT_PLAIN: sum(d["size"] for d in disks),
3609 # 128 MB are added for drbd metadata for each disk
3610 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3611 constants.DT_FILE: None,
3612 }
3614 if disk_template not in req_size_dict:
3615 raise errors.ProgrammerError("Disk template '%s' size requirement"
3616 " is unknown" % disk_template)
3618 return req_size_dict[disk_template]
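# Worked example: a DRBD8 instance with two disks of 1024 MiB and 256 MiB
# needs (1024 + 128) + (256 + 128) = 1536 MiB in the volume group, while
# the same disks as DT_PLAIN would need only 1024 + 256 = 1280 MiB.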
3621 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3622 """Hypervisor parameter validation.
3624 This function abstract the hypervisor parameter validation to be
3625 used in both instance create and instance modify.
3627 @type lu: L{LogicalUnit}
3628 @param lu: the logical unit for which we check
3629 @type nodenames: list
3630 @param nodenames: the list of nodes on which we should check
3631 @type hvname: string
3632 @param hvname: the name of the hypervisor we should use
3633 @type hvparams: dict
3634 @param hvparams: the parameters which we need to check
3635 @raise errors.OpPrereqError: if the parameters are not valid
3638 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3639                                                 hvname,
3640                                                 hvparams)
3641 for node in nodenames:
3642 info = hvinfo[node]
3643 info.Raise()
3644 if not info.data or not isinstance(info.data, (tuple, list)):
3645 raise errors.OpPrereqError("Cannot get current information"
3646 " from node '%s' (%s)" % (node, info.data))
3647 if not info.data[0]:
3648 raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3649 " %s" % info.data[1])
3652 class LUCreateInstance(LogicalUnit):
3653 """Create an instance.
3656 HPATH = "instance-add"
3657 HTYPE = constants.HTYPE_INSTANCE
3658 _OP_REQP = ["instance_name", "disks", "disk_template",
3660 "wait_for_sync", "ip_check", "nics",
3661 "hvparams", "beparams"]
3664 def _ExpandNode(self, node):
3665 """Expands and checks one node name.
3668 node_full = self.cfg.ExpandNodeName(node)
3669 if node_full is None:
3670 raise errors.OpPrereqError("Unknown node %s" % node)
3673 def ExpandNames(self):
3674 """ExpandNames for CreateInstance.
3676 Figure out the right locks for instance creation.
3679 self.needed_locks = {}
3681 # set optional parameters to none if they don't exist
3682 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3683 if not hasattr(self.op, attr):
3684 setattr(self.op, attr, None)
3686 # cheap checks, mostly valid constants given
3688 # verify creation mode
3689 if self.op.mode not in (constants.INSTANCE_CREATE,
3690 constants.INSTANCE_IMPORT):
3691 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3692                            self.op.mode)
3694 # disk template and mirror node verification
3695 if self.op.disk_template not in constants.DISK_TEMPLATES:
3696 raise errors.OpPrereqError("Invalid disk template name")
3698 if self.op.hypervisor is None:
3699 self.op.hypervisor = self.cfg.GetHypervisorType()
3701 cluster = self.cfg.GetClusterInfo()
3702 enabled_hvs = cluster.enabled_hypervisors
3703 if self.op.hypervisor not in enabled_hvs:
3704 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3705 " cluster (%s)" % (self.op.hypervisor,
3706 ",".join(enabled_hvs)))
3708 # check hypervisor parameter syntax (locally)
3710 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3712 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3713 hv_type.CheckParameterSyntax(filled_hvp)
3715 # fill and remember the beparams dict
3716 utils.CheckBEParams(self.op.beparams)
3717 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3720 #### instance parameters check
3722 # instance name verification
3723 hostname1 = utils.HostInfo(self.op.instance_name)
3724 self.op.instance_name = instance_name = hostname1.name
3726 # this is just a preventive check, but someone might still add this
3727 # instance in the meantime, and creation will fail at lock-add time
3728 if instance_name in self.cfg.GetInstanceList():
3729 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3732 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3735 self.nics = []
3736 for nic in self.op.nics:
3737 # ip validity checks
3738 ip = nic.get("ip", None)
3739 if ip is None or ip.lower() == "none":
3740   nic_ip = None
3741 elif ip.lower() == constants.VALUE_AUTO:
3742 nic_ip = hostname1.ip
3743 else:
3744   if not utils.IsValidIP(ip):
3745 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3746 " like a valid IP" % ip)
3749 # MAC address verification
3750 mac = nic.get("mac", constants.VALUE_AUTO)
3751 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3752 if not utils.IsValidMac(mac.lower()):
3753 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3754                            mac)
3755 # bridge verification
3756 bridge = nic.get("bridge", self.cfg.GetDefBridge())
3757 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3759 # disk checks/pre-build
3760 self.disks = []
3761 for disk in self.op.disks:
3762 mode = disk.get("mode", constants.DISK_RDWR)
3763 if mode not in constants.DISK_ACCESS_SET:
3764 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3765                            mode)
3766 size = disk.get("size", None)
3767 if size is None:
3768   raise errors.OpPrereqError("Missing disk size")
3769 try:
3770   size = int(size)
3771 except ValueError:
3772   raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3773 self.disks.append({"size": size, "mode": mode})
3775 # used in CheckPrereq for ip ping check
3776 self.check_ip = hostname1.ip
3778 # file storage checks
3779 if (self.op.file_driver and
3780 not self.op.file_driver in constants.FILE_DRIVER):
3781 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3782 self.op.file_driver)
3784 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3785 raise errors.OpPrereqError("File storage directory path not absolute")
3787 ### Node/iallocator related checks
3788 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3789 raise errors.OpPrereqError("One and only one of iallocator and primary"
3790 " node must be given")
3792 if self.op.iallocator:
3793 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3795 self.op.pnode = self._ExpandNode(self.op.pnode)
3796 nodelist = [self.op.pnode]
3797 if self.op.snode is not None:
3798 self.op.snode = self._ExpandNode(self.op.snode)
3799 nodelist.append(self.op.snode)
3800 self.needed_locks[locking.LEVEL_NODE] = nodelist
3802 # in case of import lock the source node too
3803 if self.op.mode == constants.INSTANCE_IMPORT:
3804 src_node = getattr(self.op, "src_node", None)
3805 src_path = getattr(self.op, "src_path", None)
3807 if src_path is None:
3808 self.op.src_path = src_path = self.op.instance_name
3810 if src_node is None:
3811 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3812 self.op.src_node = None
3813 if os.path.isabs(src_path):
3814 raise errors.OpPrereqError("Importing an instance from an absolute"
3815 " path requires a source node option.")
3817 self.op.src_node = src_node = self._ExpandNode(src_node)
3818 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3819 self.needed_locks[locking.LEVEL_NODE].append(src_node)
3820 if not os.path.isabs(src_path):
3821 self.op.src_path = src_path = \
3822 os.path.join(constants.EXPORT_DIR, src_path)
3824 else: # INSTANCE_CREATE
3825 if getattr(self.op, "os_type", None) is None:
3826 raise errors.OpPrereqError("No guest OS specified")
3828 def _RunAllocator(self):
3829 """Run the allocator based on input opcode.
3832 nics = [n.ToDict() for n in self.nics]
3833 ial = IAllocator(self,
3834 mode=constants.IALLOCATOR_MODE_ALLOC,
3835 name=self.op.instance_name,
3836 disk_template=self.op.disk_template,
3839 vcpus=self.be_full[constants.BE_VCPUS],
3840 mem_size=self.be_full[constants.BE_MEMORY],
3843 hypervisor=self.op.hypervisor,
3846 ial.Run(self.op.iallocator)
3847 if not ial.success:
3848   raise errors.OpPrereqError("Can't compute nodes using"
3849                              " iallocator '%s': %s" % (self.op.iallocator,
3850                                                        ial.info))
3852 if len(ial.nodes) != ial.required_nodes:
3853 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3854 " of nodes (%s), required %s" %
3855 (self.op.iallocator, len(ial.nodes),
3856 ial.required_nodes))
3857 self.op.pnode = ial.nodes[0]
3858 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3859 self.op.instance_name, self.op.iallocator,
3860 ", ".join(ial.nodes))
3861 if ial.required_nodes == 2:
3862 self.op.snode = ial.nodes[1]
3864 def BuildHooksEnv(self):
3867 This runs on master, primary and secondary nodes of the instance.
3871 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3872 "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3873 "INSTANCE_ADD_MODE": self.op.mode,
3875 if self.op.mode == constants.INSTANCE_IMPORT:
3876 env["INSTANCE_SRC_NODE"] = self.op.src_node
3877 env["INSTANCE_SRC_PATH"] = self.op.src_path
3878 env["INSTANCE_SRC_IMAGES"] = self.src_images
3880 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3881 primary_node=self.op.pnode,
3882 secondary_nodes=self.secondaries,
3883 status=self.instance_status,
3884 os_type=self.op.os_type,
3885 memory=self.be_full[constants.BE_MEMORY],
3886 vcpus=self.be_full[constants.BE_VCPUS],
3887 nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3890 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3895 def CheckPrereq(self):
3896 """Check prerequisites.
3899 if (not self.cfg.GetVGName() and
3900 self.op.disk_template not in constants.DTS_NOT_LVM):
3901 raise errors.OpPrereqError("Cluster does not support lvm-based"
3905 if self.op.mode == constants.INSTANCE_IMPORT:
3906 src_node = self.op.src_node
3907 src_path = self.op.src_path
3909 if src_node is None:
3910 exp_list = self.rpc.call_export_list(
3911 self.acquired_locks[locking.LEVEL_NODE])
3912 found = False
3913 for node in exp_list:
3914   if not exp_list[node].failed and src_path in exp_list[node].data:
3915     found = True
3916     self.op.src_node = src_node = node
3917     self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3918                                                src_path)
3919     break
3920 if not found:
3921   raise errors.OpPrereqError("No export found for relative path %s" %
3922                              src_path)
3924 _CheckNodeOnline(self, src_node)
3925 result = self.rpc.call_export_info(src_node, src_path)
      if result.failed or not result.data:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3930 export_info = result.data
3931 if not export_info.has_section(constants.INISECT_EXP):
3932 raise errors.ProgrammerError("Corrupted export config")
3934 ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if int(ei_version) != constants.EXPORT_VERSION:
3936 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3937 (ei_version, constants.EXPORT_VERSION))
3939 # Check that the new instance doesn't have less disks than the export
3940 instance_disks = len(self.disks)
3941 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3942 if instance_disks < export_disks:
3943 raise errors.OpPrereqError("Not enough disks to import."
3944 " (instance: %d, export: %d)" %
3945 (instance_disks, export_disks))
3947 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
3950 option = 'disk%d_dump' % idx
3951 if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old OSes, disk sizes, etc. useful?
3953 export_name = export_info.get(constants.INISECT_INS, option)
3954 image = os.path.join(src_path, export_name)
3955 disk_images.append(image)
        else:
          disk_images.append(False)
3959 self.src_images = disk_images
3961 old_name = export_info.get(constants.INISECT_INS, 'name')
3962 # FIXME: int() here could throw a ValueError on broken exports
3963 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3964 if self.op.instance_name == old_name:
3965 for idx, nic in enumerate(self.nics):
          if nic.mac == constants.VALUE_AUTO and idx < exp_nic_count:
3967 nic_mac_ini = 'nic%d_mac' % idx
3968 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3970 # ip ping checks (we use the same ip that was resolved in ExpandNames)
3971 if self.op.start and not self.op.ip_check:
3972 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3973 " adding an instance in start mode")
3975 if self.op.ip_check:
3976 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3977 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3978 (self.check_ip, self.op.instance_name))
3982 if self.op.iallocator is not None:
3983 self._RunAllocator()
3985 #### node related checks
3987 # check primary node
3988 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3989 assert self.pnode is not None, \
3990 "Cannot retrieve locked node %s" % self.op.pnode
    if pnode.offline:
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
                                 pnode.name)
3995 self.secondaries = []
3997 # mirror node verification
3998 if self.op.disk_template in constants.DTS_NET_MIRROR:
3999 if self.op.snode is None:
4000 raise errors.OpPrereqError("The networked disk templates need"
4002 if self.op.snode == pnode.name:
4003 raise errors.OpPrereqError("The secondary node cannot be"
4004 " the primary node.")
4005 self.secondaries.append(self.op.snode)
4006 _CheckNodeOnline(self, self.op.snode)
4008 nodenames = [pnode.name] + self.secondaries
4010 req_size = _ComputeDiskSize(self.op.disk_template,
4013 # Check lv size requirements
4014 if req_size is not None:
4015 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4017 for node in nodenames:
4018 info = nodeinfo[node]
4022 raise errors.OpPrereqError("Cannot get current information"
4023 " from node '%s'" % node)
4024 vg_free = info.get('vg_free', None)
4025 if not isinstance(vg_free, int):
4026 raise errors.OpPrereqError("Can't compute free disk space on"
        if req_size > vg_free:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, vg_free, req_size))
4033 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4036 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4038 if not isinstance(result.data, objects.OS):
4039 raise errors.OpPrereqError("OS '%s' not in supported os list for"
4040 " primary node" % self.op.os_type)
4042 # bridge check on primary node
4043 bridges = [n.bridge for n in self.nics]
4044 result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
    if result.failed or not result.data:
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
4048 " exist on destination node '%s'" %
4049 (",".join(bridges), pnode.name))
4051 # memory check on primary node
4053 _CheckNodeFreeMemory(self, self.pnode.name,
4054 "creating instance %s" % self.op.instance_name,
4055 self.be_full[constants.BE_MEMORY],
4059 self.instance_status = 'up'
4061 self.instance_status = 'down'
4063 def Exec(self, feedback_fn):
4064 """Create and add the instance to the cluster.
4067 instance = self.op.instance_name
4068 pnode_name = self.pnode.name
4070 for nic in self.nics:
4071 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4072 nic.mac = self.cfg.GenerateMAC()
4074 ht_kind = self.op.hypervisor
4075 if ht_kind in constants.HTS_REQ_PORT:
4076 network_port = self.cfg.AllocatePort()
4080 ##if self.op.vnc_bind_address is None:
4081 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4083 # this is needed because os.path.join does not accept None arguments
4084 if self.op.file_storage_dir is None:
4085 string_file_storage_dir = ""
4087 string_file_storage_dir = self.op.file_storage_dir
4089 # build the full file storage dir path
4090 file_storage_dir = os.path.normpath(os.path.join(
4091 self.cfg.GetFileStorageDir(),
4092 string_file_storage_dir, instance))
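    # For illustration only (all values hypothetical): with a cluster file
    # storage dir of "/srv/ganeti/file-storage", an opcode-level dir of
    # "mysub" and instance "inst1.example.com", this yields
    # "/srv/ganeti/file-storage/mysub/inst1.example.com"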
4095 disks = _GenerateDiskTemplate(self,
4096 self.op.disk_template,
4097 instance, pnode_name,
4101 self.op.file_driver,
4104 iobj = objects.Instance(name=instance, os=self.op.os_type,
4105 primary_node=pnode_name,
4106 nics=self.nics, disks=disks,
4107 disk_template=self.op.disk_template,
4108 status=self.instance_status,
4109 network_port=network_port,
4110 beparams=self.op.beparams,
4111 hvparams=self.op.hvparams,
4112 hypervisor=self.op.hypervisor,
4115 feedback_fn("* creating instance disks...")
4116 if not _CreateDisks(self, iobj):
4117 _RemoveDisks(self, iobj)
4118 self.cfg.ReleaseDRBDMinors(instance)
4119 raise errors.OpExecError("Device creation failed, reverting...")
4121 feedback_fn("adding instance %s to cluster config" % instance)
4123 self.cfg.AddInstance(iobj)
4124 # Declare that we don't want to remove the instance lock anymore, as we've
4125 # added the instance to the config
4126 del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignments for the instance's drbds
4128 self.cfg.ReleaseDRBDMinors(instance)
4129 # Unlock all the nodes
4130 if self.op.mode == constants.INSTANCE_IMPORT:
4131 nodes_keep = [self.op.src_node]
4132 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4133 if node != self.op.src_node]
4134 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4135 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4137 self.context.glm.release(locking.LEVEL_NODE)
4138 del self.acquired_locks[locking.LEVEL_NODE]
4140 if self.op.wait_for_sync:
4141 disk_abort = not _WaitForSync(self, iobj)
4142 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4143 # make sure the disks are not degraded (still sync-ing is ok)
4145 feedback_fn("* checking mirrors status")
4146 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
4152 self.cfg.RemoveInstance(iobj.name)
4153 # Make sure the instance lock gets removed
4154 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4155 raise errors.OpExecError("There are some degraded disks for"
4158 feedback_fn("creating os for instance %s on node %s" %
4159 (instance, pnode_name))
4161 if iobj.disk_template != constants.DT_DISKLESS:
4162 if self.op.mode == constants.INSTANCE_CREATE:
4163 feedback_fn("* running the instance OS create scripts...")
4164 result = self.rpc.call_instance_os_add(pnode_name, iobj)
        if result.failed or not result.data:
          raise errors.OpExecError("Could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))
4171 elif self.op.mode == constants.INSTANCE_IMPORT:
4172 feedback_fn("* running the instance OS import scripts...")
4173 src_node = self.op.src_node
4174 src_images = self.src_images
4175 cluster_name = self.cfg.GetClusterName()
4176 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4177 src_node, src_images,
4179 import_result.Raise()
4180 for idx, result in enumerate(import_result.data):
          if result.failed or not result.data:
            self.LogWarning("Could not import the image %s for instance"
4183 " %s, disk %d, on node %s" %
4184 (src_images[idx], instance, idx, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
    if self.op.start:
      logging.info("Starting instance %s on node %s", instance, pnode_name)
4192 feedback_fn("* starting instance...")
4193 result = self.rpc.call_instance_start(pnode_name, iobj, None)
4196 raise errors.OpExecError("Could not start instance")
4199 class LUConnectConsole(NoHooksLU):
4200 """Connect to an instance's console.
4202 This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the console.
4207 _OP_REQP = ["instance_name"]
4210 def ExpandNames(self):
4211 self._ExpandAndLockInstance()
4213 def CheckPrereq(self):
4214 """Check prerequisites.
4216 This checks that the instance is in the cluster.
4219 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4220 assert self.instance is not None, \
4221 "Cannot retrieve locked instance %s" % self.op.instance_name
4222 _CheckNodeOnline(self, self.instance.primary_node)
4224 def Exec(self, feedback_fn):
4225 """Connect to the console of an instance
4228 instance = self.instance
4229 node = instance.primary_node
4231 node_insts = self.rpc.call_instance_list([node],
4232 [instance.hypervisor])[node]
4235 if instance.name not in node_insts.data:
4236 raise errors.OpExecError("Instance %s is not running." % instance.name)
4238 logging.debug("Connecting to console of %s on %s", instance.name, node)
4240 hyper = hypervisor.GetHypervisor(instance.hypervisor)
4241 console_cmd = hyper.GetShellCommandForConsole(instance)
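    # the value returned below is the SSH command line the client has to exec
    # on the master, wrapping console_cmd; roughly (illustrative only):
    #   ssh -t root@node1.example.com '<console_cmd>'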
4244 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4247 class LUReplaceDisks(LogicalUnit):
4248 """Replace the disks of an instance.
4251 HPATH = "mirrors-replace"
4252 HTYPE = constants.HTYPE_INSTANCE
4253 _OP_REQP = ["instance_name", "mode", "disks"]
4256 def ExpandNames(self):
4257 self._ExpandAndLockInstance()
4259 if not hasattr(self.op, "remote_node"):
4260 self.op.remote_node = None
4262 ia_name = getattr(self.op, "iallocator", None)
4263 if ia_name is not None:
4264 if self.op.remote_node is not None:
4265 raise errors.OpPrereqError("Give either the iallocator or the new"
4266 " secondary, not both")
4267 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4268 elif self.op.remote_node is not None:
4269 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4270 if remote_node is None:
4271 raise errors.OpPrereqError("Node '%s' not known" %
4272 self.op.remote_node)
4273 self.op.remote_node = remote_node
4274 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4275 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
4278 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4280 def DeclareLocks(self, level):
4281 # If we're not already locking all nodes in the set we have to declare the
4282 # instance's primary/secondary nodes.
4283 if (level == locking.LEVEL_NODE and
4284 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4285 self._LockInstancesNodes()
4287 def _RunAllocator(self):
4288 """Compute a new secondary node using an IAllocator.
4291 ial = IAllocator(self,
4292 mode=constants.IALLOCATOR_MODE_RELOC,
4293 name=self.op.instance_name,
4294 relocate_from=[self.sec_node])
4296 ial.Run(self.op.iallocator)
    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
4306 self.op.remote_node = ial.nodes[0]
4307 self.LogInfo("Selected new secondary for the instance: %s",
4308 self.op.remote_node)
4310 def BuildHooksEnv(self):
4313 This runs on the master, the primary and all the secondaries.
4317 "MODE": self.op.mode,
4318 "NEW_SECONDARY": self.op.remote_node,
4319 "OLD_SECONDARY": self.instance.secondary_nodes[0],
4321 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4323 self.cfg.GetMasterNode(),
4324 self.instance.primary_node,
4326 if self.op.remote_node is not None:
4327 nl.append(self.op.remote_node)
4330 def CheckPrereq(self):
4331 """Check prerequisites.
4333 This checks that the instance is in the cluster.
4336 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4337 assert instance is not None, \
4338 "Cannot retrieve locked instance %s" % self.op.instance_name
4339 self.instance = instance
4341 if instance.disk_template not in constants.DTS_NET_MIRROR:
4342 raise errors.OpPrereqError("Instance's disk layout is not"
4343 " network mirrored.")
4345 if len(instance.secondary_nodes) != 1:
4346 raise errors.OpPrereqError("The instance has a strange layout,"
4347 " expected one secondary but found %d" %
4348 len(instance.secondary_nodes))
4350 self.sec_node = instance.secondary_nodes[0]
4352 ia_name = getattr(self.op, "iallocator", None)
4353 if ia_name is not None:
4354 self._RunAllocator()
4356 remote_node = self.op.remote_node
4357 if remote_node is not None:
4358 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4359 assert self.remote_node_info is not None, \
4360 "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
4363 if remote_node == instance.primary_node:
4364 raise errors.OpPrereqError("The specified node is the primary node of"
4366 elif remote_node == self.sec_node:
4367 if self.op.mode == constants.REPLACE_DISK_SEC:
4368 # this is for DRBD8, where we can't execute the same mode of
4369 # replacement as for drbd7 (no different port allocated)
4370 raise errors.OpPrereqError("Same secondary given, cannot execute"
4372 if instance.disk_template == constants.DT_DRBD8:
4373 if (self.op.mode == constants.REPLACE_DISK_ALL and
4374 remote_node is not None):
4375 # switch to replace secondary mode
4376 self.op.mode = constants.REPLACE_DISK_SEC
4378 if self.op.mode == constants.REPLACE_DISK_ALL:
4379 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4380 " secondary disk replacement, not"
4382 elif self.op.mode == constants.REPLACE_DISK_PRI:
4383 if remote_node is not None:
4384 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4385 " the secondary while doing a primary"
4386 " node disk replacement")
4387 self.tgt_node = instance.primary_node
4388 self.oth_node = instance.secondary_nodes[0]
4389 _CheckNodeOnline(self, self.tgt_node)
4390 _CheckNodeOnline(self, self.oth_node)
4391 elif self.op.mode == constants.REPLACE_DISK_SEC:
4392 self.new_node = remote_node # this can be None, in which case
4393 # we don't change the secondary
4394 self.tgt_node = instance.secondary_nodes[0]
4395 self.oth_node = instance.primary_node
4396 _CheckNodeOnline(self, self.oth_node)
4397 if self.new_node is not None:
4398 _CheckNodeOnline(self, self.new_node)
4400 _CheckNodeOnline(self, self.tgt_node)
4402 raise errors.ProgrammerError("Unhandled disk replace mode")
4404 if not self.op.disks:
4405 self.op.disks = range(len(instance.disks))
4407 for disk_idx in self.op.disks:
4408 instance.FindDisk(disk_idx)
4410 def _ExecD8DiskOnly(self, feedback_fn):
4411 """Replace a disk on the primary or secondary for dbrd8.
4413 The algorithm for replace is quite complicated:
4415 1. for each disk to be replaced:
4417 1. create new LVs on the target node with unique names
4418 1. detach old LVs from the drbd device
4419 1. rename old LVs to name_replaced.<time_t>
4420 1. rename new LVs to old LVs
4421 1. attach the new LVs (with the old names now) to the drbd device
4423 1. wait for sync across all devices
4425 1. for each modified disk:
        1. remove old LVs (which have the name name_replaced.<time_t>)
4429 Failures are not very well handled.
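
    For illustration (names are examples only): the old data LV
    <uuid>.disk0_data is first renamed to <uuid>.disk0_data_replaced-<time_t>,
    then the freshly created LV is renamed to <uuid>.disk0_data, so the drbd
    device ends up attached to its new storage under the old names.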
4433 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4434 instance = self.instance
4436 vgname = self.cfg.GetVGName()
    cfg = self.cfg
    steps_total = 6
    iv_names = {}
    tgt_node = self.tgt_node
4440 oth_node = self.oth_node
4442 # Step: check device activation
4443 self.proc.LogStep(1, steps_total, "check device existence")
4444 info("checking volume groups")
4445 my_vg = cfg.GetVGName()
4446 results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
4449 for node in oth_node, tgt_node:
      res = results[node]
      if res.failed or not res.data or my_vg not in res.data:
4452 raise errors.OpExecError("Volume group '%s' not found on %s" %
4454 for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
4457 for node in tgt_node, oth_node:
4458 info("checking disk/%d on %s" % (idx, node))
4459 cfg.SetDiskID(dev, node)
4460 if not self.rpc.call_blockdev_find(node, dev):
4461 raise errors.OpExecError("Can't find disk/%d on node %s" %
4464 # Step: check other node consistency
4465 self.proc.LogStep(2, steps_total, "check peer consistency")
4466 for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
4469 info("checking disk/%d consistency on %s" % (idx, oth_node))
4470 if not _CheckDiskConsistency(self, dev, oth_node,
4471 oth_node==instance.primary_node):
4472 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4473 " to replace disks on this node (%s)" %
4474 (oth_node, tgt_node))
4476 # Step: create new storage
4477 self.proc.LogStep(3, steps_total, "allocate new storage")
4478 for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      size = dev.size
4482 cfg.SetDiskID(dev, tgt_node)
4483 lv_names = [".disk%d_%s" % (idx, suf)
4484 for suf in ["data", "meta"]]
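      # e.g. for idx 0 this yields [".disk0_data", ".disk0_meta"], which
      # _GenerateUniqueNames presumably extends into cluster-unique LV names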
4485 names = _GenerateUniqueNames(self, lv_names)
4486 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4487 logical_id=(vgname, names[0]))
4488 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4489 logical_id=(vgname, names[1]))
4490 new_lvs = [lv_data, lv_meta]
4491 old_lvs = dev.children
4492 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4493 info("creating new local storage on %s for %s" %
4494 (tgt_node, dev.iv_name))
4495 # since we *always* want to create this LV, we use the
4496 # _Create...OnPrimary (which forces the creation), even if we
4497 # are talking about the secondary node
4498 for new_lv in new_lvs:
4499 if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4500 _GetInstanceInfoText(instance)):
4501 raise errors.OpExecError("Failed to create new LV named '%s' on"
4503 (new_lv.logical_id[1], tgt_node))
4505 # Step: for each lv, detach+rename*2+attach
4506 self.proc.LogStep(4, steps_total, "change drbd configuration")
4507 for dev, old_lvs, new_lvs in iv_names.itervalues():
4508 info("detaching %s drbd from local storage" % dev.iv_name)
4509 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4512 raise errors.OpExecError("Can't detach drbd from local storage on node"
4513 " %s for device %s" % (tgt_node, dev.iv_name))
4515 #cfg.Update(instance)
4517 # ok, we created the new LVs, so now we know we have the needed
4518 # storage; as such, we proceed on the target node to rename
4519 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4520 # using the assumption that logical_id == physical_id (which in
4521 # turn is the unique_id on that node)
4523 # FIXME(iustin): use a better name for the replaced LVs
4524 temp_suffix = int(time.time())
4525 ren_fn = lambda d, suff: (d.physical_id[0],
4526 d.physical_id[1] + "_replaced-%s" % suff)
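    # e.g. for an LV whose physical_id is ('xenvg', '<uuid>.disk0_data'),
    # ren_fn with suffix 1199145600 gives
    # ('xenvg', '<uuid>.disk0_data_replaced-1199145600')
    # (volume group and LV names illustrative only)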
4527 # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
4530 find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4531 if not find_res.failed and find_res.data is not None: # device exists
4532 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4534 info("renaming the old LVs on the target node")
4535 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4538 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4539 # now we rename the new LVs to the old LVs
4540 info("renaming the new LVs on the target node")
4541 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4542 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4545 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4547 for old, new in zip(old_lvs, new_lvs):
4548 new.logical_id = old.logical_id
4549 cfg.SetDiskID(new, tgt_node)
4551 for disk in old_lvs:
4552 disk.logical_id = ren_fn(disk, temp_suffix)
4553 cfg.SetDiskID(disk, tgt_node)
4555 # now that the new lvs have the old name, we can add them to the device
4556 info("adding new mirror component on %s" % tgt_node)
4557 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4558 if result.failed or not result.data:
4559 for new_lv in new_lvs:
4560 result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4561 if result.failed or not result.data:
4562 warning("Can't rollback device %s", hint="manually cleanup unused"
4564 raise errors.OpExecError("Can't add local storage to drbd")
4566 dev.children = new_lvs
4567 cfg.Update(instance)
4569 # Step: wait for sync
4571 # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
4574 self.proc.LogStep(5, steps_total, "sync devices")
4575 _WaitForSync(self, instance, unlock=True)
4577 # so check manually all the devices
4578 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4579 cfg.SetDiskID(dev, instance.primary_node)
4580 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
4581 if result.failed or result.data[5]:
4582 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4584 # Step: remove old storage
4585 self.proc.LogStep(6, steps_total, "removing old storage")
4586 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4587 info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
4590 result = self.rpc.call_blockdev_remove(tgt_node, lv)
4591 if result.failed or not result.data:
4592 warning("Can't remove old LV", hint="manually remove unused LVs")
4595 def _ExecD8Secondary(self, feedback_fn):
4596 """Replace the secondary node for drbd8.
4598 The algorithm for replace is quite complicated:
4599 - for all disks of the instance:
4600 - create new LVs on the new node with same names
4601 - shutdown the drbd device on the old secondary
4602 - disconnect the drbd network on the primary
4603 - create the drbd device on the new secondary
4604 - network attach the drbd on the primary, using an artifice:
4605 the drbd code for Attach() will connect to the network if it
4606 finds a device which is connected to the good local disks but
4608 - wait for sync across all devices
4609 - remove all disks from the old secondary
4611 Failures are not very well handled.
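
    For illustration (names are examples only): with primary node1, old
    secondary node2 and new secondary node3, the drbds on node1 are switched
    to standalone, peer devices are created on node3, and a subsequent
    'find' on node1 reconnects them to node3.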
4615 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4616 instance = self.instance
    cfg = self.cfg
    steps_total = 6
    iv_names = {}
    old_node = self.tgt_node
4621 new_node = self.new_node
4622 pri_node = instance.primary_node
4624 # Step: check device activation
4625 self.proc.LogStep(1, steps_total, "check device existence")
4626 info("checking volume groups")
4627 my_vg = cfg.GetVGName()
4628 results = self.rpc.call_vg_list([pri_node, new_node])
4629 for node in pri_node, new_node:
      res = results[node]
      if res.failed or not res.data or my_vg not in res.data:
4632 raise errors.OpExecError("Volume group '%s' not found on %s" %
4634 for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
4637 info("checking disk/%d on %s" % (idx, pri_node))
4638 cfg.SetDiskID(dev, pri_node)
4639 result = self.rpc.call_blockdev_find(pri_node, dev)
4642 raise errors.OpExecError("Can't find disk/%d on node %s" %
4645 # Step: check other node consistency
4646 self.proc.LogStep(2, steps_total, "check peer consistency")
4647 for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
4650 info("checking disk/%d consistency on %s" % (idx, pri_node))
4651 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4652 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4653 " unsafe to replace the secondary" %
4656 # Step: create new storage
4657 self.proc.LogStep(3, steps_total, "allocate new storage")
4658 for idx, dev in enumerate(instance.disks):
4659 info("adding new local storage on %s for disk/%d" %
4661 # since we *always* want to create this LV, we use the
4662 # _Create...OnPrimary (which forces the creation), even if we
4663 # are talking about the secondary node
4664 for new_lv in dev.children:
4665 if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4666 _GetInstanceInfoText(instance)):
4667 raise errors.OpExecError("Failed to create new LV named '%s' on"
4669 (new_lv.logical_id[1], new_node))
    # Step 4: drbd minors and drbd setup changes
4672 # after this, we must manually remove the drbd minors on both the
4673 # error and the success paths
4674 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
    logging.debug("Allocated minors %s", minors)
4677 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4678 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4680 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4681 # create new devices on new_node
4682 if pri_node == dev.logical_id[0]:
4683 new_logical_id = (pri_node, new_node,
4684 dev.logical_id[2], dev.logical_id[3], new_minor,
4687 new_logical_id = (new_node, pri_node,
4688 dev.logical_id[2], new_minor, dev.logical_id[4],
4690 iv_names[idx] = (dev, dev.children, new_logical_id)
4691 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4693 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4694 logical_id=new_logical_id,
4695 children=dev.children)
4696 if not _CreateBlockDevOnSecondary(self, new_node, instance,
4698 _GetInstanceInfoText(instance)):
4699 self.cfg.ReleaseDRBDMinors(instance.name)
4700 raise errors.OpExecError("Failed to create new DRBD on"
4701 " node '%s'" % new_node)
4703 for idx, dev in enumerate(instance.disks):
4704 # we have new devices, shutdown the drbd on the old secondary
4705 info("shutting down drbd for disk/%d on old node" % idx)
4706 cfg.SetDiskID(dev, old_node)
4707 result = self.rpc.call_blockdev_shutdown(old_node, dev)
4708 if result.failed or not result.data:
4709 warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4710 hint="Please cleanup this device manually as soon as possible")
4712 info("detaching primary drbds from the network (=> standalone)")
4714 for idx, dev in enumerate(instance.disks):
4715 cfg.SetDiskID(dev, pri_node)
4716 # set the network part of the physical (unique in bdev terms) id
4717 # to None, meaning detach from network
4718 dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
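      # with the network half of the id gone, the 'find' below leaves the
      # device running in standalone (disconnected) mode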
4719 # and 'find' the device, which will 'fix' it to match the
4721 result = self.rpc.call_blockdev_find(pri_node, dev)
4722 if not result.failed and result.data:
4725 warning("Failed to detach drbd disk/%d from network, unusual case" %
4729 # no detaches succeeded (very unlikely)
4730 self.cfg.ReleaseDRBDMinors(instance.name)
4731 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4733 # if we managed to detach at least one, we update all the disks of
4734 # the instance to point to the new secondary
4735 info("updating instance configuration")
4736 for dev, _, new_logical_id in iv_names.itervalues():
4737 dev.logical_id = new_logical_id
4738 cfg.SetDiskID(dev, pri_node)
4739 cfg.Update(instance)
4740 # we can remove now the temp minors as now the new values are
4741 # written to the config file (and therefore stable)
4742 self.cfg.ReleaseDRBDMinors(instance.name)
4744 # and now perform the drbd attach
4745 info("attaching primary drbds to new secondary (standalone => connected)")
4746 for idx, dev in enumerate(instance.disks):
4747 info("attaching primary drbd for disk/%d to new secondary node" % idx)
4748 # since the attach is smart, it's enough to 'find' the device,
4749 # it will automatically activate the network, if the physical_id
4751 cfg.SetDiskID(dev, pri_node)
4752 logging.debug("Disk to attach: %s", dev)
4753 result = self.rpc.call_blockdev_find(pri_node, dev)
4754 if result.failed or not result.data:
4755 warning("can't attach drbd disk/%d to new secondary!" % idx,
4756 "please do a gnt-instance info to see the status of disks")
4758 # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
4761 self.proc.LogStep(5, steps_total, "sync devices")
4762 _WaitForSync(self, instance, unlock=True)
4764 # so check manually all the devices
4765 for idx, (dev, old_lvs, _) in iv_names.iteritems():
4766 cfg.SetDiskID(dev, pri_node)
4767 result = self.rpc.call_blockdev_find(pri_node, dev)
4770 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4772 self.proc.LogStep(6, steps_total, "removing old storage")
4773 for idx, (dev, old_lvs, _) in iv_names.iteritems():
4774 info("remove logical volumes for disk/%d" % idx)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
4777 result = self.rpc.call_blockdev_remove(old_node, lv)
4778 if result.failed or not result.data:
4779 warning("Can't remove LV on old secondary",
4780 hint="Cleanup stale volumes by hand")
4782 def Exec(self, feedback_fn):
4783 """Execute disk replacement.
4785 This dispatches the disk replacement to the appropriate handler.
4788 instance = self.instance
4790 # Activate the instance disks if we're replacing them on a down instance
4791 if instance.status == "down":
4792 _StartInstanceDisks(self, instance, True)
4794 if instance.disk_template == constants.DT_DRBD8:
4795 if self.op.remote_node is None:
4796 fn = self._ExecD8DiskOnly
4798 fn = self._ExecD8Secondary
4800 raise errors.ProgrammerError("Unhandled disk replacement case")
4802 ret = fn(feedback_fn)
4804 # Deactivate the instance disks if we're replacing them on a down instance
4805 if instance.status == "down":
4806 _SafeShutdownInstanceDisks(self, instance)
4811 class LUGrowDisk(LogicalUnit):
4812 """Grow a disk of an instance.
4816 HTYPE = constants.HTYPE_INSTANCE
4817 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4820 def ExpandNames(self):
4821 self._ExpandAndLockInstance()
4822 self.needed_locks[locking.LEVEL_NODE] = []
4823 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4825 def DeclareLocks(self, level):
4826 if level == locking.LEVEL_NODE:
4827 self._LockInstancesNodes()
4829 def BuildHooksEnv(self):
4832 This runs on the master, the primary and all the secondaries.
4836 "DISK": self.op.disk,
4837 "AMOUNT": self.op.amount,
4839 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4841 self.cfg.GetMasterNode(),
4842 self.instance.primary_node,
4846 def CheckPrereq(self):
4847 """Check prerequisites.
4849 This checks that the instance is in the cluster.
4852 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4853 assert instance is not None, \
4854 "Cannot retrieve locked instance %s" % self.op.instance_name
4855 _CheckNodeOnline(self, instance.primary_node)
4856 for node in instance.secondary_nodes:
4857 _CheckNodeOnline(self, node)
4860 self.instance = instance
4862 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4863 raise errors.OpPrereqError("Instance's disk layout does not support"
4866 self.disk = instance.FindDisk(self.op.disk)
4868 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4869 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4870 instance.hypervisor)
4871 for node in nodenames:
4872 info = nodeinfo[node]
4873 if info.failed or not info.data:
4874 raise errors.OpPrereqError("Cannot get current information"
4875 " from node '%s'" % node)
4876 vg_free = info.data.get('vg_free', None)
4877 if not isinstance(vg_free, int):
4878 raise errors.OpPrereqError("Can't compute free disk space on"
4880 if self.op.amount > vg_free:
4881 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4882 " %d MiB available, %d MiB required" %
4883 (node, vg_free, self.op.amount))
4885 def Exec(self, feedback_fn):
4886 """Execute disk grow.
    instance = self.instance
    disk = self.disk
4891 for node in (instance.secondary_nodes + (instance.primary_node,)):
4892 self.cfg.SetDiskID(disk, node)
4893 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4895 if (not result.data or not isinstance(result.data, (list, tuple)) or
4896 len(result.data) != 2):
4897 raise errors.OpExecError("Grow request failed to node %s" % node)
4898 elif not result.data[0]:
4899 raise errors.OpExecError("Grow request failed to node %s: %s" %
4900 (node, result.data[1]))
4901 disk.RecordGrow(self.op.amount)
4902 self.cfg.Update(instance)
4903 if self.op.wait_for_sync:
4904 disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4907 " status.\nPlease check the instance.")
4910 class LUQueryInstanceData(NoHooksLU):
4911 """Query runtime instance data.
4914 _OP_REQP = ["instances", "static"]
4917 def ExpandNames(self):
4918 self.needed_locks = {}
4919 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4921 if not isinstance(self.op.instances, list):
4922 raise errors.OpPrereqError("Invalid argument type 'instances'")
4924 if self.op.instances:
4925 self.wanted_names = []
4926 for name in self.op.instances:
4927 full_name = self.cfg.ExpandInstanceName(name)
4928 if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
4931 self.wanted_names.append(full_name)
4932 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4934 self.wanted_names = None
4935 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4937 self.needed_locks[locking.LEVEL_NODE] = []
4938 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4940 def DeclareLocks(self, level):
4941 if level == locking.LEVEL_NODE:
4942 self._LockInstancesNodes()
4944 def CheckPrereq(self):
4945 """Check prerequisites.
4947 This only checks the optional instance list against the existing names.
4950 if self.wanted_names is None:
4951 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4953 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4954 in self.wanted_names]
4957 def _ComputeDiskStatus(self, instance, snode, dev):
4958 """Compute block device status.
4961 static = self.op.static
4963 self.cfg.SetDiskID(dev, instance.primary_node)
4964 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4966 dev_pstatus = dev_pstatus.data
4970 if dev.dev_type in constants.LDS_DRBD:
4971 # we change the snode then (otherwise we use the one passed in)
4972 if dev.logical_id[0] == instance.primary_node:
4973 snode = dev.logical_id[1]
4975 snode = dev.logical_id[0]
4977 if snode and not static:
4978 self.cfg.SetDiskID(dev, snode)
4979 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4981 dev_sstatus = dev_sstatus.data
4986 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4987 for child in dev.children]
4992 "iv_name": dev.iv_name,
4993 "dev_type": dev.dev_type,
4994 "logical_id": dev.logical_id,
4995 "physical_id": dev.physical_id,
4996 "pstatus": dev_pstatus,
4997 "sstatus": dev_sstatus,
4998 "children": dev_children,
5004 def Exec(self, feedback_fn):
5005 """Gather and return data"""
5008 cluster = self.cfg.GetClusterInfo()
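    # the result is a mapping keyed by instance name, e.g. (illustrative):
    #   {"inst1.example.com": {"config_state": "up", "run_state": "up", ...}}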
    result = {}
    for instance in self.wanted_instances:
5011 if not self.op.static:
5012 remote_info = self.rpc.call_instance_info(instance.primary_node,
5014 instance.hypervisor)
5016 remote_info = remote_info.data
5017 if remote_info and "state" in remote_info:
5020 remote_state = "down"
5023 if instance.status == "down":
5024 config_state = "down"
5028 disks = [self._ComputeDiskStatus(instance, None, device)
5029 for device in instance.disks]
5032 "name": instance.name,
5033 "config_state": config_state,
5034 "run_state": remote_state,
5035 "pnode": instance.primary_node,
5036 "snodes": instance.secondary_nodes,
5038 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5040 "hypervisor": instance.hypervisor,
5041 "network_port": instance.network_port,
5042 "hv_instance": instance.hvparams,
5043 "hv_actual": cluster.FillHV(instance),
5044 "be_instance": instance.beparams,
5045 "be_actual": cluster.FillBE(instance),
5048 result[instance.name] = idict
5053 class LUSetInstanceParams(LogicalUnit):
5054 """Modifies an instances's parameters.
5057 HPATH = "instance-modify"
5058 HTYPE = constants.HTYPE_INSTANCE
5059 _OP_REQP = ["instance_name"]
5062 def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
5067 if not hasattr(self.op, 'beparams'):
5068 self.op.beparams = {}
5069 if not hasattr(self.op, 'hvparams'):
5070 self.op.hvparams = {}
5071 self.op.force = getattr(self.op, "force", False)
5072 if not (self.op.nics or self.op.disks or
5073 self.op.hvparams or self.op.beparams):
5074 raise errors.OpPrereqError("No changes submitted")
5076 utils.CheckBEParams(self.op.beparams)
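    # self.op.disks holds (op, params) pairs, e.g. (values illustrative):
    #   [(constants.DDM_ADD, {'size': 1024, 'mode': constants.DISK_RDWR})]
    # while an integer op such as (0, {'mode': constants.DISK_RDONLY})
    # selects an existing disk for modification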
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
5087 if not isinstance(disk_op, int):
5088 raise errors.OpPrereqError("Invalid disk index")
5089 if disk_op == constants.DDM_ADD:
5090 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5091 if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
5092 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5093 size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing")
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err))
5101 disk_dict['size'] = size
      else:
        # modification of disk
5104 if 'size' in disk_dict:
5105 raise errors.OpPrereqError("Disk size change not possible, use"
5108 if disk_addremove > 1:
5109 raise errors.OpPrereqError("Only one disk add or remove operation"
5110 " supported at a time")
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
5121 if not isinstance(nic_op, int):
5122 raise errors.OpPrereqError("Invalid nic index")
5124 # nic_dict should be a dict
5125 nic_ip = nic_dict.get('ip', None)
5126 if nic_ip is not None:
5127 if nic_ip.lower() == "none":
5128 nic_dict['ip'] = None
5130 if not utils.IsValidIP(nic_ip):
5131 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
      # bridges cannot be verified at this stage; for unset (None) bridges
      # we can at least assign the cluster default
5133 nic_bridge = nic_dict.get('bridge', None)
5134 if nic_bridge is None:
5135 nic_dict['bridge'] = self.cfg.GetDefBridge()
5136 # but we can validate MACs
5137 nic_mac = nic_dict.get('mac', None)
5138 if nic_mac is not None:
5139 if self.cfg.IsMacInUse(nic_mac):
5140 raise errors.OpPrereqError("MAC address %s already in use"
5141 " in cluster" % nic_mac)
5142 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5143 if not utils.IsValidMac(nic_mac):
5144 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5145 if nic_addremove > 1:
5146 raise errors.OpPrereqError("Only one NIC add or remove operation"
5147 " supported at a time")
5149 def ExpandNames(self):
5150 self._ExpandAndLockInstance()
5151 self.needed_locks[locking.LEVEL_NODE] = []
5152 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5154 def DeclareLocks(self, level):
5155 if level == locking.LEVEL_NODE:
5156 self._LockInstancesNodes()
5158 def BuildHooksEnv(self):
5161 This runs on the master, primary and secondaries.
5165 if constants.BE_MEMORY in self.be_new:
5166 args['memory'] = self.be_new[constants.BE_MEMORY]
5167 if constants.BE_VCPUS in self.be_new:
5168 args['vcpus'] = self.be_new[constants.BE_VCPUS]
5169 # FIXME: readd disk/nic changes
5170 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5171 nl = [self.cfg.GetMasterNode(),
5172 self.instance.primary_node] + list(self.instance.secondary_nodes)
5175 def CheckPrereq(self):
5176 """Check prerequisites.
5178 This only checks the instance list against the existing names.
5181 force = self.force = self.op.force
5183 # checking the new params on the primary/secondary nodes
5185 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5186 assert self.instance is not None, \
5187 "Cannot retrieve locked instance %s" % self.op.instance_name
5188 pnode = self.instance.primary_node
    nodelist = [pnode]
    nodelist.extend(instance.secondary_nodes)
5192 # hvparams processing
5193 if self.op.hvparams:
5194 i_hvdict = copy.deepcopy(instance.hvparams)
5195 for key, val in self.op.hvparams.iteritems():
5196 if val == constants.VALUE_DEFAULT:
5201 elif val == constants.VALUE_NONE:
5202 i_hvdict[key] = None
5205 cluster = self.cfg.GetClusterInfo()
5206 hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5209 hypervisor.GetHypervisor(
5210 instance.hypervisor).CheckParameterSyntax(hv_new)
5211 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5212 self.hv_new = hv_new # the new actual values
5213 self.hv_inst = i_hvdict # the new dict (without defaults)
5215 self.hv_new = self.hv_inst = {}
5217 # beparams processing
5218 if self.op.beparams:
5219 i_bedict = copy.deepcopy(instance.beparams)
5220 for key, val in self.op.beparams.iteritems():
5221 if val == constants.VALUE_DEFAULT:
5228 cluster = self.cfg.GetClusterInfo()
5229 be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5231 self.be_new = be_new # the new actual values
5232 self.be_inst = i_bedict # the new dict (without defaults)
5234 self.be_new = self.be_inst = {}
    self.warn = []
    if constants.BE_MEMORY in self.op.beparams and not self.force:
5239 mem_check_list = [pnode]
5240 if be_new[constants.BE_AUTO_BALANCE]:
5241 # either we changed auto_balance to yes or it was from before
5242 mem_check_list.extend(instance.secondary_nodes)
5243 instance_info = self.rpc.call_instance_info(pnode, instance.name,
5244 instance.hypervisor)
5245 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5246 instance.hypervisor)
5247 if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5248 # Assume the primary node is unreachable and go ahead
5249 self.warn.append("Can't get info from primary node %s" % pnode)
5251 if not instance_info.failed and instance_info.data:
5252 current_mem = instance_info.data['memory']
5254 # Assume instance not running
5255 # (there is a slight race condition here, but it's not very probable,
5256 # and we have no other way to check)
5258 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5259 nodeinfo[pnode].data['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
5262 " from starting, due to %d MB of memory"
5263 " missing on its primary node" % miss_mem)
5265 if be_new[constants.BE_AUTO_BALANCE]:
        for node in instance.secondary_nodes:
          nres = nodeinfo[node]
5267 if nres.failed or not isinstance(nres.data, dict):
5268 self.warn.append("Can't get info from secondary node %s" % node)
5269 elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5270 self.warn.append("Not enough memory to failover instance to"
5271 " secondary node %s" % node)
5274 for nic_op, nic_dict in self.op.nics:
5275 if nic_op == constants.DDM_REMOVE:
5276 if not instance.nics:
5277 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5279 if nic_op != constants.DDM_ADD:
5281 if nic_op < 0 or nic_op >= len(instance.nics):
5282 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5284 (nic_op, len(instance.nics)))
5285 nic_bridge = nic_dict.get('bridge', None)
5286 if nic_bridge is not None:
5287 if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5288 msg = ("Bridge '%s' doesn't exist on one of"
5289 " the instance nodes" % nic_bridge)
            if self.force:
              self.warn.append(msg)
            else:
5293 raise errors.OpPrereqError(msg)
5296 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5297 raise errors.OpPrereqError("Disk operations not supported for"
5298 " diskless instances")
5299 for disk_op, disk_dict in self.op.disks:
5300 if disk_op == constants.DDM_REMOVE:
5301 if len(instance.disks) == 1:
5302 raise errors.OpPrereqError("Cannot remove the last disk of"
5304 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5305 ins_l = ins_l[pnode]
        if not isinstance(ins_l, list):
5307 raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5308 if instance.name in ins_l:
5309 raise errors.OpPrereqError("Instance is running, can't remove"
5312 if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
5314 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5315 " add more" % constants.MAX_DISKS)
5316 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5318 if disk_op < 0 or disk_op >= len(instance.disks):
5319 raise errors.OpPrereqError("Invalid disk index %s, valid values"
5321 (disk_op, len(instance.disks)))
5325 def Exec(self, feedback_fn):
5326 """Modifies an instance.
5328 All parameters take effect only at the next restart of the instance.
5331 # Process here the warnings from CheckPrereq, as we don't have a
5332 # feedback_fn there.
5333 for warn in self.warn:
5334 feedback_fn("WARNING: %s" % warn)
    result = []
    instance = self.instance
5339 for disk_op, disk_dict in self.op.disks:
5340 if disk_op == constants.DDM_REMOVE:
5341 # remove the last disk
5342 device = instance.disks.pop()
5343 device_idx = len(instance.disks)
5344 for node, disk in device.ComputeNodeTree(instance.primary_node):
5345 self.cfg.SetDiskID(disk, node)
          rpc_result = self.rpc.call_blockdev_remove(node, disk)
          if rpc_result.failed or not rpc_result.data:
5348 self.proc.LogWarning("Could not remove disk/%d on node %s,"
5349 " continuing anyway", device_idx, node)
5350 result.append(("disk/%d" % device_idx, "remove"))
5351 elif disk_op == constants.DDM_ADD:
5353 if instance.disk_template == constants.DT_FILE:
5354 file_driver, file_path = instance.disks[0].logical_id
5355 file_path = os.path.dirname(file_path)
5357 file_driver = file_path = None
5358 disk_idx_base = len(instance.disks)
5359 new_disk = _GenerateDiskTemplate(self,
5360 instance.disk_template,
5361 instance, instance.primary_node,
5362 instance.secondary_nodes,
5367 new_disk.mode = disk_dict['mode']
5368 instance.disks.append(new_disk)
5369 info = _GetInstanceInfoText(instance)
5371 logging.info("Creating volume %s for instance %s",
5372 new_disk.iv_name, instance.name)
5373 # Note: this needs to be kept in sync with _CreateDisks
5375 for secondary_node in instance.secondary_nodes:
5376 if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
5377 new_disk, False, info):
5378 self.LogWarning("Failed to create volume %s (%s) on"
5379 " secondary node %s!",
5380 new_disk.iv_name, new_disk, secondary_node)
5382 if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5383 instance, new_disk, info):
5384 self.LogWarning("Failed to create volume %s on primary!",
5386 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5387 (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
5390 instance.disks[disk_op].mode = disk_dict['mode']
5391 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5393 for nic_op, nic_dict in self.op.nics:
5394 if nic_op == constants.DDM_REMOVE:
5395 # remove the last nic
5396 del instance.nics[-1]
5397 result.append(("nic.%d" % len(instance.nics), "remove"))
5398 elif nic_op == constants.DDM_ADD:
5400 if 'mac' not in nic_dict:
5401 mac = constants.VALUE_GENERATE
5403 mac = nic_dict['mac']
5404 if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5405 mac = self.cfg.GenerateMAC()
5406 new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5407 bridge=nic_dict.get('bridge', None))
5408 instance.nics.append(new_nic)
5409 result.append(("nic.%d" % (len(instance.nics) - 1),
5410 "add:mac=%s,ip=%s,bridge=%s" %
5411 (new_nic.mac, new_nic.ip, new_nic.bridge)))
      else:
        # change a given nic
5414 for key in 'mac', 'ip', 'bridge':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
5417 result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5420 if self.op.hvparams:
5421 instance.hvparams = self.hv_new
5422 for key, val in self.op.hvparams.iteritems():
5423 result.append(("hv/%s" % key, val))
5426 if self.op.beparams:
5427 instance.beparams = self.be_inst
5428 for key, val in self.op.beparams.iteritems():
5429 result.append(("be/%s" % key, val))
5431 self.cfg.Update(instance)
5436 class LUQueryExports(NoHooksLU):
5437 """Query the exports list
5440 _OP_REQP = ['nodes']
5443 def ExpandNames(self):
5444 self.needed_locks = {}
5445 self.share_locks[locking.LEVEL_NODE] = 1
5446 if not self.op.nodes:
5447 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5449 self.needed_locks[locking.LEVEL_NODE] = \
5450 _GetWantedNodes(self, self.op.nodes)
5452 def CheckPrereq(self):
5453 """Check prerequisites.
5456 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5458 def Exec(self, feedback_fn):
5459 """Compute the list of all the exported system images.
5462 @return: a dictionary with the structure node->(export-list)
5463 where export-list is a list of the instances exported on
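        that node, or False for nodes that could not be contacted, e.g.
        (node names illustrative):
        {"node1.example.com": ["inst1.example.com"], "node2.example.com": False}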
5467 rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
5470 if rpcresult[node].failed:
5471 result[node] = False
5473 result[node] = rpcresult[node].data
5478 class LUExportInstance(LogicalUnit):
5479 """Export an instance to an image in the cluster.
5482 HPATH = "instance-export"
5483 HTYPE = constants.HTYPE_INSTANCE
5484 _OP_REQP = ["instance_name", "target_node", "shutdown"]
5487 def ExpandNames(self):
5488 self._ExpandAndLockInstance()
5489 # FIXME: lock only instance primary and destination node
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
5497 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5499 def DeclareLocks(self, level):
5500 """Last minute lock declaration."""
5501 # All nodes are locked anyway, so nothing to do here.
5503 def BuildHooksEnv(self):
5506 This will run on the master, primary node and target node.
5510 "EXPORT_NODE": self.op.target_node,
5511 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5513 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5514 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5515 self.op.target_node]
5518 def CheckPrereq(self):
5519 """Check prerequisites.
5521 This checks that the instance and node names are valid.
5524 instance_name = self.op.instance_name
5525 self.instance = self.cfg.GetInstanceInfo(instance_name)
5526 assert self.instance is not None, \
5527 "Cannot retrieve locked instance %s" % self.op.instance_name
5528 _CheckNodeOnline(self, self.instance.primary_node)
5530 self.dst_node = self.cfg.GetNodeInfo(
5531 self.cfg.ExpandNodeName(self.op.target_node))
5533 if self.dst_node is None:
      # This is a wrong node name, not a non-locked node
5535 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5536 _CheckNodeOnline(self, self.op.target_node)
5538 # instance disk type verification
5539 for disk in self.instance.disks:
5540 if disk.dev_type == constants.LD_FILE:
5541 raise errors.OpPrereqError("Export not supported for instances with"
5542 " file-based disks")
5544 def Exec(self, feedback_fn):
5545 """Export an instance to an image in the cluster.
5548 instance = self.instance
5549 dst_node = self.dst_node
5550 src_node = instance.primary_node
5551 if self.op.shutdown:
5552 # shutdown the instance, but not the disks
5553 result = self.rpc.call_instance_shutdown(src_node, instance)
5556 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5557 (instance.name, src_node))
5559 vgname = self.cfg.GetVGName()
5564 for disk in instance.disks:
      # new_dev_name will be the name of a snapshot of the lvm leaf we passed
5566 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5567 if new_dev_name.failed or not new_dev_name.data:
5568 self.LogWarning("Could not snapshot block device %s on node %s",
5569 disk.logical_id[1], src_node)
5570 snap_disks.append(False)
5572 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5573 logical_id=(vgname, new_dev_name.data),
5574 physical_id=(vgname, new_dev_name.data),
5575 iv_name=disk.iv_name)
5576 snap_disks.append(new_dev)
5579 if self.op.shutdown and instance.status == "up":
5580 result = self.rpc.call_instance_start(src_node, instance, None)
5581 if result.failed or not result.data:
5582 _ShutdownInstanceDisks(self, instance)
5583 raise errors.OpExecError("Could not start instance")
5585 # TODO: check for size
5587 cluster_name = self.cfg.GetClusterName()
5588 for idx, dev in enumerate(snap_disks):
5590 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5591 instance, cluster_name, idx)
5592 if result.failed or not result.data:
5593 self.LogWarning("Could not export block device %s from node %s to"
5594 " node %s", dev.logical_id[1], src_node,
5596 result = self.rpc.call_blockdev_remove(src_node, dev)
5597 if result.failed or not result.data:
5598 self.LogWarning("Could not remove snapshot block device %s from node"
5599 " %s", dev.logical_id[1], src_node)
5601 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
5602 if result.failed or not result.data:
5603 self.LogWarning("Could not finalize export for instance %s on node %s",
5604 instance.name, dst_node.name)
5606 nodelist = self.cfg.GetNodeList()
5607 nodelist.remove(dst_node.name)
5609 # on one-node clusters nodelist will be empty after the removal
5610 # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
5613 exportlist = self.rpc.call_export_list(nodelist)
5614 for node in exportlist:
        if exportlist[node].failed:
          continue
5617 if instance.name in exportlist[node].data:
5618 if not self.rpc.call_export_remove(node, instance.name):
5619 self.LogWarning("Could not remove older export for instance %s"
5620 " on node %s", instance.name, node)
5623 class LURemoveExport(NoHooksLU):
5624 """Remove exports related to the named instance.
5627 _OP_REQP = ["instance_name"]
5630 def ExpandNames(self):
5631 self.needed_locks = {}
5632 # We need all nodes to be locked in order for RemoveExport to work, but we
5633 # don't need to lock the instance itself, as nothing will happen to it (and
5634 # we can remove exports also for a removed instance)
5635 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5637 def CheckPrereq(self):
5638 """Check prerequisites.
5642 def Exec(self, feedback_fn):
5643 """Remove any export.
5646 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5647 # If the instance was not found we'll try with the name that was passed in.
5648 # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name
5654 exportlist = self.rpc.call_export_list(self.acquired_locks[
5655 locking.LEVEL_NODE])
    found = False
    for node in exportlist:
5658 if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        continue
      if instance_name in exportlist[node].data:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
5664 if result.failed or not result.data:
5665 logging.error("Could not remove export for instance %s"
5666 " on node %s", instance_name, node)
5668 if fqdn_warn and not found:
5669 feedback_fn("Export not found. If trying to remove an export belonging"
5670 " to a deleted instance please use its Fully Qualified"
5674 class TagsLU(NoHooksLU):
5677 This is an abstract class which is the parent of all the other tags LUs.
5681 def ExpandNames(self):
5682 self.needed_locks = {}
5683 if self.op.kind == constants.TAG_NODE:
5684 name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
5689 self.needed_locks[locking.LEVEL_NODE] = name
5690 elif self.op.kind == constants.TAG_INSTANCE:
5691 name = self.cfg.ExpandInstanceName(self.op.name)
5693 raise errors.OpPrereqError("Invalid instance name (%s)" %
5696 self.needed_locks[locking.LEVEL_INSTANCE] = name
5698 def CheckPrereq(self):
5699 """Check prerequisites.
5702 if self.op.kind == constants.TAG_CLUSTER:
5703 self.target = self.cfg.GetClusterInfo()
5704 elif self.op.kind == constants.TAG_NODE:
5705 self.target = self.cfg.GetNodeInfo(self.op.name)
5706 elif self.op.kind == constants.TAG_INSTANCE:
5707 self.target = self.cfg.GetInstanceInfo(self.op.name)
5709 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())

class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
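
  # The result is a list of (path, tag) pairs; example values below are
  # illustrative only:
  #   [("/cluster", "staging"), ("/instances/inst1.example.com", "web")]
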
class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")

class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")

class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))
class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
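  # Illustrative use from within a LU (a sketch, not a verbatim caller from
  # this module; the allocator script name and local variables below are
  # hypothetical):
  #   ial = IAllocator(self, mode=constants.IALLOCATOR_MODE_RELOC,
  #                    name=instance.name, relocate_from=[old_secondary])
  #   ial.Run("example-allocator")
  #   if not ial.success:
  #     raise errors.OpPrereqError("Allocation failed: %s" % ial.info)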
5901 "mem_size", "disks", "disk_template",
5902 "os", "tags", "nics", "vcpus", "hypervisor",
  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor_name)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                                     cluster_info.enabled_hypervisors)
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      node_data[nname].Raise()
      if not isinstance(node_data[nname].data, dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname].data
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo, beinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += beinfo[constants.BE_MEMORY]
          if iinfo.name not in node_iinfo[nname]:
            i_used_mem = 0
          else:
            i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
          i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
          remote_info['memory_free'] -= max(0, i_mem_diff)

          if iinfo.status == "up":
            i_p_up_mem += beinfo[constants.BE_MEMORY]

      # memory and disk data for this node
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        "offline": ninfo.offline,
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)
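    # in_text is the serialized (JSON, via the serializer module) form of
    # in_data; Run() below ships this text unchanged to the iallocator runner
    # on the master node.
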
  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise()

    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result.data

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
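
  # A reply that passes the checks above is a dict with at least the keys
  # "success", "info" and "nodes", e.g. (illustrative values only):
  #   {"success": True, "info": "allocation successful",
  #    "nodes": ["node2.example.com", "node3.example.com"]}
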
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 str(self.op.mode))

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result