4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
47 class LogicalUnit(object):
48 """Logical Unit base class.
50 Subclasses must follow these rules:
51 - implement ExpandNames
52 - implement CheckPrereq
54 - implement BuildHooksEnv
55 - redefine HPATH and HTYPE
56 - optionally redefine their run requirements:
57 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59 Note that all commands require root permissions.
67 def __init__(self, processor, op, context, rpc):
68 """Constructor for LogicalUnit.
70     This needs to be overridden in derived classes in order to check op
76 self.cfg = context.cfg
77 self.context = context
79 # Dicts used to declare locking needs to mcpu
80 self.needed_locks = None
81 self.acquired_locks = {}
82 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84 self.remove_locks = {}
85 # Used to force good behavior when calling helper functions
86 self.recalculate_locks = {}
89 self.LogWarning = processor.LogWarning
90 self.LogInfo = processor.LogInfo
92 for attr_name in self._OP_REQP:
93 attr_val = getattr(op, attr_name, None)
95 raise errors.OpPrereqError("Required parameter '%s' missing" %
100 """Returns the SshRunner object
104 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107 ssh = property(fget=__GetSSH)
109 def CheckArguments(self):
110 """Check syntactic validity for the opcode arguments.
112     This method is for doing a simple syntactic check and ensuring the
113     validity of opcode parameters, without any cluster-related
114     checks. While the same can be accomplished in ExpandNames and/or
115     CheckPrereq, doing these separately is better because:
117       - ExpandNames is left as purely a lock-related function
118       - CheckPrereq is run after we have acquired locks (and possible
121 The function is allowed to change the self.op attribute so that
122 later methods can no longer worry about missing parameters.
127 def ExpandNames(self):
128 """Expand names for this LU.
130 This method is called before starting to execute the opcode, and it should
131 update all the parameters of the opcode to their canonical form (e.g. a
132 short node name must be fully expanded after this method has successfully
133     completed). This way locking, hooks, logging, etc. can work correctly.
135 LUs which implement this method must also populate the self.needed_locks
136 member, as a dict with lock levels as keys, and a list of needed lock names
139 - use an empty dict if you don't need any lock
140 - if you don't need any lock at a particular level omit that level
141 - don't put anything for the BGL level
142 - if you want all locks at a level use locking.ALL_SET as a value
144 If you need to share locks (rather than acquire them exclusively) at one
145 level you can modify self.share_locks, setting a true value (usually 1) for
146 that level. By default locks are not shared.
150 # Acquire all nodes and one instance
151 self.needed_locks = {
152 locking.LEVEL_NODE: locking.ALL_SET,
153 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155 # Acquire just two nodes
156 self.needed_locks = {
157 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160 self.needed_locks = {} # No, you can't leave it to the default value None
163 # The implementation of this method is mandatory only if the new LU is
164 # concurrent, so that old LUs don't need to be changed all at the same
167 self.needed_locks = {} # Exclusive LUs don't need locks.
169 raise NotImplementedError
171 def DeclareLocks(self, level):
172 """Declare LU locking needs for a level
174 While most LUs can just declare their locking needs at ExpandNames time,
175 sometimes there's the need to calculate some locks after having acquired
176 the ones before. This function is called just before acquiring locks at a
177 particular level, but after acquiring the ones at lower levels, and permits
178 such calculations. It can be used to modify self.needed_locks, and by
179 default it does nothing.
181 This function is only called if you have something already set in
182 self.needed_locks for the level.
184 @param level: Locking level which is going to be locked
185 @type level: member of ganeti.locking.LEVELS
189 def CheckPrereq(self):
190 """Check prerequisites for this LU.
192 This method should check that the prerequisites for the execution
193 of this LU are fulfilled. It can do internode communication, but
194 it should be idempotent - no cluster or system changes are
197 The method should raise errors.OpPrereqError in case something is
198 not fulfilled. Its return value is ignored.
200 This method should also update all the parameters of the opcode to
201 their canonical form if it hasn't been done by ExpandNames before.
204 raise NotImplementedError
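  # Illustrative sketch (not from the original file; instance_name and the
  # stored self.instance attribute are assumptions): a typical concrete
  # CheckPrereq looks roughly like
  #
  #   def CheckPrereq(self):
  #     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
  #     if instance is None:
  #       raise errors.OpPrereqError("Instance '%s' not known" %
  #                                  self.op.instance_name)
  #     self.instance = instance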
206 def Exec(self, feedback_fn):
209 This method should implement the actual work. It should raise
210 errors.OpExecError for failures that are somewhat dealt with in
214 raise NotImplementedError
216 def BuildHooksEnv(self):
217 """Build hooks environment for this LU.
219     This method should return a three-element tuple consisting of: a dict
220 containing the environment that will be used for running the
221 specific hook for this LU, a list of node names on which the hook
222 should run before the execution, and a list of node names on which
223 the hook should run after the execution.
225 The keys of the dict must not have 'GANETI_' prefixed as this will
226 be handled in the hooks runner. Also note additional keys will be
227 added by the hooks runner. If the LU doesn't define any
228 environment, an empty dict (and not None) should be returned.
230 No nodes should be returned as an empty list (and not None).
232 Note that if the HPATH for a LU class is None, this function will
236 raise NotImplementedError
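  # Illustrative sketch (not from the original file; the env keys and node
  # list are hypothetical): a concrete BuildHooksEnv returns a
  # (env, pre_nodes, post_nodes) tuple, e.g.
  #
  #   def BuildHooksEnv(self):
  #     env = {"INSTANCE_NAME": self.op.instance_name}
  #     nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
  #     return env, nl, nl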
238 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
239 """Notify the LU about the results of its hooks.
241 This method is called every time a hooks phase is executed, and notifies
242 the Logical Unit about the hooks' result. The LU can then use it to alter
243 its result based on the hooks. By default the method does nothing and the
244 previous result is passed back unchanged but any LU can define it if it
245 wants to use the local cluster hook-scripts somehow.
247 @param phase: one of L{constants.HOOKS_PHASE_POST} or
248 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
249 @param hook_results: the results of the multi-node hooks rpc call
250     @param feedback_fn: function used to send feedback back to the caller
251 @param lu_result: the previous Exec result this LU had, or None
253 @return: the new Exec result, based on the previous result
259 def _ExpandAndLockInstance(self):
260 """Helper function to expand and lock an instance.
262 Many LUs that work on an instance take its name in self.op.instance_name
263 and need to expand it and then declare the expanded name for locking. This
264 function does it, and then updates self.op.instance_name to the expanded
265 name. It also initializes needed_locks as a dict, if this hasn't been done
269 if self.needed_locks is None:
270 self.needed_locks = {}
272 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
273 "_ExpandAndLockInstance called with instance-level locks set"
274 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
275 if expanded_name is None:
276 raise errors.OpPrereqError("Instance '%s' not known" %
277 self.op.instance_name)
278 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
279 self.op.instance_name = expanded_name
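  # Usage sketch (assumed, not part of the original file): an instance-level
  # LU normally calls this helper from its ExpandNames, e.g.
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()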
281 def _LockInstancesNodes(self, primary_only=False):
282 """Helper function to declare instances' nodes for locking.
284 This function should be called after locking one or more instances to lock
285 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
286 with all primary or secondary nodes for instances already locked and
287 present in self.needed_locks[locking.LEVEL_INSTANCE].
289 It should be called from DeclareLocks, and for safety only works if
290 self.recalculate_locks[locking.LEVEL_NODE] is set.
292 In the future it may grow parameters to just lock some instance's nodes, or
293 to just lock primaries or secondary nodes, if needed.
295     It should be called in DeclareLocks in a way similar to::
297 if level == locking.LEVEL_NODE:
298 self._LockInstancesNodes()
300 @type primary_only: boolean
301 @param primary_only: only lock primary nodes of locked instances
304 assert locking.LEVEL_NODE in self.recalculate_locks, \
305 "_LockInstancesNodes helper function called with no nodes to recalculate"
307     # TODO: check if we've really been called with the instance locks held
309 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
310 # future we might want to have different behaviors depending on the value
311 # of self.recalculate_locks[locking.LEVEL_NODE]
313 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
314 instance = self.context.cfg.GetInstanceInfo(instance_name)
315 wanted_nodes.append(instance.primary_node)
317 wanted_nodes.extend(instance.secondary_nodes)
319 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
320 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
321 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
322 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324 del self.recalculate_locks[locking.LEVEL_NODE]
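  # Usage sketch: LUs that also need the nodes of their locked instances
  # combine the helpers as below (this mirrors the pattern used later by
  # LUActivateInstanceDisks):
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()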
327 class NoHooksLU(LogicalUnit):
328 """Simple LU which runs no hooks.
330 This LU is intended as a parent for other LogicalUnits which will
331 run no hooks, in order to reduce duplicate code.
338 def _GetWantedNodes(lu, nodes):
339 """Returns list of checked and expanded node names.
341 @type lu: L{LogicalUnit}
342 @param lu: the logical unit on whose behalf we execute
344 @param nodes: list of node names or None for all nodes
346 @return: the list of nodes, sorted
347   @raise errors.OpPrereqError: if the nodes parameter is of the wrong type
350 if not isinstance(nodes, list):
351 raise errors.OpPrereqError("Invalid argument type 'nodes'")
354 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
355 " non-empty list of nodes whose name is to be expanded.")
359 node = lu.cfg.ExpandNodeName(name)
361 raise errors.OpPrereqError("No such node name '%s'" % name)
364 return utils.NiceSort(wanted)
367 def _GetWantedInstances(lu, instances):
368 """Returns list of checked and expanded instance names.
370 @type lu: L{LogicalUnit}
371 @param lu: the logical unit on whose behalf we execute
372 @type instances: list
373 @param instances: list of instance names or None for all instances
375 @return: the list of instances, sorted
376 @raise errors.OpPrereqError: if the instances parameter is wrong type
377 @raise errors.OpPrereqError: if any of the passed instances is not found
380 if not isinstance(instances, list):
381 raise errors.OpPrereqError("Invalid argument type 'instances'")
386 for name in instances:
387 instance = lu.cfg.ExpandInstanceName(name)
389 raise errors.OpPrereqError("No such instance name '%s'" % name)
390 wanted.append(instance)
393 wanted = lu.cfg.GetInstanceList()
394 return utils.NiceSort(wanted)
397 def _CheckOutputFields(static, dynamic, selected):
398 """Checks whether all selected fields are valid.
400 @type static: L{utils.FieldSet}
401 @param static: static fields set
402 @type dynamic: L{utils.FieldSet}
403 @param dynamic: dynamic fields set
410 delta = f.NonMatching(selected)
412 raise errors.OpPrereqError("Unknown output fields selected: %s"
416 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
417 memory, vcpus, nics):
418   """Builds instance-related env variables for hooks
420 This builds the hook environment from individual variables.
423 @param name: the name of the instance
424 @type primary_node: string
425 @param primary_node: the name of the instance's primary node
426 @type secondary_nodes: list
427 @param secondary_nodes: list of secondary nodes as strings
428 @type os_type: string
429 @param os_type: the name of the instance's OS
431   @param status: the desired status of the instance
433 @param memory: the memory size of the instance
435 @param vcpus: the count of VCPUs the instance has
437 @param nics: list of tuples (ip, bridge, mac) representing
438 the NICs the instance has
440 @return: the hook environment for this instance
445 "INSTANCE_NAME": name,
446 "INSTANCE_PRIMARY": primary_node,
447 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
448 "INSTANCE_OS_TYPE": os_type,
449 "INSTANCE_STATUS": status,
450 "INSTANCE_MEMORY": memory,
451 "INSTANCE_VCPUS": vcpus,
455 nic_count = len(nics)
456 for idx, (ip, bridge, mac) in enumerate(nics):
459 env["INSTANCE_NIC%d_IP" % idx] = ip
460 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
461 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
465 env["INSTANCE_NIC_COUNT"] = nic_count
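  # Illustrative example (hypothetical values, not from the original file):
  # for a single-NIC instance the resulting env contains keys such as
  #
  #   {"INSTANCE_NAME": "inst1.example.com",
  #    "INSTANCE_PRIMARY": "node1.example.com",
  #    "INSTANCE_NIC_COUNT": 1,
  #    "INSTANCE_NIC0_IP": "192.0.2.10",
  #    "INSTANCE_NIC0_BRIDGE": "xen-br0",
  #    "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33"}
  #
  # (the GANETI_ prefix is added later by the hooks runner)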
470 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
471   """Builds instance-related env variables for hooks from an object.
473 @type lu: L{LogicalUnit}
474 @param lu: the logical unit on whose behalf we execute
475 @type instance: L{objects.Instance}
476 @param instance: the instance for which we should build the
479 @param override: dictionary with key/values that will override
482 @return: the hook environment dictionary
485 bep = lu.cfg.GetClusterInfo().FillBE(instance)
487 'name': instance.name,
488 'primary_node': instance.primary_node,
489 'secondary_nodes': instance.secondary_nodes,
490 'os_type': instance.os,
491     'status': instance.status,
492 'memory': bep[constants.BE_MEMORY],
493 'vcpus': bep[constants.BE_VCPUS],
494 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
497 args.update(override)
498 return _BuildInstanceHookEnv(**args)
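# Usage sketch (assumed, not from the original file; the override values are
# hypothetical): callers can replace individual hook variables, e.g.
#
#   env = _BuildInstanceHookEnvByObject(self, instance,
#                                       override={"memory": 512})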
501 def _CheckInstanceBridgesExist(lu, instance):
502   """Check that the bridges needed by an instance exist.
505   # check bridges existence
506 brlist = [nic.bridge for nic in instance.nics]
507 if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
508     raise errors.OpPrereqError("one or more target bridges %s do not"
509 " exist on destination node '%s'" %
510 (brlist, instance.primary_node))
513 class LUDestroyCluster(NoHooksLU):
514 """Logical unit for destroying the cluster.
519 def CheckPrereq(self):
520 """Check prerequisites.
522 This checks whether the cluster is empty.
524 Any errors are signalled by raising errors.OpPrereqError.
527 master = self.cfg.GetMasterNode()
529 nodelist = self.cfg.GetNodeList()
530 if len(nodelist) != 1 or nodelist[0] != master:
531 raise errors.OpPrereqError("There are still %d node(s) in"
532 " this cluster." % (len(nodelist) - 1))
533 instancelist = self.cfg.GetInstanceList()
535 raise errors.OpPrereqError("There are still %d instance(s) in"
536 " this cluster." % len(instancelist))
538 def Exec(self, feedback_fn):
539 """Destroys the cluster.
542 master = self.cfg.GetMasterNode()
543 if not self.rpc.call_node_stop_master(master, False):
544 raise errors.OpExecError("Could not disable the master role")
545 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
546 utils.CreateBackup(priv_key)
547 utils.CreateBackup(pub_key)
551 class LUVerifyCluster(LogicalUnit):
552 """Verifies the cluster status.
555 HPATH = "cluster-verify"
556 HTYPE = constants.HTYPE_CLUSTER
557 _OP_REQP = ["skip_checks"]
560 def ExpandNames(self):
561 self.needed_locks = {
562 locking.LEVEL_NODE: locking.ALL_SET,
563 locking.LEVEL_INSTANCE: locking.ALL_SET,
565 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
567 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
568 remote_version, feedback_fn):
569 """Run multiple tests against a node.
573 - compares ganeti version
574       - checks vg existence and size > 20G
575 - checks config file checksum
576 - checks ssh to other nodes
579 @param node: the name of the node to check
580 @param file_list: required list of files
581 @param local_cksum: dictionary of local files and their checksums
583 @param vglist: dictionary of volume group names and their size
584 @param node_result: the results from the node
585 @param remote_version: the RPC version from the remote node
586 @param feedback_fn: function used to accumulate results
589 # compares ganeti version
590 local_version = constants.PROTOCOL_VERSION
591 if not remote_version:
592 feedback_fn(" - ERROR: connection to %s failed" % (node))
595 if local_version != remote_version:
596 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
597 (local_version, node, remote_version))
600     # checks vg existence and size > 20G
604 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
608 vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
609 constants.MIN_VG_SIZE)
611 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
615 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
618 # checks config file checksum
621 if 'filelist' not in node_result:
623 feedback_fn(" - ERROR: node hasn't returned file checksum data")
625 remote_cksum = node_result['filelist']
626 for file_name in file_list:
627 if file_name not in remote_cksum:
629 feedback_fn(" - ERROR: file '%s' missing" % file_name)
630 elif remote_cksum[file_name] != local_cksum[file_name]:
632 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
634 if 'nodelist' not in node_result:
636 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
638 if node_result['nodelist']:
640 for node in node_result['nodelist']:
641 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
642 (node, node_result['nodelist'][node]))
643 if 'node-net-test' not in node_result:
645 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
647 if node_result['node-net-test']:
649 nlist = utils.NiceSort(node_result['node-net-test'].keys())
651 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
652 (node, node_result['node-net-test'][node]))
654 hyp_result = node_result.get('hypervisor', None)
655 if isinstance(hyp_result, dict):
656 for hv_name, hv_result in hyp_result.iteritems():
657 if hv_result is not None:
658 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
659 (hv_name, hv_result))
662 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
663 node_instance, feedback_fn):
664 """Verify an instance.
666 This function checks to see if the required block devices are
667 available on the instance's node.
672 node_current = instanceconfig.primary_node
675 instanceconfig.MapLVsByNode(node_vol_should)
677 for node in node_vol_should:
678 for volume in node_vol_should[node]:
679 if node not in node_vol_is or volume not in node_vol_is[node]:
680 feedback_fn(" - ERROR: volume %s missing on node %s" %
684     if instanceconfig.status != 'down':
685 if (node_current not in node_instance or
686 not instance in node_instance[node_current]):
687 feedback_fn(" - ERROR: instance %s not running on node %s" %
688 (instance, node_current))
691 for node in node_instance:
692       if node != node_current:
693 if instance in node_instance[node]:
694 feedback_fn(" - ERROR: instance %s should not run on node %s" %
700 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
701 """Verify if there are any unknown volumes in the cluster.
703 The .os, .swap and backup volumes are ignored. All other volumes are
709 for node in node_vol_is:
710 for volume in node_vol_is[node]:
711 if node not in node_vol_should or volume not in node_vol_should[node]:
712 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
717 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
718 """Verify the list of running instances.
720 This checks what instances are running but unknown to the cluster.
724 for node in node_instance:
725 for runninginstance in node_instance[node]:
726 if runninginstance not in instancelist:
727 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
728 (runninginstance, node))
732 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
733 """Verify N+1 Memory Resilience.
735 Check that if one single node dies we can still start all the instances it
741 for node, nodeinfo in node_info.iteritems():
742 # This code checks that every node which is now listed as secondary has
743 # enough memory to host all instances it is supposed to should a single
744 # other node in the cluster fail.
745 # FIXME: not ready for failover to an arbitrary node
746 # FIXME: does not support file-backed instances
747 # WARNING: we currently take into account down instances as well as up
748 # ones, considering that even if they're down someone might want to start
749 # them even in the event of a node failure.
750 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
752 for instance in instances:
753 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
754 if bep[constants.BE_AUTO_BALANCE]:
755 needed_mem += bep[constants.BE_MEMORY]
756 if nodeinfo['mfree'] < needed_mem:
757           feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
758 " failovers should node %s fail" % (node, prinode))
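          # Worked example (hypothetical numbers, not from the original
          # file): if this node holds secondaries for two auto-balanced
          # instances whose primary is prinode, with BE_MEMORY 512 and 1024,
          # needed_mem is 1536; an mfree of 1024 would trigger the error
          # above for that (node, prinode) pair.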
762 def CheckPrereq(self):
763 """Check prerequisites.
765 Transform the list of checks we're going to skip into a set and check that
766 all its members are valid.
769 self.skip_set = frozenset(self.op.skip_checks)
770 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
771 raise errors.OpPrereqError("Invalid checks to be skipped specified")
773 def BuildHooksEnv(self):
776     Cluster-Verify hooks run only in the post phase; their failure is
777     logged in the verify output and makes the verification fail.
780 all_nodes = self.cfg.GetNodeList()
781 # TODO: populate the environment with useful information for verify hooks
783 return env, [], all_nodes
785 def Exec(self, feedback_fn):
786     """Verify integrity of cluster, performing various tests on nodes.
790 feedback_fn("* Verifying global settings")
791 for msg in self.cfg.VerifyConfig():
792 feedback_fn(" - ERROR: %s" % msg)
794 vg_name = self.cfg.GetVGName()
795 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
796 nodelist = utils.NiceSort(self.cfg.GetNodeList())
797 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
798 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
799 i_non_redundant = [] # Non redundant instances
800 i_non_a_balanced = [] # Non auto-balanced instances
806 # FIXME: verify OS list
809 file_names.append(constants.SSL_CERT_FILE)
810 file_names.append(constants.CLUSTER_CONF_FILE)
811 local_checksums = utils.FingerprintFiles(file_names)
813 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
814 all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
815 all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
816 all_vglist = self.rpc.call_vg_list(nodelist)
817 node_verify_param = {
818 'filelist': file_names,
819 'nodelist': nodelist,
820 'hypervisor': hypervisors,
821 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
822 for node in nodeinfo]
824 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
825 self.cfg.GetClusterName())
826 all_rversion = self.rpc.call_version(nodelist)
827 all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
828 self.cfg.GetHypervisorType())
830 cluster = self.cfg.GetClusterInfo()
831 for node in nodelist:
832 feedback_fn("* Verifying node %s" % node)
833 result = self._VerifyNode(node, file_names, local_checksums,
834 all_vglist[node], all_nvinfo[node],
835 all_rversion[node], feedback_fn)
839 volumeinfo = all_volumeinfo[node]
841 if isinstance(volumeinfo, basestring):
842 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
843 (node, volumeinfo[-400:].encode('string_escape')))
845 node_volume[node] = {}
846 elif not isinstance(volumeinfo, dict):
847 feedback_fn(" - ERROR: connection to %s failed" % (node,))
851 node_volume[node] = volumeinfo
854 nodeinstance = all_instanceinfo[node]
855 if type(nodeinstance) != list:
856 feedback_fn(" - ERROR: connection to %s failed" % (node,))
860 node_instance[node] = nodeinstance
863 nodeinfo = all_ninfo[node]
864 if not isinstance(nodeinfo, dict):
865 feedback_fn(" - ERROR: connection to %s failed" % (node,))
871 "mfree": int(nodeinfo['memory_free']),
872 "dfree": int(nodeinfo['vg_free']),
875 # dictionary holding all instances this node is secondary for,
876 # grouped by their primary node. Each key is a cluster node, and each
877 # value is a list of instances which have the key as primary and the
878         # current node as secondary. This is handy to calculate N+1 memory
879 # availability if you can only failover from a primary to its
881 "sinst-by-pnode": {},
884 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
890 for instance in instancelist:
891 feedback_fn("* Verifying instance %s" % instance)
892 inst_config = self.cfg.GetInstanceInfo(instance)
893 result = self._VerifyInstance(instance, inst_config, node_volume,
894 node_instance, feedback_fn)
897 inst_config.MapLVsByNode(node_vol_should)
899 instance_cfg[instance] = inst_config
901 pnode = inst_config.primary_node
902 if pnode in node_info:
903 node_info[pnode]['pinst'].append(instance)
905 feedback_fn(" - ERROR: instance %s, connection to primary node"
906 " %s failed" % (instance, pnode))
909 # If the instance is non-redundant we cannot survive losing its primary
910 # node, so we are not N+1 compliant. On the other hand we have no disk
911 # templates with more than one secondary so that situation is not well
913 # FIXME: does not support file-backed instances
914 if len(inst_config.secondary_nodes) == 0:
915 i_non_redundant.append(instance)
916 elif len(inst_config.secondary_nodes) > 1:
917 feedback_fn(" - WARNING: multiple secondaries for instance %s"
920 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
921 i_non_a_balanced.append(instance)
923 for snode in inst_config.secondary_nodes:
924 if snode in node_info:
925 node_info[snode]['sinst'].append(instance)
926 if pnode not in node_info[snode]['sinst-by-pnode']:
927 node_info[snode]['sinst-by-pnode'][pnode] = []
928 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
930 feedback_fn(" - ERROR: instance %s, connection to secondary node"
931 " %s failed" % (instance, snode))
933 feedback_fn("* Verifying orphan volumes")
934 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
938 feedback_fn("* Verifying remaining instances")
939 result = self._VerifyOrphanInstances(instancelist, node_instance,
943 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
944 feedback_fn("* Verifying N+1 Memory redundancy")
945 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
948 feedback_fn("* Other Notes")
950 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
951 % len(i_non_redundant))
954 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
955 % len(i_non_a_balanced))
959 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
960     """Analyze the post-hooks' result
962 This method analyses the hook result, handles it, and sends some
963 nicely-formatted feedback back to the user.
965 @param phase: one of L{constants.HOOKS_PHASE_POST} or
966 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
967 @param hooks_results: the results of the multi-node hooks rpc call
968     @param feedback_fn: function used to send feedback back to the caller
969 @param lu_result: previous Exec result
970 @return: the new Exec result, based on the previous result
974 # We only really run POST phase hooks, and are only interested in
976 if phase == constants.HOOKS_PHASE_POST:
977 # Used to change hooks' output to proper indentation
978 indent_re = re.compile('^', re.M)
979 feedback_fn("* Hooks Results")
980 if not hooks_results:
981 feedback_fn(" - ERROR: general communication failure")
984 for node_name in hooks_results:
985 show_node_header = True
986 res = hooks_results[node_name]
987 if res is False or not isinstance(res, list):
988 feedback_fn(" Communication failure")
991 for script, hkr, output in res:
992 if hkr == constants.HKR_FAIL:
993 # The node header is only shown once, if there are
994 # failing hooks on that node
996 feedback_fn(" Node %s:" % node_name)
997 show_node_header = False
998 feedback_fn(" ERROR: Script %s failed, output:" % script)
999 output = indent_re.sub(' ', output)
1000 feedback_fn("%s" % output)
1006 class LUVerifyDisks(NoHooksLU):
1007 """Verifies the cluster disks status.
1013 def ExpandNames(self):
1014 self.needed_locks = {
1015 locking.LEVEL_NODE: locking.ALL_SET,
1016 locking.LEVEL_INSTANCE: locking.ALL_SET,
1018 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1020 def CheckPrereq(self):
1021 """Check prerequisites.
1023 This has no prerequisites.
1028 def Exec(self, feedback_fn):
1029 """Verify integrity of cluster disks.
1032 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1034 vg_name = self.cfg.GetVGName()
1035 nodes = utils.NiceSort(self.cfg.GetNodeList())
1036 instances = [self.cfg.GetInstanceInfo(name)
1037 for name in self.cfg.GetInstanceList()]
1040 for inst in instances:
1042 if (inst.status != "up" or
1043 inst.disk_template not in constants.DTS_NET_MIRROR):
1045 inst.MapLVsByNode(inst_lvs)
1046 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1047 for node, vol_list in inst_lvs.iteritems():
1048 for vol in vol_list:
1049 nv_dict[(node, vol)] = inst
1054 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1059 lvs = node_lvs[node]
1061 if isinstance(lvs, basestring):
1062 logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1063 res_nlvm[node] = lvs
1064 elif not isinstance(lvs, dict):
1065 logging.warning("Connection to node %s failed or invalid data"
1067 res_nodes.append(node)
1070 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1071 inst = nv_dict.pop((node, lv_name), None)
1072 if (not lv_online and inst is not None
1073 and inst.name not in res_instances):
1074 res_instances.append(inst.name)
1076 # any leftover items in nv_dict are missing LVs, let's arrange the
1078 for key, inst in nv_dict.iteritems():
1079 if inst.name not in res_missing:
1080 res_missing[inst.name] = []
1081 res_missing[inst.name].append(key)
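    # Illustrative summary (not from the original file): the tuple built
    # above is returned as
    #   (nodes that could not be queried,
    #    {node: LVM error string},
    #    instances with at least one offline LV,
    #    {instance: [(node, lv_name), ...] for LVs that are missing})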
1086 class LURenameCluster(LogicalUnit):
1087 """Rename the cluster.
1090 HPATH = "cluster-rename"
1091 HTYPE = constants.HTYPE_CLUSTER
1094 def BuildHooksEnv(self):
1099 "OP_TARGET": self.cfg.GetClusterName(),
1100 "NEW_NAME": self.op.name,
1102 mn = self.cfg.GetMasterNode()
1103 return env, [mn], [mn]
1105 def CheckPrereq(self):
1106 """Verify that the passed name is a valid one.
1109 hostname = utils.HostInfo(self.op.name)
1111 new_name = hostname.name
1112 self.ip = new_ip = hostname.ip
1113 old_name = self.cfg.GetClusterName()
1114 old_ip = self.cfg.GetMasterIP()
1115 if new_name == old_name and new_ip == old_ip:
1116 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1117 " cluster has changed")
1118 if new_ip != old_ip:
1119 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1120 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1121 " reachable on the network. Aborting." %
1124 self.op.name = new_name
1126 def Exec(self, feedback_fn):
1127 """Rename the cluster.
1130 clustername = self.op.name
1133 # shutdown the master IP
1134 master = self.cfg.GetMasterNode()
1135 if not self.rpc.call_node_stop_master(master, False):
1136 raise errors.OpExecError("Could not disable the master role")
1141 ss.SetKey(ss.SS_MASTER_IP, ip)
1142 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1144 # Distribute updated ss config to all nodes
1145 myself = self.cfg.GetNodeInfo(master)
1146 dist_nodes = self.cfg.GetNodeList()
1147 if myself.name in dist_nodes:
1148 dist_nodes.remove(myself.name)
1150 logging.debug("Copying updated ssconf data to all nodes")
1151 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1152 fname = ss.KeyToFilename(keyname)
1153 result = self.rpc.call_upload_file(dist_nodes, fname)
1154 for to_node in dist_nodes:
1155 if not result[to_node]:
1156 self.LogWarning("Copy of file %s to node %s failed",
1159 if not self.rpc.call_node_start_master(master, False):
1160 self.LogWarning("Could not re-enable the master role on"
1161 " the master, please restart manually.")
1164 def _RecursiveCheckIfLVMBased(disk):
1165 """Check if the given disk or its children are lvm-based.
1167 @type disk: L{objects.Disk}
1168 @param disk: the disk to check
1170 @return: boolean indicating whether a LD_LV dev_type was found or not
1174 for chdisk in disk.children:
1175 if _RecursiveCheckIfLVMBased(chdisk):
1177 return disk.dev_type == constants.LD_LV
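# Illustrative sketch (hypothetical disk objects, not from the original
# file): a DRBD disk backed by two logical volumes is reported as lvm-based
# through its children, e.g.
#
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8,
#                       children=[objects.Disk(dev_type=constants.LD_LV),
#                                 objects.Disk(dev_type=constants.LD_LV)])
#   _RecursiveCheckIfLVMBased(drbd)  # True, via the LV children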
1180 class LUSetClusterParams(LogicalUnit):
1181 """Change the parameters of the cluster.
1184 HPATH = "cluster-modify"
1185 HTYPE = constants.HTYPE_CLUSTER
1189 def ExpandNames(self):
1190 # FIXME: in the future maybe other cluster params won't require checking on
1191 # all nodes to be modified.
1192 self.needed_locks = {
1193 locking.LEVEL_NODE: locking.ALL_SET,
1195 self.share_locks[locking.LEVEL_NODE] = 1
1197 def BuildHooksEnv(self):
1202 "OP_TARGET": self.cfg.GetClusterName(),
1203 "NEW_VG_NAME": self.op.vg_name,
1205 mn = self.cfg.GetMasterNode()
1206 return env, [mn], [mn]
1208 def CheckPrereq(self):
1209 """Check prerequisites.
1211     This checks whether the given parameters are consistent and
1212     whether the given volume group is valid.
1215 # FIXME: This only works because there is only one parameter that can be
1216 # changed or removed.
1217 if self.op.vg_name is not None and not self.op.vg_name:
1218 instances = self.cfg.GetAllInstancesInfo().values()
1219 for inst in instances:
1220 for disk in inst.disks:
1221 if _RecursiveCheckIfLVMBased(disk):
1222 raise errors.OpPrereqError("Cannot disable lvm storage while"
1223 " lvm-based instances exist")
1225 node_list = self.acquired_locks[locking.LEVEL_NODE]
1227     # if vg_name is not None, check the given volume group on all nodes
1229 vglist = self.rpc.call_vg_list(node_list)
1230 for node in node_list:
1231 vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1232 constants.MIN_VG_SIZE)
1234 raise errors.OpPrereqError("Error on node '%s': %s" %
1237 self.cluster = cluster = self.cfg.GetClusterInfo()
1238 # beparams changes do not need validation (we can't validate?),
1239 # but we still process here
1240 if self.op.beparams:
1241 self.new_beparams = cluster.FillDict(
1242 cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1244 # hypervisor list/parameters
1245 self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1246 if self.op.hvparams:
1247 if not isinstance(self.op.hvparams, dict):
1248 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1249 for hv_name, hv_dict in self.op.hvparams.items():
1250 if hv_name not in self.new_hvparams:
1251 self.new_hvparams[hv_name] = hv_dict
1253 self.new_hvparams[hv_name].update(hv_dict)
1255 if self.op.enabled_hypervisors is not None:
1256 self.hv_list = self.op.enabled_hypervisors
1258 self.hv_list = cluster.enabled_hypervisors
1260 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1261 # either the enabled list has changed, or the parameters have, validate
1262 for hv_name, hv_params in self.new_hvparams.items():
1263 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1264 (self.op.enabled_hypervisors and
1265 hv_name in self.op.enabled_hypervisors)):
1266 # either this is a new hypervisor, or its parameters have changed
1267 hv_class = hypervisor.GetHypervisor(hv_name)
1268 hv_class.CheckParameterSyntax(hv_params)
1269 _CheckHVParams(self, node_list, hv_name, hv_params)
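    # Illustrative example (hypothetical hypervisor and values, not from the
    # original file): with cluster.hvparams = {"xen-pvm": {"kernel_path":
    # "/boot/vmlinuz-2.6-xenU"}} and self.op.hvparams = {"xen-pvm":
    # {"root_path": "/dev/sda1"}}, the merged new_hvparams["xen-pvm"] keeps
    # both keys, and only that hypervisor is re-validated on the locked nodes.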
1271 def Exec(self, feedback_fn):
1272 """Change the parameters of the cluster.
1275 if self.op.vg_name is not None:
1276 if self.op.vg_name != self.cfg.GetVGName():
1277 self.cfg.SetVGName(self.op.vg_name)
1279 feedback_fn("Cluster LVM configuration already in desired"
1280 " state, not changing")
1281 if self.op.hvparams:
1282 self.cluster.hvparams = self.new_hvparams
1283 if self.op.enabled_hypervisors is not None:
1284 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1285 if self.op.beparams:
1286 self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1287 self.cfg.Update(self.cluster)
1290 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1291 """Sleep and poll for an instance's disk to sync.
1294 if not instance.disks:
1298 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1300 node = instance.primary_node
1302 for dev in instance.disks:
1303 lu.cfg.SetDiskID(dev, node)
1309 cumul_degraded = False
1310 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1312 lu.LogWarning("Can't get any data from node %s", node)
1315 raise errors.RemoteError("Can't contact node %s for mirror data,"
1316 " aborting." % node)
1320 for i in range(len(rstats)):
1323 lu.LogWarning("Can't compute data for node %s/%s",
1324 node, instance.disks[i].iv_name)
1326 # we ignore the ldisk parameter
1327 perc_done, est_time, is_degraded, _ = mstat
1328 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1329 if perc_done is not None:
1331 if est_time is not None:
1332 rem_time = "%d estimated seconds remaining" % est_time
1335 rem_time = "no time estimate"
1336 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1337 (instance.disks[i].iv_name, perc_done, rem_time))
1341 time.sleep(min(60, max_time))
1344 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1345 return not cumul_degraded
1348 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1349 """Check that mirrors are not degraded.
1351 The ldisk parameter, if True, will change the test from the
1352 is_degraded attribute (which represents overall non-ok status for
1353 the device(s)) to the ldisk (representing the local storage status).
1356 lu.cfg.SetDiskID(dev, node)
1363 if on_primary or dev.AssembleOnSecondary():
1364 rstats = lu.rpc.call_blockdev_find(node, dev)
1366 logging.warning("Node %s: disk degraded, not found or node down", node)
1369 result = result and (not rstats[idx])
1371 for child in dev.children:
1372 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1377 class LUDiagnoseOS(NoHooksLU):
1378 """Logical unit for OS diagnose/query.
1381 _OP_REQP = ["output_fields", "names"]
1383 _FIELDS_STATIC = utils.FieldSet()
1384 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1386 def ExpandNames(self):
1388 raise errors.OpPrereqError("Selective OS query not supported")
1390 _CheckOutputFields(static=self._FIELDS_STATIC,
1391 dynamic=self._FIELDS_DYNAMIC,
1392 selected=self.op.output_fields)
1394 # Lock all nodes, in shared mode
1395 self.needed_locks = {}
1396 self.share_locks[locking.LEVEL_NODE] = 1
1397 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1399 def CheckPrereq(self):
1400 """Check prerequisites.
1405 def _DiagnoseByOS(node_list, rlist):
1406     """Remaps a per-node return list into a per-os per-node dictionary
1408 @param node_list: a list with the names of all nodes
1409 @param rlist: a map with node names as keys and OS objects as values
1412 @returns: a dictionary with osnames as keys and as value another map, with
1413 nodes as keys and list of OS objects as values, eg::
1415 {"debian-etch": {"node1": [<object>,...],
1416 "node2": [<object>,]}
1421 for node_name, nr in rlist.iteritems():
1425 if os_obj.name not in all_os:
1426 # build a list of nodes for this os containing empty lists
1427 # for each node in node_list
1428 all_os[os_obj.name] = {}
1429 for nname in node_list:
1430 all_os[os_obj.name][nname] = []
1431 all_os[os_obj.name][node_name].append(os_obj)
1434 def Exec(self, feedback_fn):
1435 """Compute the list of OSes.
1438 node_list = self.acquired_locks[locking.LEVEL_NODE]
1439 node_data = self.rpc.call_os_diagnose(node_list)
1440 if node_data == False:
1441 raise errors.OpExecError("Can't gather the list of OSes")
1442 pol = self._DiagnoseByOS(node_list, node_data)
1444 for os_name, os_data in pol.iteritems():
1446 for field in self.op.output_fields:
1449 elif field == "valid":
1450 val = utils.all([osl and osl[0] for osl in os_data.values()])
1451 elif field == "node_status":
1453 for node_name, nos_list in os_data.iteritems():
1454 val[node_name] = [(v.status, v.path) for v in nos_list]
1456 raise errors.ParameterError(field)
1463 class LURemoveNode(LogicalUnit):
1464 """Logical unit for removing a node.
1467 HPATH = "node-remove"
1468 HTYPE = constants.HTYPE_NODE
1469 _OP_REQP = ["node_name"]
1471 def BuildHooksEnv(self):
1474 This doesn't run on the target node in the pre phase as a failed
1475 node would then be impossible to remove.
1479 "OP_TARGET": self.op.node_name,
1480 "NODE_NAME": self.op.node_name,
1482 all_nodes = self.cfg.GetNodeList()
1483 all_nodes.remove(self.op.node_name)
1484 return env, all_nodes, all_nodes
1486 def CheckPrereq(self):
1487 """Check prerequisites.
1490 - the node exists in the configuration
1491 - it does not have primary or secondary instances
1492 - it's not the master
1494 Any errors are signalled by raising errors.OpPrereqError.
1497 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1499       raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1501 instance_list = self.cfg.GetInstanceList()
1503 masternode = self.cfg.GetMasterNode()
1504 if node.name == masternode:
1505 raise errors.OpPrereqError("Node is the master node,"
1506 " you need to failover first.")
1508 for instance_name in instance_list:
1509 instance = self.cfg.GetInstanceInfo(instance_name)
1510 if node.name == instance.primary_node:
1511 raise errors.OpPrereqError("Instance %s still running on the node,"
1512 " please remove first." % instance_name)
1513 if node.name in instance.secondary_nodes:
1514 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1515 " please remove first." % instance_name)
1516 self.op.node_name = node.name
1519 def Exec(self, feedback_fn):
1520 """Removes the node from the cluster.
1524 logging.info("Stopping the node daemon and removing configs from node %s",
1527 self.context.RemoveNode(node.name)
1529 self.rpc.call_node_leave_cluster(node.name)
1532 class LUQueryNodes(NoHooksLU):
1533 """Logical unit for querying nodes.
1536 _OP_REQP = ["output_fields", "names"]
1538 _FIELDS_DYNAMIC = utils.FieldSet(
1540 "mtotal", "mnode", "mfree",
1545 _FIELDS_STATIC = utils.FieldSet(
1546 "name", "pinst_cnt", "sinst_cnt",
1547 "pinst_list", "sinst_list",
1548 "pip", "sip", "tags",
1552 def ExpandNames(self):
1553 _CheckOutputFields(static=self._FIELDS_STATIC,
1554 dynamic=self._FIELDS_DYNAMIC,
1555 selected=self.op.output_fields)
1557 self.needed_locks = {}
1558 self.share_locks[locking.LEVEL_NODE] = 1
1561 self.wanted = _GetWantedNodes(self, self.op.names)
1563 self.wanted = locking.ALL_SET
1565 self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1567 # if we don't request only static fields, we need to lock the nodes
1568 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1571 def CheckPrereq(self):
1572 """Check prerequisites.
1575 # The validation of the node list is done in the _GetWantedNodes,
1576 # if non empty, and if empty, there's no validation to do
1579 def Exec(self, feedback_fn):
1580 """Computes the list of nodes and their attributes.
1583 all_info = self.cfg.GetAllNodesInfo()
1585 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1586 elif self.wanted != locking.ALL_SET:
1587 nodenames = self.wanted
1588 missing = set(nodenames).difference(all_info.keys())
1590 raise errors.OpExecError(
1591 "Some nodes were removed before retrieving their data: %s" % missing)
1593 nodenames = all_info.keys()
1595 nodenames = utils.NiceSort(nodenames)
1596 nodelist = [all_info[name] for name in nodenames]
1598 # begin data gathering
1602 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1603 self.cfg.GetHypervisorType())
1604 for name in nodenames:
1605 nodeinfo = node_data.get(name, None)
1608 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1609 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1610 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1611 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1612 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1613 "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1614 "bootid": nodeinfo['bootid'],
1617 live_data[name] = {}
1619 live_data = dict.fromkeys(nodenames, {})
1621 node_to_primary = dict([(name, set()) for name in nodenames])
1622 node_to_secondary = dict([(name, set()) for name in nodenames])
1624 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1625 "sinst_cnt", "sinst_list"))
1626 if inst_fields & frozenset(self.op.output_fields):
1627 instancelist = self.cfg.GetInstanceList()
1629 for instance_name in instancelist:
1630 inst = self.cfg.GetInstanceInfo(instance_name)
1631 if inst.primary_node in node_to_primary:
1632 node_to_primary[inst.primary_node].add(inst.name)
1633 for secnode in inst.secondary_nodes:
1634 if secnode in node_to_secondary:
1635 node_to_secondary[secnode].add(inst.name)
1637 # end data gathering
1640 for node in nodelist:
1642 for field in self.op.output_fields:
1645 elif field == "pinst_list":
1646 val = list(node_to_primary[node.name])
1647 elif field == "sinst_list":
1648 val = list(node_to_secondary[node.name])
1649 elif field == "pinst_cnt":
1650 val = len(node_to_primary[node.name])
1651 elif field == "sinst_cnt":
1652 val = len(node_to_secondary[node.name])
1653 elif field == "pip":
1654 val = node.primary_ip
1655 elif field == "sip":
1656 val = node.secondary_ip
1657 elif field == "tags":
1658 val = list(node.GetTags())
1659 elif field == "serial_no":
1660 val = node.serial_no
1661 elif self._FIELDS_DYNAMIC.Matches(field):
1662 val = live_data[node.name].get(field, None)
1664 raise errors.ParameterError(field)
1665 node_output.append(val)
1666 output.append(node_output)
1671 class LUQueryNodeVolumes(NoHooksLU):
1672 """Logical unit for getting volumes on node(s).
1675 _OP_REQP = ["nodes", "output_fields"]
1677 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1678 _FIELDS_STATIC = utils.FieldSet("node")
1680 def ExpandNames(self):
1681 _CheckOutputFields(static=self._FIELDS_STATIC,
1682 dynamic=self._FIELDS_DYNAMIC,
1683 selected=self.op.output_fields)
1685 self.needed_locks = {}
1686 self.share_locks[locking.LEVEL_NODE] = 1
1687 if not self.op.nodes:
1688 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1690 self.needed_locks[locking.LEVEL_NODE] = \
1691 _GetWantedNodes(self, self.op.nodes)
1693 def CheckPrereq(self):
1694 """Check prerequisites.
1696 This checks that the fields required are valid output fields.
1699 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1701 def Exec(self, feedback_fn):
1702 """Computes the list of nodes and their attributes.
1705 nodenames = self.nodes
1706 volumes = self.rpc.call_node_volumes(nodenames)
1708 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1709 in self.cfg.GetInstanceList()]
1711 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1714 for node in nodenames:
1715 if node not in volumes or not volumes[node]:
1718 node_vols = volumes[node][:]
1719 node_vols.sort(key=lambda vol: vol['dev'])
1721 for vol in node_vols:
1723 for field in self.op.output_fields:
1726 elif field == "phys":
1730 elif field == "name":
1732 elif field == "size":
1733 val = int(float(vol['size']))
1734 elif field == "instance":
1736 if node not in lv_by_node[inst]:
1738 if vol['name'] in lv_by_node[inst][node]:
1744 raise errors.ParameterError(field)
1745 node_output.append(str(val))
1747 output.append(node_output)
1752 class LUAddNode(LogicalUnit):
1753   """Logical unit for adding a node to the cluster.
1757 HTYPE = constants.HTYPE_NODE
1758 _OP_REQP = ["node_name"]
1760 def BuildHooksEnv(self):
1763 This will run on all nodes before, and on all nodes + the new node after.
1767 "OP_TARGET": self.op.node_name,
1768 "NODE_NAME": self.op.node_name,
1769 "NODE_PIP": self.op.primary_ip,
1770 "NODE_SIP": self.op.secondary_ip,
1772 nodes_0 = self.cfg.GetNodeList()
1773 nodes_1 = nodes_0 + [self.op.node_name, ]
1774 return env, nodes_0, nodes_1
1776 def CheckPrereq(self):
1777 """Check prerequisites.
1780 - the new node is not already in the config
1782      - its parameters (single/dual homed) match the cluster
1784 Any errors are signalled by raising errors.OpPrereqError.
1787 node_name = self.op.node_name
1790 dns_data = utils.HostInfo(node_name)
1792 node = dns_data.name
1793 primary_ip = self.op.primary_ip = dns_data.ip
1794 secondary_ip = getattr(self.op, "secondary_ip", None)
1795 if secondary_ip is None:
1796 secondary_ip = primary_ip
1797 if not utils.IsValidIP(secondary_ip):
1798 raise errors.OpPrereqError("Invalid secondary IP given")
1799 self.op.secondary_ip = secondary_ip
1801 node_list = cfg.GetNodeList()
1802 if not self.op.readd and node in node_list:
1803 raise errors.OpPrereqError("Node %s is already in the configuration" %
1805 elif self.op.readd and node not in node_list:
1806 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1808 for existing_node_name in node_list:
1809 existing_node = cfg.GetNodeInfo(existing_node_name)
1811 if self.op.readd and node == existing_node_name:
1812 if (existing_node.primary_ip != primary_ip or
1813 existing_node.secondary_ip != secondary_ip):
1814 raise errors.OpPrereqError("Readded node doesn't have the same IP"
1815 " address configuration as before")
1818 if (existing_node.primary_ip == primary_ip or
1819 existing_node.secondary_ip == primary_ip or
1820 existing_node.primary_ip == secondary_ip or
1821 existing_node.secondary_ip == secondary_ip):
1822 raise errors.OpPrereqError("New node ip address(es) conflict with"
1823 " existing node %s" % existing_node.name)
1825 # check that the type of the node (single versus dual homed) is the
1826 # same as for the master
1827 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1828 master_singlehomed = myself.secondary_ip == myself.primary_ip
1829 newbie_singlehomed = secondary_ip == primary_ip
1830 if master_singlehomed != newbie_singlehomed:
1831 if master_singlehomed:
1832 raise errors.OpPrereqError("The master has no private ip but the"
1833 " new node has one")
1835 raise errors.OpPrereqError("The master has a private ip but the"
1836 " new node doesn't have one")
1838     # checks reachability
1839 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1840 raise errors.OpPrereqError("Node not reachable by ping")
1842 if not newbie_singlehomed:
1843 # check reachability from my secondary ip to newbie's secondary ip
1844 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1845 source=myself.secondary_ip):
1846 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1847 " based ping to noded port")
1849 self.new_node = objects.Node(name=node,
1850 primary_ip=primary_ip,
1851 secondary_ip=secondary_ip)
1853 def Exec(self, feedback_fn):
1854 """Adds the new node to the cluster.
1857 new_node = self.new_node
1858 node = new_node.name
1860 # check connectivity
1861 result = self.rpc.call_version([node])[node]
1863 if constants.PROTOCOL_VERSION == result:
1864 logging.info("Communication to node %s fine, sw version %s match",
1867 raise errors.OpExecError("Version mismatch master version %s,"
1868 " node version %s" %
1869 (constants.PROTOCOL_VERSION, result))
1871 raise errors.OpExecError("Cannot get version from the new node")
1874 logging.info("Copy ssh key to node %s", node)
1875 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1877 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1878 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1884 keyarray.append(f.read())
1888 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1890 keyarray[3], keyarray[4], keyarray[5])
1893 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1895 # Add node to our /etc/hosts, and add key to known_hosts
1896 utils.AddHostToEtcHosts(new_node.name)
1898 if new_node.secondary_ip != new_node.primary_ip:
1899 if not self.rpc.call_node_has_ip_address(new_node.name,
1900 new_node.secondary_ip):
1901 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1902 " you gave (%s). Please fix and re-run this"
1903 " command." % new_node.secondary_ip)
1905 node_verify_list = [self.cfg.GetMasterNode()]
1906 node_verify_param = {
1908 # TODO: do a node-net-test as well?
1911 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1912 self.cfg.GetClusterName())
1913 for verifier in node_verify_list:
1914 if not result[verifier]:
1915 raise errors.OpExecError("Cannot communicate with %s's node daemon"
1916 " for remote verification" % verifier)
1917 if result[verifier]['nodelist']:
1918 for failed in result[verifier]['nodelist']:
1919 feedback_fn("ssh/hostname verification failed %s -> %s" %
1920 (verifier, result[verifier]['nodelist'][failed]))
1921 raise errors.OpExecError("ssh/hostname verification failed.")
1923 # Distribute updated /etc/hosts and known_hosts to all nodes,
1924 # including the node just added
1925 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1926 dist_nodes = self.cfg.GetNodeList()
1927 if not self.op.readd:
1928 dist_nodes.append(node)
1929 if myself.name in dist_nodes:
1930 dist_nodes.remove(myself.name)
1932 logging.debug("Copying hosts and known_hosts to all nodes")
1933 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1934 result = self.rpc.call_upload_file(dist_nodes, fname)
1935 for to_node in dist_nodes:
1936 if not result[to_node]:
1937 logging.error("Copy of file %s to node %s failed", fname, to_node)
1940 if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1941 to_copy.append(constants.VNC_PASSWORD_FILE)
1942 for fname in to_copy:
1943 result = self.rpc.call_upload_file([node], fname)
1944 if not result[node]:
1945 logging.error("Could not copy file %s to node %s", fname, node)
1948 self.context.ReaddNode(new_node)
1950 self.context.AddNode(new_node)
1953 class LUQueryClusterInfo(NoHooksLU):
1954 """Query cluster configuration.
1960 def ExpandNames(self):
1961 self.needed_locks = {}
1963 def CheckPrereq(self):
1964     """No prerequisites needed for this LU.
1969 def Exec(self, feedback_fn):
1970 """Return cluster config.
1973 cluster = self.cfg.GetClusterInfo()
1975 "software_version": constants.RELEASE_VERSION,
1976 "protocol_version": constants.PROTOCOL_VERSION,
1977 "config_version": constants.CONFIG_VERSION,
1978 "os_api_version": constants.OS_API_VERSION,
1979 "export_version": constants.EXPORT_VERSION,
1980 "architecture": (platform.architecture()[0], platform.machine()),
1981 "name": cluster.cluster_name,
1982 "master": cluster.master_node,
1983 "default_hypervisor": cluster.default_hypervisor,
1984 "enabled_hypervisors": cluster.enabled_hypervisors,
1985 "hvparams": cluster.hvparams,
1986 "beparams": cluster.beparams,
1992 class LUQueryConfigValues(NoHooksLU):
1993 """Return configuration values.
1998 _FIELDS_DYNAMIC = utils.FieldSet()
1999 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2001 def ExpandNames(self):
2002 self.needed_locks = {}
2004 _CheckOutputFields(static=self._FIELDS_STATIC,
2005 dynamic=self._FIELDS_DYNAMIC,
2006 selected=self.op.output_fields)
2008 def CheckPrereq(self):
2009 """No prerequisites.
2014 def Exec(self, feedback_fn):
2015 """Dump a representation of the cluster config to the standard output.
2019 for field in self.op.output_fields:
2020 if field == "cluster_name":
2021 entry = self.cfg.GetClusterName()
2022 elif field == "master_node":
2023 entry = self.cfg.GetMasterNode()
2024 elif field == "drain_flag":
2025 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2027 raise errors.ParameterError(field)
2028 values.append(entry)
2032 class LUActivateInstanceDisks(NoHooksLU):
2033 """Bring up an instance's disks.
2036 _OP_REQP = ["instance_name"]
2039 def ExpandNames(self):
2040 self._ExpandAndLockInstance()
2041 self.needed_locks[locking.LEVEL_NODE] = []
2042 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2044 def DeclareLocks(self, level):
2045 if level == locking.LEVEL_NODE:
2046 self._LockInstancesNodes()
2048 def CheckPrereq(self):
2049 """Check prerequisites.
2051 This checks that the instance is in the cluster.
2054 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2055 assert self.instance is not None, \
2056 "Cannot retrieve locked instance %s" % self.op.instance_name
2058 def Exec(self, feedback_fn):
2059 """Activate the disks.
2062 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2064 raise errors.OpExecError("Cannot activate block devices")
2069 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2070 """Prepare the block devices for an instance.
2072 This sets up the block devices on all nodes.
2074 @type lu: L{LogicalUnit}
2075 @param lu: the logical unit on whose behalf we execute
2076 @type instance: L{objects.Instance}
2077 @param instance: the instance for whose disks we assemble
2078 @type ignore_secondaries: boolean
2079 @param ignore_secondaries: if true, errors on secondary nodes
2080 won't result in an error return from the function
2081 @return: a pair (disks_ok, device_info), where device_info is a list of
2082 (host, instance_visible_name, node_visible_name) tuples
2083 with the mapping from node devices to instance devices
2088 iname = instance.name
2089 # With the two-pass mechanism we try to reduce the window of
2090 # opportunity for the race condition of switching DRBD to primary
2091 # before the handshake has occurred, but we do not eliminate it
2093 # The proper fix would be to wait (with some limits) until the
2094 # connection has been made and drbd transitions from WFConnection
2095 # into any other network-connected state (Connected, SyncTarget,
2098 # 1st pass, assemble on all nodes in secondary mode
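# (cfg.SetDiskID translates each disk's configuration id into the physical
# id expected by the given node before the blockdev_assemble RPC below)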
2099 for inst_disk in instance.disks:
2100 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2101 lu.cfg.SetDiskID(node_disk, node)
2102 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2104 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2105 " (is_primary=False, pass=1)",
2106 inst_disk.iv_name, node)
2107 if not ignore_secondaries:
2110 # FIXME: race condition on drbd migration to primary
2112 # 2nd pass, do only the primary node
2113 for inst_disk in instance.disks:
2114 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2115 if node != instance.primary_node:
2117 lu.cfg.SetDiskID(node_disk, node)
2118 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2120 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2121 " (is_primary=True, pass=2)",
2122 inst_disk.iv_name, node)
2124 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2126 # leave the disks configured for the primary node
2127 # this is a workaround that would be better fixed by
2128 # improving the logical/physical id handling
2129 for disk in instance.disks:
2130 lu.cfg.SetDiskID(disk, instance.primary_node)
2132 return disks_ok, device_info
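# Illustrative caller sketch (mirrors LUActivateInstanceDisks.Exec above):
#   disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")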
2135 def _StartInstanceDisks(lu, instance, force):
2136 """Start the disks of an instance.
2139 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2140 ignore_secondaries=force)
2142 _ShutdownInstanceDisks(lu, instance)
2143 if force is not None and not force:
2144 lu.proc.LogWarning("", hint="If the message above refers to a"
2146 " you can retry the operation using '--force'.")
2147 raise errors.OpExecError("Disk consistency error")
2150 class LUDeactivateInstanceDisks(NoHooksLU):
2151 """Shutdown an instance's disks.
2154 _OP_REQP = ["instance_name"]
2157 def ExpandNames(self):
2158 self._ExpandAndLockInstance()
2159 self.needed_locks[locking.LEVEL_NODE] = []
2160 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2162 def DeclareLocks(self, level):
2163 if level == locking.LEVEL_NODE:
2164 self._LockInstancesNodes()
2166 def CheckPrereq(self):
2167 """Check prerequisites.
2169 This checks that the instance is in the cluster.
2172 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2173 assert self.instance is not None, \
2174 "Cannot retrieve locked instance %s" % self.op.instance_name
2176 def Exec(self, feedback_fn):
2177 """Deactivate the disks
2180 instance = self.instance
2181 _SafeShutdownInstanceDisks(self, instance)
2184 def _SafeShutdownInstanceDisks(lu, instance):
2185 """Shutdown block devices of an instance.
2187 This function checks if an instance is running, before calling
2188 _ShutdownInstanceDisks.
2191 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2192 [instance.hypervisor])
2193 ins_l = ins_l[instance.primary_node]
2194 if not isinstance(ins_l, list):
2195 raise errors.OpExecError("Can't contact node '%s'" %
2196 instance.primary_node)
2198 if instance.name in ins_l:
2199 raise errors.OpExecError("Instance is running, can't shutdown"
2202 _ShutdownInstanceDisks(lu, instance)
2205 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2206 """Shutdown block devices of an instance.
2208 This does the shutdown on all nodes of the instance.
2210 If ignore_primary is false, errors on the primary node are
2215 for disk in instance.disks:
2216 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2217 lu.cfg.SetDiskID(top_disk, node)
2218 if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2219 logging.error("Could not shutdown block device %s on node %s",
2221 if not ignore_primary or node != instance.primary_node:
2226 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2227 """Checks if a node has enough free memory.
2229 This function checks if a given node has the needed amount of free
2230 memory. In case the node has less memory or we cannot get the
2231 information from the node, this function raises an OpPrereqError
2234 @type lu: C{LogicalUnit}
2235 @param lu: a logical unit from which we get configuration data
2237 @param node: the node to check
2238 @type reason: C{str}
2239 @param reason: string to use in the error message
2240 @type requested: C{int}
2241 @param requested: the amount of memory in MiB to check for
2242 @type hypervisor: C{str}
2243 @param hypervisor: the hypervisor to ask for memory stats
2244 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2245 we cannot check the node
2248 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2249 if not nodeinfo or not isinstance(nodeinfo, dict):
2250 raise errors.OpPrereqError("Could not contact node %s for resource"
2251 " information" % (node,))
2253 free_mem = nodeinfo[node].get('memory_free')
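# 'memory_free' is expected in MiB, the same unit as the requested amount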
2254 if not isinstance(free_mem, int):
2255 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2256 " was '%s'" % (node, free_mem))
2257 if requested > free_mem:
2258 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2259 " needed %s MiB, available %s MiB" %
2260 (node, reason, requested, free_mem))
2263 class LUStartupInstance(LogicalUnit):
2264 """Starts an instance.
2267 HPATH = "instance-start"
2268 HTYPE = constants.HTYPE_INSTANCE
2269 _OP_REQP = ["instance_name", "force"]
2272 def ExpandNames(self):
2273 self._ExpandAndLockInstance()
2275 def BuildHooksEnv(self):
2278 This runs on master, primary and secondary nodes of the instance.
2282 "FORCE": self.op.force,
2284 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2285 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2286 list(self.instance.secondary_nodes))
2289 def CheckPrereq(self):
2290 """Check prerequisites.
2292 This checks that the instance is in the cluster.
2295 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2296 assert self.instance is not None, \
2297 "Cannot retrieve locked instance %s" % self.op.instance_name
2299 bep = self.cfg.GetClusterInfo().FillBE(instance)
2300 # check bridge existence
2301 _CheckInstanceBridgesExist(self, instance)
2303 _CheckNodeFreeMemory(self, instance.primary_node,
2304 "starting instance %s" % instance.name,
2305 bep[constants.BE_MEMORY], instance.hypervisor)
2307 def Exec(self, feedback_fn):
2308 """Start the instance.
2311 instance = self.instance
2312 force = self.op.force
2313 extra_args = getattr(self.op, "extra_args", "")
2315 self.cfg.MarkInstanceUp(instance.name)
2317 node_current = instance.primary_node
2319 _StartInstanceDisks(self, instance, force)
2321 if not self.rpc.call_instance_start(node_current, instance, extra_args):
2322 _ShutdownInstanceDisks(self, instance)
2323 raise errors.OpExecError("Could not start instance")
2326 class LURebootInstance(LogicalUnit):
2327 """Reboot an instance.
2330 HPATH = "instance-reboot"
2331 HTYPE = constants.HTYPE_INSTANCE
2332 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2335 def ExpandNames(self):
2336 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2337 constants.INSTANCE_REBOOT_HARD,
2338 constants.INSTANCE_REBOOT_FULL]:
2339 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2340 (constants.INSTANCE_REBOOT_SOFT,
2341 constants.INSTANCE_REBOOT_HARD,
2342 constants.INSTANCE_REBOOT_FULL))
2343 self._ExpandAndLockInstance()
2345 def BuildHooksEnv(self):
2348 This runs on master, primary and secondary nodes of the instance.
2352 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2354 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2355 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2356 list(self.instance.secondary_nodes))
2359 def CheckPrereq(self):
2360 """Check prerequisites.
2362 This checks that the instance is in the cluster.
2365 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2366 assert self.instance is not None, \
2367 "Cannot retrieve locked instance %s" % self.op.instance_name
2369 # check bridge existence
2370 _CheckInstanceBridgesExist(self, instance)
2372 def Exec(self, feedback_fn):
2373 """Reboot the instance.
2376 instance = self.instance
2377 ignore_secondaries = self.op.ignore_secondaries
2378 reboot_type = self.op.reboot_type
2379 extra_args = getattr(self.op, "extra_args", "")
2381 node_current = instance.primary_node
2383 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2384 constants.INSTANCE_REBOOT_HARD]:
2385 if not self.rpc.call_instance_reboot(node_current, instance,
2386 reboot_type, extra_args):
2387 raise errors.OpExecError("Could not reboot instance")
2389 if not self.rpc.call_instance_shutdown(node_current, instance):
2390 raise errors.OpExecError("could not shutdown instance for full reboot")
2391 _ShutdownInstanceDisks(self, instance)
2392 _StartInstanceDisks(self, instance, ignore_secondaries)
2393 if not self.rpc.call_instance_start(node_current, instance, extra_args):
2394 _ShutdownInstanceDisks(self, instance)
2395 raise errors.OpExecError("Could not start instance for full reboot")
2397 self.cfg.MarkInstanceUp(instance.name)
2400 class LUShutdownInstance(LogicalUnit):
2401 """Shutdown an instance.
2404 HPATH = "instance-stop"
2405 HTYPE = constants.HTYPE_INSTANCE
2406 _OP_REQP = ["instance_name"]
2409 def ExpandNames(self):
2410 self._ExpandAndLockInstance()
2412 def BuildHooksEnv(self):
2415 This runs on master, primary and secondary nodes of the instance.
2418 env = _BuildInstanceHookEnvByObject(self, self.instance)
2419 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2420 list(self.instance.secondary_nodes))
2423 def CheckPrereq(self):
2424 """Check prerequisites.
2426 This checks that the instance is in the cluster.
2429 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2430 assert self.instance is not None, \
2431 "Cannot retrieve locked instance %s" % self.op.instance_name
2433 def Exec(self, feedback_fn):
2434 """Shutdown the instance.
2437 instance = self.instance
2438 node_current = instance.primary_node
2439 self.cfg.MarkInstanceDown(instance.name)
2440 if not self.rpc.call_instance_shutdown(node_current, instance):
2441 self.proc.LogWarning("Could not shutdown instance")
2443 _ShutdownInstanceDisks(self, instance)
2446 class LUReinstallInstance(LogicalUnit):
2447 """Reinstall an instance.
2450 HPATH = "instance-reinstall"
2451 HTYPE = constants.HTYPE_INSTANCE
2452 _OP_REQP = ["instance_name"]
2455 def ExpandNames(self):
2456 self._ExpandAndLockInstance()
2458 def BuildHooksEnv(self):
2461 This runs on master, primary and secondary nodes of the instance.
2464 env = _BuildInstanceHookEnvByObject(self, self.instance)
2465 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2466 list(self.instance.secondary_nodes))
2469 def CheckPrereq(self):
2470 """Check prerequisites.
2472 This checks that the instance is in the cluster and is not running.
2475 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2476 assert instance is not None, \
2477 "Cannot retrieve locked instance %s" % self.op.instance_name
2479 if instance.disk_template == constants.DT_DISKLESS:
2480 raise errors.OpPrereqError("Instance '%s' has no disks" %
2481 self.op.instance_name)
2482 if instance.status != "down":
2483 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2484 self.op.instance_name)
2485 remote_info = self.rpc.call_instance_info(instance.primary_node,
2487 instance.hypervisor)
2489 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2490 (self.op.instance_name,
2491 instance.primary_node))
2493 self.op.os_type = getattr(self.op, "os_type", None)
2494 if self.op.os_type is not None:
2496 pnode = self.cfg.GetNodeInfo(
2497 self.cfg.ExpandNodeName(instance.primary_node))
2499 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2501 os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2503 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2504 " primary node" % self.op.os_type)
2506 self.instance = instance
2508 def Exec(self, feedback_fn):
2509 """Reinstall the instance.
2512 inst = self.instance
2514 if self.op.os_type is not None:
2515 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2516 inst.os = self.op.os_type
2517 self.cfg.Update(inst)
2519 _StartInstanceDisks(self, inst, None)
2521 feedback_fn("Running the instance OS create scripts...")
2522 if not self.rpc.call_instance_os_add(inst.primary_node, inst):
2523 raise errors.OpExecError("Could not install OS for instance %s"
2525 (inst.name, inst.primary_node))
2527 _ShutdownInstanceDisks(self, inst)
2530 class LURenameInstance(LogicalUnit):
2531 """Rename an instance.
2534 HPATH = "instance-rename"
2535 HTYPE = constants.HTYPE_INSTANCE
2536 _OP_REQP = ["instance_name", "new_name"]
2538 def BuildHooksEnv(self):
2541 This runs on master, primary and secondary nodes of the instance.
2544 env = _BuildInstanceHookEnvByObject(self, self.instance)
2545 env["INSTANCE_NEW_NAME"] = self.op.new_name
2546 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2547 list(self.instance.secondary_nodes))
2550 def CheckPrereq(self):
2551 """Check prerequisites.
2553 This checks that the instance is in the cluster and is not running.
2556 instance = self.cfg.GetInstanceInfo(
2557 self.cfg.ExpandInstanceName(self.op.instance_name))
2558 if instance is None:
2559 raise errors.OpPrereqError("Instance '%s' not known" %
2560 self.op.instance_name)
2561 if instance.status != "down":
2562 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2563 self.op.instance_name)
2564 remote_info = self.rpc.call_instance_info(instance.primary_node,
2566 instance.hypervisor)
2568 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2569 (self.op.instance_name,
2570 instance.primary_node))
2571 self.instance = instance
2573 # new name verification
2574 name_info = utils.HostInfo(self.op.new_name)
2576 self.op.new_name = new_name = name_info.name
2577 instance_list = self.cfg.GetInstanceList()
2578 if new_name in instance_list:
2579 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2582 if not getattr(self.op, "ignore_ip", False):
2583 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2584 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2585 (name_info.ip, new_name))
2588 def Exec(self, feedback_fn):
2589 """Reinstall the instance.
2592 inst = self.instance
2593 old_name = inst.name
2595 if inst.disk_template == constants.DT_FILE:
2596 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2598 self.cfg.RenameInstance(inst.name, self.op.new_name)
2599 # Change the instance lock. This is definitely safe while we hold the BGL
2600 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2601 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2603 # re-read the instance from the configuration after rename
2604 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2606 if inst.disk_template == constants.DT_FILE:
2607 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2608 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2609 old_file_storage_dir,
2610 new_file_storage_dir)
2613 raise errors.OpExecError("Could not connect to node '%s' to rename"
2614 " directory '%s' to '%s' (but the instance"
2615 " has been renamed in Ganeti)" % (
2616 inst.primary_node, old_file_storage_dir,
2617 new_file_storage_dir))
2620 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2621 " (but the instance has been renamed in"
2622 " Ganeti)" % (old_file_storage_dir,
2623 new_file_storage_dir))
2625 _StartInstanceDisks(self, inst, None)
2627 if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2629 msg = ("Could not run OS rename script for instance %s on node %s"
2630 " (but the instance has been renamed in Ganeti)" %
2631 (inst.name, inst.primary_node))
2632 self.proc.LogWarning(msg)
2634 _ShutdownInstanceDisks(self, inst)
2637 class LURemoveInstance(LogicalUnit):
2638 """Remove an instance.
2641 HPATH = "instance-remove"
2642 HTYPE = constants.HTYPE_INSTANCE
2643 _OP_REQP = ["instance_name", "ignore_failures"]
2646 def ExpandNames(self):
2647 self._ExpandAndLockInstance()
2648 self.needed_locks[locking.LEVEL_NODE] = []
2649 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2651 def DeclareLocks(self, level):
2652 if level == locking.LEVEL_NODE:
2653 self._LockInstancesNodes()
2655 def BuildHooksEnv(self):
2658 This runs on master, primary and secondary nodes of the instance.
2661 env = _BuildInstanceHookEnvByObject(self, self.instance)
2662 nl = [self.cfg.GetMasterNode()]
2665 def CheckPrereq(self):
2666 """Check prerequisites.
2668 This checks that the instance is in the cluster.
2671 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2672 assert self.instance is not None, \
2673 "Cannot retrieve locked instance %s" % self.op.instance_name
2675 def Exec(self, feedback_fn):
2676 """Remove the instance.
2679 instance = self.instance
2680 logging.info("Shutting down instance %s on node %s",
2681 instance.name, instance.primary_node)
2683 if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2684 if self.op.ignore_failures:
2685 feedback_fn("Warning: can't shutdown instance")
2687 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2688 (instance.name, instance.primary_node))
2690 logging.info("Removing block devices for instance %s", instance.name)
2692 if not _RemoveDisks(self, instance):
2693 if self.op.ignore_failures:
2694 feedback_fn("Warning: can't remove instance's disks")
2696 raise errors.OpExecError("Can't remove instance's disks")
2698 logging.info("Removing instance %s out of cluster config", instance.name)
2700 self.cfg.RemoveInstance(instance.name)
2701 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2704 class LUQueryInstances(NoHooksLU):
2705 """Logical unit for querying instances.
2708 _OP_REQP = ["output_fields", "names"]
2710 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2711 "admin_state", "admin_ram",
2712 "disk_template", "ip", "mac", "bridge",
2713 "sda_size", "sdb_size", "vcpus", "tags",
2714 "network_port", "beparams",
2715 "(disk).(size)/([0-9]+)",
2717 "(nic).(mac|ip|bridge)/([0-9]+)",
2718 "(nic).(macs|ips|bridges)",
2719 "(disk|nic).(count)",
2720 "serial_no", "hypervisor", "hvparams",] +
2722 for name in constants.HVS_PARAMETERS] +
2724 for name in constants.BES_PARAMETERS])
2725 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
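# requesting any dynamic field requires live data from the node daemons and
# therefore locking (see do_locking below); static fields are answered from
# the configuration alone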
2728 def ExpandNames(self):
2729 _CheckOutputFields(static=self._FIELDS_STATIC,
2730 dynamic=self._FIELDS_DYNAMIC,
2731 selected=self.op.output_fields)
2733 self.needed_locks = {}
2734 self.share_locks[locking.LEVEL_INSTANCE] = 1
2735 self.share_locks[locking.LEVEL_NODE] = 1
2738 self.wanted = _GetWantedInstances(self, self.op.names)
2740 self.wanted = locking.ALL_SET
2742 self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2744 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2745 self.needed_locks[locking.LEVEL_NODE] = []
2746 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2748 def DeclareLocks(self, level):
2749 if level == locking.LEVEL_NODE and self.do_locking:
2750 self._LockInstancesNodes()
2752 def CheckPrereq(self):
2753 """Check prerequisites.
2758 def Exec(self, feedback_fn):
2759 """Computes the list of nodes and their attributes.
2762 all_info = self.cfg.GetAllInstancesInfo()
2764 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2765 elif self.wanted != locking.ALL_SET:
2766 instance_names = self.wanted
2767 missing = set(instance_names).difference(all_info.keys())
2769 raise errors.OpExecError(
2770 "Some instances were removed before retrieving their data: %s"
2773 instance_names = all_info.keys()
2775 instance_names = utils.NiceSort(instance_names)
2776 instance_list = [all_info[iname] for iname in instance_names]
2778 # begin data gathering
2780 nodes = frozenset([inst.primary_node for inst in instance_list])
2781 hv_list = list(set([inst.hypervisor for inst in instance_list]))
2786 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2788 result = node_data[name]
2790 live_data.update(result)
2791 elif result == False:
2792 bad_nodes.append(name)
2793 # else no instance is alive
2795 live_data = dict([(name, {}) for name in instance_names])
2797 # end data gathering
2802 for instance in instance_list:
2804 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2805 i_be = self.cfg.GetClusterInfo().FillBE(instance)
2806 for field in self.op.output_fields:
2807 st_match = self._FIELDS_STATIC.Matches(field)
2812 elif field == "pnode":
2813 val = instance.primary_node
2814 elif field == "snodes":
2815 val = list(instance.secondary_nodes)
2816 elif field == "admin_state":
2817 val = (instance.status != "down")
2818 elif field == "oper_state":
2819 if instance.primary_node in bad_nodes:
2822 val = bool(live_data.get(instance.name))
2823 elif field == "status":
2824 if instance.primary_node in bad_nodes:
2825 val = "ERROR_nodedown"
2827 running = bool(live_data.get(instance.name))
2829 if instance.status != "down":
2834 if instance.status != "down":
2838 elif field == "oper_ram":
2839 if instance.primary_node in bad_nodes:
2841 elif instance.name in live_data:
2842 val = live_data[instance.name].get("memory", "?")
2845 elif field == "disk_template":
2846 val = instance.disk_template
2848 val = instance.nics[0].ip
2849 elif field == "bridge":
2850 val = instance.nics[0].bridge
2851 elif field == "mac":
2852 val = instance.nics[0].mac
2853 elif field == "sda_size" or field == "sdb_size":
2854 idx = ord(field[2]) - ord('a')
2856 val = instance.FindDisk(idx).size
2857 except errors.OpPrereqError:
2859 elif field == "tags":
2860 val = list(instance.GetTags())
2861 elif field == "serial_no":
2862 val = instance.serial_no
2863 elif field == "network_port":
2864 val = instance.network_port
2865 elif field == "hypervisor":
2866 val = instance.hypervisor
2867 elif field == "hvparams":
2869 elif (field.startswith(HVPREFIX) and
2870 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2871 val = i_hv.get(field[len(HVPREFIX):], None)
2872 elif field == "beparams":
2874 elif (field.startswith(BEPREFIX) and
2875 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2876 val = i_be.get(field[len(BEPREFIX):], None)
2877 elif st_match and st_match.groups():
2878 # matches a variable list
2879 st_groups = st_match.groups()
2880 if st_groups and st_groups[0] == "disk":
2881 if st_groups[1] == "count":
2882 val = len(instance.disks)
2883 elif st_groups[1] == "sizes":
2884 val = [disk.size for disk in instance.disks]
2885 elif st_groups[1] == "size":
2887 val = instance.FindDisk(st_groups[2]).size
2888 except errors.OpPrereqError:
2891 assert False, "Unhandled disk parameter"
2892 elif st_groups[0] == "nic":
2893 if st_groups[1] == "count":
2894 val = len(instance.nics)
2895 elif st_groups[1] == "macs":
2896 val = [nic.mac for nic in instance.nics]
2897 elif st_groups[1] == "ips":
2898 val = [nic.ip for nic in instance.nics]
2899 elif st_groups[1] == "bridges":
2900 val = [nic.bridge for nic in instance.nics]
2903 nic_idx = int(st_groups[2])
2904 if nic_idx >= len(instance.nics):
2907 if st_groups[1] == "mac":
2908 val = instance.nics[nic_idx].mac
2909 elif st_groups[1] == "ip":
2910 val = instance.nics[nic_idx].ip
2911 elif st_groups[1] == "bridge":
2912 val = instance.nics[nic_idx].bridge
2914 assert False, "Unhandled NIC parameter"
2916 assert False, "Unhandled variable parameter"
2918 raise errors.ParameterError(field)
2925 class LUFailoverInstance(LogicalUnit):
2926 """Failover an instance.
2929 HPATH = "instance-failover"
2930 HTYPE = constants.HTYPE_INSTANCE
2931 _OP_REQP = ["instance_name", "ignore_consistency"]
2934 def ExpandNames(self):
2935 self._ExpandAndLockInstance()
2936 self.needed_locks[locking.LEVEL_NODE] = []
2937 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2939 def DeclareLocks(self, level):
2940 if level == locking.LEVEL_NODE:
2941 self._LockInstancesNodes()
2943 def BuildHooksEnv(self):
2946 This runs on master, primary and secondary nodes of the instance.
2950 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2952 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2953 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2956 def CheckPrereq(self):
2957 """Check prerequisites.
2959 This checks that the instance is in the cluster.
2962 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2963 assert self.instance is not None, \
2964 "Cannot retrieve locked instance %s" % self.op.instance_name
2966 bep = self.cfg.GetClusterInfo().FillBE(instance)
2967 if instance.disk_template not in constants.DTS_NET_MIRROR:
2968 raise errors.OpPrereqError("Instance's disk layout is not"
2969 " network mirrored, cannot failover.")
2971 secondary_nodes = instance.secondary_nodes
2972 if not secondary_nodes:
2973 raise errors.ProgrammerError("no secondary node but using "
2974 "a mirrored disk template")
2976 target_node = secondary_nodes[0]
2977 # check memory requirements on the secondary node
2978 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2979 instance.name, bep[constants.BE_MEMORY],
2980 instance.hypervisor)
2982 # check bridge existence
2983 brlist = [nic.bridge for nic in instance.nics]
2984 if not self.rpc.call_bridges_exist(target_node, brlist):
2985 raise errors.OpPrereqError("One or more target bridges %s does not"
2986 " exist on destination node '%s'" %
2987 (brlist, target_node))
2989 def Exec(self, feedback_fn):
2990 """Failover an instance.
2992 The failover is done by shutting it down on its present node and
2993 starting it on the secondary.
2996 instance = self.instance
2998 source_node = instance.primary_node
2999 target_node = instance.secondary_nodes[0]
3001 feedback_fn("* checking disk consistency between source and target")
3002 for dev in instance.disks:
3003 # for drbd, these are drbd over lvm
3004 if not _CheckDiskConsistency(self, dev, target_node, False):
3005 if instance.status == "up" and not self.op.ignore_consistency:
3006 raise errors.OpExecError("Disk %s is degraded on target node,"
3007 " aborting failover." % dev.iv_name)
3009 feedback_fn("* shutting down instance on source node")
3010 logging.info("Shutting down instance %s on node %s",
3011 instance.name, source_node)
3013 if not self.rpc.call_instance_shutdown(source_node, instance):
3014 if self.op.ignore_consistency:
3015 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3017 " anyway. Please make sure node %s is down",
3018 instance.name, source_node, source_node)
3020 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3021 (instance.name, source_node))
3023 feedback_fn("* deactivating the instance's disks on source node")
3024 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3025 raise errors.OpExecError("Can't shut down the instance's disks.")
3027 instance.primary_node = target_node
3028 # distribute new instance config to the other nodes
3029 self.cfg.Update(instance)
3031 # Only start the instance if it's marked as up
3032 if instance.status == "up":
3033 feedback_fn("* activating the instance's disks on target node")
3034 logging.info("Starting instance %s on node %s",
3035 instance.name, target_node)
3037 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3038 ignore_secondaries=True)
3040 _ShutdownInstanceDisks(self, instance)
3041 raise errors.OpExecError("Can't activate the instance's disks")
3043 feedback_fn("* starting the instance on the target node")
3044 if not self.rpc.call_instance_start(target_node, instance, None):
3045 _ShutdownInstanceDisks(self, instance)
3046 raise errors.OpExecError("Could not start instance %s on node %s." %
3047 (instance.name, target_node))
3050 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3051 """Create a tree of block devices on the primary node.
3053 This always creates all devices.
3057 for child in device.children:
3058 if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3061 lu.cfg.SetDiskID(device, node)
3062 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3063 instance.name, True, info)
3066 if device.physical_id is None:
3067 device.physical_id = new_id
3071 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3072 """Create a tree of block devices on a secondary node.
3074 If this device type has to be created on secondaries, create it and
3077 If not, just recurse to children keeping the same 'force' value.
3080 if device.CreateOnSecondary():
3083 for child in device.children:
3084 if not _CreateBlockDevOnSecondary(lu, node, instance,
3085 child, force, info):
3090 lu.cfg.SetDiskID(device, node)
3091 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3092 instance.name, False, info)
3095 if device.physical_id is None:
3096 device.physical_id = new_id
3100 def _GenerateUniqueNames(lu, exts):
3101 """Generate a suitable LV name.
3103 This will generate a logical volume name for the given instance.
3108 new_id = lu.cfg.GenerateUniqueID()
3109 results.append("%s%s" % (new_id, val))
3113 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3115 """Generate a drbd8 device complete with its children.
3118 port = lu.cfg.AllocatePort()
3119 vgname = lu.cfg.GetVGName()
3120 shared_secret = lu.cfg.GenerateDRBDSecret()
3121 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3122 logical_id=(vgname, names[0]))
3123 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3124 logical_id=(vgname, names[1]))
3125 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3126 logical_id=(primary, secondary, port,
3129 children=[dev_data, dev_meta],
3134 def _GenerateDiskTemplate(lu, template_name,
3135 instance_name, primary_node,
3136 secondary_nodes, disk_info,
3137 file_storage_dir, file_driver,
3139 """Generate the entire disk layout for a given template type.
3142 #TODO: compute space requirements
3144 vgname = lu.cfg.GetVGName()
3145 disk_count = len(disk_info)
3147 if template_name == constants.DT_DISKLESS:
3149 elif template_name == constants.DT_PLAIN:
3150 if len(secondary_nodes) != 0:
3151 raise errors.ProgrammerError("Wrong template configuration")
3153 names = _GenerateUniqueNames(lu, [".disk%d" % i
3154 for i in range(disk_count)])
3155 for idx, disk in enumerate(disk_info):
3156 disk_index = idx + base_index
3157 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3158 logical_id=(vgname, names[idx]),
3159 iv_name="disk/%d" % disk_index)
3160 disks.append(disk_dev)
3161 elif template_name == constants.DT_DRBD8:
3162 if len(secondary_nodes) != 1:
3163 raise errors.ProgrammerError("Wrong template configuration")
3164 remote_node = secondary_nodes[0]
3165 minors = lu.cfg.AllocateDRBDMinor(
3166 [primary_node, remote_node] * len(disk_info), instance_name)
3168 names = _GenerateUniqueNames(lu,
3169 [".disk%d_%s" % (i, s)
3170 for i in range(disk_count)
3171 for s in ("data", "meta")
3173 for idx, disk in enumerate(disk_info):
3174 disk_index = idx + base_index
3175 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3176 disk["size"], names[idx*2:idx*2+2],
3177 "disk/%d" % disk_index,
3178 minors[idx*2], minors[idx*2+1])
3179 disks.append(disk_dev)
3180 elif template_name == constants.DT_FILE:
3181 if len(secondary_nodes) != 0:
3182 raise errors.ProgrammerError("Wrong template configuration")
3184 for idx, disk in enumerate(disk_info):
3185 disk_index = idx + base_index
3186 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3187 iv_name="disk/%d" % disk_index,
3188 logical_id=(file_driver,
3189 "%s/disk%d" % (file_storage_dir,
3191 disks.append(disk_dev)
3193 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3197 def _GetInstanceInfoText(instance):
3198 """Compute that text that should be added to the disk's metadata.
3201 return "originstname+%s" % instance.name
3204 def _CreateDisks(lu, instance):
3205 """Create all disks for an instance.
3207 This abstracts away some work from AddInstance.
3209 @type lu: L{LogicalUnit}
3210 @param lu: the logical unit on whose behalf we execute
3211 @type instance: L{objects.Instance}
3212 @param instance: the instance whose disks we should create
3214 @return: the success of the creation
3217 info = _GetInstanceInfoText(instance)
3219 if instance.disk_template == constants.DT_FILE:
3220 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3221 result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3225 logging.error("Could not connect to node '%s'", instance.primary_node)
3229 logging.error("Failed to create directory '%s'", file_storage_dir)
3232 # Note: this needs to be kept in sync with adding of disks in
3233 # LUSetInstanceParams
3234 for device in instance.disks:
3235 logging.info("Creating volume %s for instance %s",
3236 device.iv_name, instance.name)
3238 for secondary_node in instance.secondary_nodes:
3239 if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3240 device, False, info):
3241 logging.error("Failed to create volume %s (%s) on secondary node %s!",
3242 device.iv_name, device, secondary_node)
3245 if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3246 instance, device, info):
3247 logging.error("Failed to create volume %s on primary!", device.iv_name)
3253 def _RemoveDisks(lu, instance):
3254 """Remove all disks for an instance.
3256 This abstracts away some work from `AddInstance()` and
3257 `RemoveInstance()`. Note that in case some of the devices couldn't
3258 be removed, the removal will continue with the other ones (compare
3259 with `_CreateDisks()`).
3261 @type lu: L{LogicalUnit}
3262 @param lu: the logical unit on whose behalf we execute
3263 @type instance: L{objects.Instance}
3264 @param instance: the instance whose disks we should remove
3266 @return: the success of the removal
3269 logging.info("Removing block devices for instance %s", instance.name)
3272 for device in instance.disks:
3273 for node, disk in device.ComputeNodeTree(instance.primary_node):
3274 lu.cfg.SetDiskID(disk, node)
3275 if not lu.rpc.call_blockdev_remove(node, disk):
3276 lu.proc.LogWarning("Could not remove block device %s on node %s,"
3277 " continuing anyway", device.iv_name, node)
3280 if instance.disk_template == constants.DT_FILE:
3281 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3282 if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3284 logging.error("Could not remove directory '%s'", file_storage_dir)
3290 def _ComputeDiskSize(disk_template, disks):
3291 """Compute disk size requirements in the volume group
3294 # Required free disk space as a function of the requested disk sizes
3296 constants.DT_DISKLESS: None,
3297 constants.DT_PLAIN: sum(d["size"] for d in disks),
3298 # 128 MB are added for drbd metadata for each disk
3299 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3300 constants.DT_FILE: None,
3303 if disk_template not in req_size_dict:
3304 raise errors.ProgrammerError("Disk template '%s' size requirement"
3305 " is unknown" % disk_template)
3307 return req_size_dict[disk_template]
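# Worked example (illustrative): for disks = [{"size": 1024}, {"size": 2048}]
# this returns 3072 for DT_PLAIN, 3328 for DT_DRBD8 (128 MB of metadata per
# disk) and None for DT_DISKLESS and DT_FILE, which need no space in the
# volume group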
3310 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3311 """Hypervisor parameter validation.
3313 This function abstracts the hypervisor parameter validation so it can be
3314 used in both instance create and instance modify.
3316 @type lu: L{LogicalUnit}
3317 @param lu: the logical unit for which we check
3318 @type nodenames: list
3319 @param nodenames: the list of nodes on which we should check
3320 @type hvname: string
3321 @param hvname: the name of the hypervisor we should use
3322 @type hvparams: dict
3323 @param hvparams: the parameters which we need to check
3324 @raise errors.OpPrereqError: if the parameters are not valid
3327 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3330 for node in nodenames:
3331 info = hvinfo.get(node, None)
3332 if not info or not isinstance(info, (tuple, list)):
3333 raise errors.OpPrereqError("Cannot get current information"
3334 " from node '%s' (%s)" % (node, info))
3336 raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3340 class LUCreateInstance(LogicalUnit):
3341 """Create an instance.
3344 HPATH = "instance-add"
3345 HTYPE = constants.HTYPE_INSTANCE
3346 _OP_REQP = ["instance_name", "disks", "disk_template",
3348 "wait_for_sync", "ip_check", "nics",
3349 "hvparams", "beparams"]
3352 def _ExpandNode(self, node):
3353 """Expands and checks one node name.
3356 node_full = self.cfg.ExpandNodeName(node)
3357 if node_full is None:
3358 raise errors.OpPrereqError("Unknown node %s" % node)
3361 def ExpandNames(self):
3362 """ExpandNames for CreateInstance.
3364 Figure out the right locks for instance creation.
3367 self.needed_locks = {}
3369 # set optional parameters to none if they don't exist
3370 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3371 if not hasattr(self.op, attr):
3372 setattr(self.op, attr, None)
3374 # cheap checks, mostly valid constants given
3376 # verify creation mode
3377 if self.op.mode not in (constants.INSTANCE_CREATE,
3378 constants.INSTANCE_IMPORT):
3379 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3382 # disk template and mirror node verification
3383 if self.op.disk_template not in constants.DISK_TEMPLATES:
3384 raise errors.OpPrereqError("Invalid disk template name")
3386 if self.op.hypervisor is None:
3387 self.op.hypervisor = self.cfg.GetHypervisorType()
3389 cluster = self.cfg.GetClusterInfo()
3390 enabled_hvs = cluster.enabled_hypervisors
3391 if self.op.hypervisor not in enabled_hvs:
3392 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3393 " cluster (%s)" % (self.op.hypervisor,
3394 ",".join(enabled_hvs)))
3396 # check hypervisor parameter syntax (locally)
3398 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3400 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3401 hv_type.CheckParameterSyntax(filled_hvp)
3403 # fill and remember the beparams dict
3404 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3407 #### instance parameters check
3409 # instance name verification
3410 hostname1 = utils.HostInfo(self.op.instance_name)
3411 self.op.instance_name = instance_name = hostname1.name
3413 # this is just a preventive check, but someone might still add this
3414 # instance in the meantime, and creation will fail at lock-add time
3415 if instance_name in self.cfg.GetInstanceList():
3416 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3419 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3423 for nic in self.op.nics:
3424 # ip validity checks
3425 ip = nic.get("ip", None)
3426 if ip is None or ip.lower() == "none":
3428 elif ip.lower() == constants.VALUE_AUTO:
3429 nic_ip = hostname1.ip
3431 if not utils.IsValidIP(ip):
3432 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3433 " like a valid IP" % ip)
3436 # MAC address verification
3437 mac = nic.get("mac", constants.VALUE_AUTO)
3438 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3439 if not utils.IsValidMac(mac.lower()):
3440 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3442 # bridge verification
3443 bridge = nic.get("bridge", self.cfg.GetDefBridge())
3444 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3446 # disk checks/pre-build
3448 for disk in self.op.disks:
3449 mode = disk.get("mode", constants.DISK_RDWR)
3450 if mode not in constants.DISK_ACCESS_SET:
3451 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3453 size = disk.get("size", None)
3455 raise errors.OpPrereqError("Missing disk size")
3459 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3460 self.disks.append({"size": size, "mode": mode})
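# at this point self.disks is a list of {"size": <MB>, "mode": <access mode>}
# dicts, e.g. [{"size": 1024, "mode": constants.DISK_RDWR}] (illustrative)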
3462 # used in CheckPrereq for ip ping check
3463 self.check_ip = hostname1.ip
3465 # file storage checks
3466 if (self.op.file_driver and
3467 not self.op.file_driver in constants.FILE_DRIVER):
3468 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3469 self.op.file_driver)
3471 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3472 raise errors.OpPrereqError("File storage directory path not absolute")
3474 ### Node/iallocator related checks
3475 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3476 raise errors.OpPrereqError("One and only one of iallocator and primary"
3477 " node must be given")
3479 if self.op.iallocator:
3480 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3482 self.op.pnode = self._ExpandNode(self.op.pnode)
3483 nodelist = [self.op.pnode]
3484 if self.op.snode is not None:
3485 self.op.snode = self._ExpandNode(self.op.snode)
3486 nodelist.append(self.op.snode)
3487 self.needed_locks[locking.LEVEL_NODE] = nodelist
3489 # in case of import, lock the source node too
3490 if self.op.mode == constants.INSTANCE_IMPORT:
3491 src_node = getattr(self.op, "src_node", None)
3492 src_path = getattr(self.op, "src_path", None)
3494 if src_node is None or src_path is None:
3495 raise errors.OpPrereqError("Importing an instance requires source"
3496 " node and path options")
3498 if not os.path.isabs(src_path):
3499 raise errors.OpPrereqError("The source path must be absolute")
3501 self.op.src_node = src_node = self._ExpandNode(src_node)
3502 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3503 self.needed_locks[locking.LEVEL_NODE].append(src_node)
3505 else: # INSTANCE_CREATE
3506 if getattr(self.op, "os_type", None) is None:
3507 raise errors.OpPrereqError("No guest OS specified")
3509 def _RunAllocator(self):
3510 """Run the allocator based on input opcode.
3513 nics = [n.ToDict() for n in self.nics]
3514 ial = IAllocator(self,
3515 mode=constants.IALLOCATOR_MODE_ALLOC,
3516 name=self.op.instance_name,
3517 disk_template=self.op.disk_template,
3520 vcpus=self.be_full[constants.BE_VCPUS],
3521 mem_size=self.be_full[constants.BE_MEMORY],
3524 hypervisor=self.op.hypervisor,
3527 ial.Run(self.op.iallocator)
3530 raise errors.OpPrereqError("Can't compute nodes using"
3531 " iallocator '%s': %s" % (self.op.iallocator,
3533 if len(ial.nodes) != ial.required_nodes:
3534 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3535 " of nodes (%s), required %s" %
3536 (self.op.iallocator, len(ial.nodes),
3537 ial.required_nodes))
3538 self.op.pnode = ial.nodes[0]
3539 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3540 self.op.instance_name, self.op.iallocator,
3541 ", ".join(ial.nodes))
3542 if ial.required_nodes == 2:
3543 self.op.snode = ial.nodes[1]
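# (ial.required_nodes is 2 only for net-mirrored disk templates such as
# drbd8, in which case the allocator also picks the secondary node)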
3545 def BuildHooksEnv(self):
3548 This runs on master, primary and secondary nodes of the instance.
3552 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3553 "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3554 "INSTANCE_ADD_MODE": self.op.mode,
3556 if self.op.mode == constants.INSTANCE_IMPORT:
3557 env["INSTANCE_SRC_NODE"] = self.op.src_node
3558 env["INSTANCE_SRC_PATH"] = self.op.src_path
3559 env["INSTANCE_SRC_IMAGES"] = self.src_images
3561 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3562 primary_node=self.op.pnode,
3563 secondary_nodes=self.secondaries,
3564 status=self.instance_status,
3565 os_type=self.op.os_type,
3566 memory=self.be_full[constants.BE_MEMORY],
3567 vcpus=self.be_full[constants.BE_VCPUS],
3568 nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3571 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3576 def CheckPrereq(self):
3577 """Check prerequisites.
3580 if (not self.cfg.GetVGName() and
3581 self.op.disk_template not in constants.DTS_NOT_LVM):
3582 raise errors.OpPrereqError("Cluster does not support lvm-based"
3586 if self.op.mode == constants.INSTANCE_IMPORT:
3587 src_node = self.op.src_node
3588 src_path = self.op.src_path
3590 export_info = self.rpc.call_export_info(src_node, src_path)
3593 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3595 if not export_info.has_section(constants.INISECT_EXP):
3596 raise errors.ProgrammerError("Corrupted export config")
3598 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3599 if (int(ei_version) != constants.EXPORT_VERSION):
3600 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3601 (ei_version, constants.EXPORT_VERSION))
3603 # Check that the new instance doesn't have fewer disks than the export
3604 instance_disks = len(self.disks)
3605 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3606 if instance_disks < export_disks:
3607 raise errors.OpPrereqError("Not enough disks to import."
3608 " (instance: %d, export: %d)" %
3611 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3613 for idx in range(export_disks):
3614 option = 'disk%d_dump' % idx
3615 if export_info.has_option(constants.INISECT_INS, option):
3616 # FIXME: are the old OSes, disk sizes, etc. useful?
3617 export_name = export_info.get(constants.INISECT_INS, option)
3618 image = os.path.join(src_path, export_name)
3619 disk_images.append(image)
3621 disk_images.append(False)
3623 self.src_images = disk_images
3625 old_name = export_info.get(constants.INISECT_INS, 'name')
3626 # FIXME: int() here could throw a ValueError on broken exports
3627 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3628 if self.op.instance_name == old_name:
3629 for idx, nic in enumerate(self.nics):
3630 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3631 nic_mac_ini = 'nic%d_mac' % idx
3632 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3634 # ip ping checks (we use the same ip that was resolved in ExpandNames)
3635 if self.op.start and not self.op.ip_check:
3636 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3637 " adding an instance in start mode")
3639 if self.op.ip_check:
3640 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3641 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3642 (self.check_ip, self.op.instance_name))
3646 if self.op.iallocator is not None:
3647 self._RunAllocator()
3649 #### node related checks
3651 # check primary node
3652 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3653 assert self.pnode is not None, \
3654 "Cannot retrieve locked node %s" % self.op.pnode
3655 self.secondaries = []
3657 # mirror node verification
3658 if self.op.disk_template in constants.DTS_NET_MIRROR:
3659 if self.op.snode is None:
3660 raise errors.OpPrereqError("The networked disk templates need"
3662 if self.op.snode == pnode.name:
3663 raise errors.OpPrereqError("The secondary node cannot be"
3664 " the primary node.")
3665 self.secondaries.append(self.op.snode)
3667 nodenames = [pnode.name] + self.secondaries
3669 req_size = _ComputeDiskSize(self.op.disk_template,
3672 # Check lv size requirements
3673 if req_size is not None:
3674 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3676 for node in nodenames:
3677 info = nodeinfo.get(node, None)
3679 raise errors.OpPrereqError("Cannot get current information"
3680 " from node '%s'" % node)
3681 vg_free = info.get('vg_free', None)
3682 if not isinstance(vg_free, int):
3683 raise errors.OpPrereqError("Can't compute free disk space on"
3685 if req_size > info['vg_free']:
3686 raise errors.OpPrereqError("Not enough disk space on target node %s."
3687 " %d MB available, %d MB required" %
3688 (node, info['vg_free'], req_size))
3690 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3693 os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3695 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3696 " primary node" % self.op.os_type)
3698 # bridge check on primary node
3699 bridges = [n.bridge for n in self.nics]
3700 if not self.rpc.call_bridges_exist(self.pnode.name, bridges):
3701 raise errors.OpPrereqError("one of the target bridges '%s' does not"
3703 " destination node '%s'" %
3704 (",".join(bridges), pnode.name))
3706 # memory check on primary node
3708 _CheckNodeFreeMemory(self, self.pnode.name,
3709 "creating instance %s" % self.op.instance_name,
3710 self.be_full[constants.BE_MEMORY],
3714 self.instance_status = 'up'
3716 self.instance_status = 'down'
3718 def Exec(self, feedback_fn):
3719 """Create and add the instance to the cluster.
3722 instance = self.op.instance_name
3723 pnode_name = self.pnode.name
3725 for nic in self.nics:
3726 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3727 nic.mac = self.cfg.GenerateMAC()
3729 ht_kind = self.op.hypervisor
3730 if ht_kind in constants.HTS_REQ_PORT:
3731 network_port = self.cfg.AllocatePort()
3735 ##if self.op.vnc_bind_address is None:
3736 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3738 # this is needed because os.path.join does not accept None arguments
3739 if self.op.file_storage_dir is None:
3740 string_file_storage_dir = ""
3742 string_file_storage_dir = self.op.file_storage_dir
3744 # build the full file storage dir path
3745 file_storage_dir = os.path.normpath(os.path.join(
3746 self.cfg.GetFileStorageDir(),
3747 string_file_storage_dir, instance))
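# illustrative: with a cluster file storage dir of /srv/ganeti/file-storage
# and an opcode file_storage_dir of "mydir", this becomes
# /srv/ganeti/file-storage/mydir/<instance name>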
3750 disks = _GenerateDiskTemplate(self,
3751 self.op.disk_template,
3752 instance, pnode_name,
3756 self.op.file_driver,
3759 iobj = objects.Instance(name=instance, os=self.op.os_type,
3760 primary_node=pnode_name,
3761 nics=self.nics, disks=disks,
3762 disk_template=self.op.disk_template,
3763 status=self.instance_status,
3764 network_port=network_port,
3765 beparams=self.op.beparams,
3766 hvparams=self.op.hvparams,
3767 hypervisor=self.op.hypervisor,
3770 feedback_fn("* creating instance disks...")
3771 if not _CreateDisks(self, iobj):
3772 _RemoveDisks(self, iobj)
3773 self.cfg.ReleaseDRBDMinors(instance)
3774 raise errors.OpExecError("Device creation failed, reverting...")
3776 feedback_fn("adding instance %s to cluster config" % instance)
3778 self.cfg.AddInstance(iobj)
3779 # Declare that we don't want to remove the instance lock anymore, as we've
3780 # added the instance to the config
3781 del self.remove_locks[locking.LEVEL_INSTANCE]
3782 # Remove the temporary assignments for the instance's DRBDs
3783 self.cfg.ReleaseDRBDMinors(instance)
3784 # Unlock all the nodes
3785 self.context.glm.release(locking.LEVEL_NODE)
3786 del self.acquired_locks[locking.LEVEL_NODE]
3788 if self.op.wait_for_sync:
3789 disk_abort = not _WaitForSync(self, iobj)
3790 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3791 # make sure the disks are not degraded (still sync-ing is ok)
3793 feedback_fn("* checking mirrors status")
3794 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3799 _RemoveDisks(self, iobj)
3800 self.cfg.RemoveInstance(iobj.name)
3801 # Make sure the instance lock gets removed
3802 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3803 raise errors.OpExecError("There are some degraded disks for"
3806 feedback_fn("creating os for instance %s on node %s" %
3807 (instance, pnode_name))
3809 if iobj.disk_template != constants.DT_DISKLESS:
3810 if self.op.mode == constants.INSTANCE_CREATE:
3811 feedback_fn("* running the instance OS create scripts...")
3812 if not self.rpc.call_instance_os_add(pnode_name, iobj):
3813 raise errors.OpExecError("could not add os for instance %s"
3815 (instance, pnode_name))
3817 elif self.op.mode == constants.INSTANCE_IMPORT:
3818 feedback_fn("* running the instance OS import scripts...")
3819 src_node = self.op.src_node
3820 src_images = self.src_images
3821 cluster_name = self.cfg.GetClusterName()
3822 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
3823 src_node, src_images,
3825 for idx, result in enumerate(import_result):
3827 self.LogWarning("Could not image %s for on instance %s, disk %d,"
3828 " on node %s" % (src_images[idx], instance, idx,
3831 # also checked in the prereq part
3832 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3836 logging.info("Starting instance %s on node %s", instance, pnode_name)
3837 feedback_fn("* starting instance...")
3838 if not self.rpc.call_instance_start(pnode_name, iobj, None):
3839 raise errors.OpExecError("Could not start instance")
3842 class LUConnectConsole(NoHooksLU):
3843 """Connect to an instance's console.
3845 This is somewhat special in that it returns the command line that
3846 you need to run on the master node in order to connect to the
3850 _OP_REQP = ["instance_name"]
3853 def ExpandNames(self):
3854 self._ExpandAndLockInstance()
3856 def CheckPrereq(self):
3857 """Check prerequisites.
3859 This checks that the instance is in the cluster.
3862 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3863 assert self.instance is not None, \
3864 "Cannot retrieve locked instance %s" % self.op.instance_name
3866 def Exec(self, feedback_fn):
3867 """Connect to the console of an instance
3870 instance = self.instance
3871 node = instance.primary_node
3873 node_insts = self.rpc.call_instance_list([node],
3874 [instance.hypervisor])[node]
3875 if node_insts is False:
3876 raise errors.OpExecError("Can't connect to node %s." % node)
3878 if instance.name not in node_insts:
3879 raise errors.OpExecError("Instance %s is not running." % instance.name)
3881 logging.debug("Connecting to console of %s on %s", instance.name, node)
3883 hyper = hypervisor.GetHypervisor(instance.hypervisor)
3884 console_cmd = hyper.GetShellCommandForConsole(instance)
3887 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3890 class LUReplaceDisks(LogicalUnit):
3891 """Replace the disks of an instance.
3894 HPATH = "mirrors-replace"
3895 HTYPE = constants.HTYPE_INSTANCE
3896 _OP_REQP = ["instance_name", "mode", "disks"]
3899 def ExpandNames(self):
3900 self._ExpandAndLockInstance()
3902 if not hasattr(self.op, "remote_node"):
3903 self.op.remote_node = None
3905 ia_name = getattr(self.op, "iallocator", None)
3906 if ia_name is not None:
3907 if self.op.remote_node is not None:
3908 raise errors.OpPrereqError("Give either the iallocator or the new"
3909 " secondary, not both")
3910 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3911 elif self.op.remote_node is not None:
3912 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3913 if remote_node is None:
3914 raise errors.OpPrereqError("Node '%s' not known" %
3915 self.op.remote_node)
3916 self.op.remote_node = remote_node
3917 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3918 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3920 self.needed_locks[locking.LEVEL_NODE] = []
3921 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3923 def DeclareLocks(self, level):
3924 # If we're not already locking all nodes in the set we have to declare the
3925 # instance's primary/secondary nodes.
3926 if (level == locking.LEVEL_NODE and
3927 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3928 self._LockInstancesNodes()
3930 def _RunAllocator(self):
3931 """Compute a new secondary node using an IAllocator.
3934 ial = IAllocator(self,
3935 mode=constants.IALLOCATOR_MODE_RELOC,
3936 name=self.op.instance_name,
3937 relocate_from=[self.sec_node])
3939 ial.Run(self.op.iallocator)
3942 raise errors.OpPrereqError("Can't compute nodes using"
3943 " iallocator '%s': %s" % (self.op.iallocator,
3945 if len(ial.nodes) != ial.required_nodes:
3946 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3947 " of nodes (%s), required %s" %
3948 (len(ial.nodes), ial.required_nodes))
3949 self.op.remote_node = ial.nodes[0]
3950 self.LogInfo("Selected new secondary for the instance: %s",
3951 self.op.remote_node)
3953 def BuildHooksEnv(self):
3956 This runs on the master, the primary and all the secondaries.
3960 "MODE": self.op.mode,
3961 "NEW_SECONDARY": self.op.remote_node,
3962 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3964 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3966 self.cfg.GetMasterNode(),
3967 self.instance.primary_node,
3969 if self.op.remote_node is not None:
3970 nl.append(self.op.remote_node)
3973 def CheckPrereq(self):
3974 """Check prerequisites.
3976 This checks that the instance is in the cluster.
3979 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3980 assert instance is not None, \
3981 "Cannot retrieve locked instance %s" % self.op.instance_name
3982 self.instance = instance
3984 if instance.disk_template not in constants.DTS_NET_MIRROR:
3985 raise errors.OpPrereqError("Instance's disk layout is not"
3986 " network mirrored.")
3988 if len(instance.secondary_nodes) != 1:
3989 raise errors.OpPrereqError("The instance has a strange layout,"
3990 " expected one secondary but found %d" %
3991 len(instance.secondary_nodes))
3993 self.sec_node = instance.secondary_nodes[0]
3995 ia_name = getattr(self.op, "iallocator", None)
3996 if ia_name is not None:
3997 self._RunAllocator()
3999 remote_node = self.op.remote_node
4000 if remote_node is not None:
4001 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4002 assert self.remote_node_info is not None, \
4003 "Cannot retrieve locked node %s" % remote_node
4005 self.remote_node_info = None
4006 if remote_node == instance.primary_node:
4007 raise errors.OpPrereqError("The specified node is the primary node of"
4009 elif remote_node == self.sec_node:
4010 if self.op.mode == constants.REPLACE_DISK_SEC:
4011 # this is for DRBD8, where we can't execute the same mode of
4012 # replacement as for drbd7 (no different port allocated)
4013 raise errors.OpPrereqError("Same secondary given, cannot execute"
4015 if instance.disk_template == constants.DT_DRBD8:
4016 if (self.op.mode == constants.REPLACE_DISK_ALL and
4017 remote_node is not None):
4018 # switch to replace secondary mode
4019 self.op.mode = constants.REPLACE_DISK_SEC
4021 if self.op.mode == constants.REPLACE_DISK_ALL:
4022 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4023 " secondary disk replacement, not"
4025 elif self.op.mode == constants.REPLACE_DISK_PRI:
4026 if remote_node is not None:
4027 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4028 " the secondary while doing a primary"
4029 " node disk replacement")
4030 self.tgt_node = instance.primary_node
4031 self.oth_node = instance.secondary_nodes[0]
4032 elif self.op.mode == constants.REPLACE_DISK_SEC:
4033 self.new_node = remote_node # this can be None, in which case
4034 # we don't change the secondary
4035 self.tgt_node = instance.secondary_nodes[0]
4036 self.oth_node = instance.primary_node
4038 raise errors.ProgrammerError("Unhandled disk replace mode")
4040 if not self.op.disks:
4041 self.op.disks = range(len(instance.disks))
4043 for disk_idx in self.op.disks:
4044 instance.FindDisk(disk_idx)
4046 def _ExecD8DiskOnly(self, feedback_fn):
4047 """Replace a disk on the primary or secondary for dbrd8.
4049 The algorithm for replace is quite complicated:
4051 1. for each disk to be replaced:
4053 1. create new LVs on the target node with unique names
4054 1. detach old LVs from the drbd device
4055 1. rename old LVs to name_replaced.<time_t>
4056 1. rename new LVs to old LVs
4057 1. attach the new LVs (with the old names now) to the drbd device
4059 1. wait for sync across all devices
4061 1. for each modified disk:
4063 1. remove old LVs (which have the name name_replaced.<time_t>)
4065 Failures are not very well handled.
4069 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4070 instance = self.instance
4072 vgname = self.cfg.GetVGName()
4075 tgt_node = self.tgt_node
4076 oth_node = self.oth_node
4078 # Step: check device activation
4079 self.proc.LogStep(1, steps_total, "check device existence")
4080 info("checking volume groups")
4081 my_vg = cfg.GetVGName()
4082 results = self.rpc.call_vg_list([oth_node, tgt_node])
4084 raise errors.OpExecError("Can't list volume groups on the nodes")
4085 for node in oth_node, tgt_node:
4086 res = results.get(node, False)
4087 if not res or my_vg not in res:
4088 raise errors.OpExecError("Volume group '%s' not found on %s" %
4090 for idx, dev in enumerate(instance.disks):
4091 if idx not in self.op.disks:
4093 for node in tgt_node, oth_node:
4094 info("checking disk/%d on %s" % (idx, node))
4095 cfg.SetDiskID(dev, node)
4096 if not self.rpc.call_blockdev_find(node, dev):
4097 raise errors.OpExecError("Can't find disk/%d on node %s" %
4100 # Step: check other node consistency
4101 self.proc.LogStep(2, steps_total, "check peer consistency")
4102 for idx, dev in enumerate(instance.disks):
4103 if idx not in self.op.disks:
4105 info("checking disk/%d consistency on %s" % (idx, oth_node))
4106 if not _CheckDiskConsistency(self, dev, oth_node,
4107 oth_node==instance.primary_node):
4108 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4109 " to replace disks on this node (%s)" %
4110 (oth_node, tgt_node))
4112 # Step: create new storage
4113 self.proc.LogStep(3, steps_total, "allocate new storage")
4114 for idx, dev in enumerate(instance.disks):
4115 if idx not in self.op.disks:
4118 cfg.SetDiskID(dev, tgt_node)
4119 lv_names = [".disk%d_%s" % (idx, suf)
4120 for suf in ["data", "meta"]]
4121 names = _GenerateUniqueNames(self, lv_names)
4122 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4123 logical_id=(vgname, names[0]))
4124 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4125 logical_id=(vgname, names[1]))
4126 new_lvs = [lv_data, lv_meta]
4127 old_lvs = dev.children
4128 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
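# iv_names maps each device's iv_name to a (drbd device, old LVs, new LVs)
# tuple; it is reused below in the detach/rename/attach step and in the
# final cleanup of the old storage.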
4129 info("creating new local storage on %s for %s" %
4130 (tgt_node, dev.iv_name))
4131 # since we *always* want to create this LV, we use the
4132 # _Create...OnPrimary (which forces the creation), even if we
4133 # are talking about the secondary node
4134 for new_lv in new_lvs:
4135 if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4136 _GetInstanceInfoText(instance)):
4137 raise errors.OpExecError("Failed to create new LV named '%s' on"
4139 (new_lv.logical_id[1], tgt_node))
4141 # Step: for each lv, detach+rename*2+attach
4142 self.proc.LogStep(4, steps_total, "change drbd configuration")
4143 for dev, old_lvs, new_lvs in iv_names.itervalues():
4144 info("detaching %s drbd from local storage" % dev.iv_name)
4145 if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4146 raise errors.OpExecError("Can't detach drbd from local storage on node"
4147 " %s for device %s" % (tgt_node, dev.iv_name))
4149 #cfg.Update(instance)
4151 # ok, we created the new LVs, so now we know we have the needed
4152 # storage; as such, we proceed on the target node to rename
4153 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4154 # using the assumption that logical_id == physical_id (which in
4155 # turn is the unique_id on that node)
4157 # FIXME(iustin): use a better name for the replaced LVs
4158 temp_suffix = int(time.time())
4159 ren_fn = lambda d, suff: (d.physical_id[0],
4160 d.physical_id[1] + "_replaced-%s" % suff)
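# Illustration with made-up values: for an old LV with physical_id
# ('xenvg', 'uuid.disk0_data') and temp_suffix 1215418480, ren_fn yields
# ('xenvg', 'uuid.disk0_data_replaced-1215418480').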
4161 # build the rename list based on what LVs exist on the node
4163 for to_ren in old_lvs:
4164 find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4165 if find_res is not None: # device exists
4166 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4168 info("renaming the old LVs on the target node")
4169 if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4170 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4171 # now we rename the new LVs to the old LVs
4172 info("renaming the new LVs on the target node")
4173 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4174 if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4175 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4177 for old, new in zip(old_lvs, new_lvs):
4178 new.logical_id = old.logical_id
4179 cfg.SetDiskID(new, tgt_node)
4181 for disk in old_lvs:
4182 disk.logical_id = ren_fn(disk, temp_suffix)
4183 cfg.SetDiskID(disk, tgt_node)
4185 # now that the new lvs have the old name, we can add them to the device
4186 info("adding new mirror component on %s" % tgt_node)
4187 if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4188 for new_lv in new_lvs:
4189 if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
4190 warning("Can't rollback device %s", hint="manually cleanup unused"
4192 raise errors.OpExecError("Can't add local storage to drbd")
4194 dev.children = new_lvs
4195 cfg.Update(instance)
4197 # Step: wait for sync
4199 # this can fail as the old devices are degraded and _WaitForSync
4200 # does a combined result over all disks, so we don't check its return value
4202 self.proc.LogStep(5, steps_total, "sync devices")
4203 _WaitForSync(self, instance, unlock=True)
4205 # so check manually all the devices
4206 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4207 cfg.SetDiskID(dev, instance.primary_node)
4208 is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4210 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4212 # Step: remove old storage
4213 self.proc.LogStep(6, steps_total, "removing old storage")
4214 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4215 info("remove logical volumes for %s" % name)
4217 cfg.SetDiskID(lv, tgt_node)
4218 if not self.rpc.call_blockdev_remove(tgt_node, lv):
4219 warning("Can't remove old LV", hint="manually remove unused LVs")
4222 def _ExecD8Secondary(self, feedback_fn):
4223 """Replace the secondary node for drbd8.
4225 The algorithm for replace is quite complicated:
4226 - for all disks of the instance:
4227 - create new LVs on the new node with same names
4228 - shutdown the drbd device on the old secondary
4229 - disconnect the drbd network on the primary
4230 - create the drbd device on the new secondary
4231 - network attach the drbd on the primary, using an artifice:
4232 the drbd code for Attach() will connect to the network if it
4233 finds a device which is connected to the good local disks but not network enabled
4235 - wait for sync across all devices
4236 - remove all disks from the old secondary
4238 Failures are not very well handled.
4242 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4243 instance = self.instance
4245 vgname = self.cfg.GetVGName()
4248 old_node = self.tgt_node
4249 new_node = self.new_node
4250 pri_node = instance.primary_node
4252 # Step: check device activation
4253 self.proc.LogStep(1, steps_total, "check device existence")
4254 info("checking volume groups")
4255 my_vg = cfg.GetVGName()
4256 results = self.rpc.call_vg_list([pri_node, new_node])
4258 raise errors.OpExecError("Can't list volume groups on the nodes")
4259 for node in pri_node, new_node:
4260 res = results.get(node, False)
4261 if not res or my_vg not in res:
4262 raise errors.OpExecError("Volume group '%s' not found on %s" %
4264 for idx, dev in enumerate(instance.disks):
4265 if idx not in self.op.disks:
4267 info("checking disk/%d on %s" % (idx, pri_node))
4268 cfg.SetDiskID(dev, pri_node)
4269 if not self.rpc.call_blockdev_find(pri_node, dev):
4270 raise errors.OpExecError("Can't find disk/%d on node %s" %
4273 # Step: check other node consistency
4274 self.proc.LogStep(2, steps_total, "check peer consistency")
4275 for idx, dev in enumerate(instance.disks):
4276 if idx not in self.op.disks:
4278 info("checking disk/%d consistency on %s" % (idx, pri_node))
4279 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4280 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4281 " unsafe to replace the secondary" %
4284 # Step: create new storage
4285 self.proc.LogStep(3, steps_total, "allocate new storage")
4286 for idx, dev in enumerate(instance.disks):
4288 info("adding new local storage on %s for disk/%d" %
4290 # since we *always* want to create this LV, we use the
4291 # _Create...OnPrimary (which forces the creation), even if we
4292 # are talking about the secondary node
4293 for new_lv in dev.children:
4294 if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4295 _GetInstanceInfoText(instance)):
4296 raise errors.OpExecError("Failed to create new LV named '%s' on"
4298 (new_lv.logical_id[1], new_node))
4300 # Step 4: drbd minors and drbd setup changes
4301 # after this, we must manually remove the drbd minors on both the
4302 # error and the success paths
4303 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4305 logging.debug("Allocated minors %s" % (minors,))
4306 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4307 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4309 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4310 # create new devices on new_node
4311 if pri_node == dev.logical_id[0]:
4312 new_logical_id = (pri_node, new_node,
4313 dev.logical_id[2], dev.logical_id[3], new_minor,
4316 new_logical_id = (new_node, pri_node,
4317 dev.logical_id[2], new_minor, dev.logical_id[4],
4319 iv_names[idx] = (dev, dev.children, new_logical_id)
4320 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4322 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4323 logical_id=new_logical_id,
4324 children=dev.children)
4325 if not _CreateBlockDevOnSecondary(self, new_node, instance,
4327 _GetInstanceInfoText(instance)):
4328 self.cfg.ReleaseDRBDMinors(instance.name)
4329 raise errors.OpExecError("Failed to create new DRBD on"
4330 " node '%s'" % new_node)
4332 for idx, dev in enumerate(instance.disks):
4333 # we have new devices, shutdown the drbd on the old secondary
4334 info("shutting down drbd for disk/%d on old node" % idx)
4335 cfg.SetDiskID(dev, old_node)
4336 if not self.rpc.call_blockdev_shutdown(old_node, dev):
4337 warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4338 hint="Please cleanup this device manually as soon as possible")
4340 info("detaching primary drbds from the network (=> standalone)")
4342 for idx, dev in enumerate(instance.disks):
4343 cfg.SetDiskID(dev, pri_node)
4344 # set the network part of the physical (unique in bdev terms) id
4345 # to None, meaning detach from network
4346 dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4347 # and 'find' the device, which will 'fix' it to match the
4349 if self.rpc.call_blockdev_find(pri_node, dev):
4352 warning("Failed to detach drbd disk/%d from network, unusual case" %
4356 # no detaches succeeded (very unlikely)
4357 self.cfg.ReleaseDRBDMinors(instance.name)
4358 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4360 # if we managed to detach at least one, we update all the disks of
4361 # the instance to point to the new secondary
4362 info("updating instance configuration")
4363 for dev, _, new_logical_id in iv_names.itervalues():
4364 dev.logical_id = new_logical_id
4365 cfg.SetDiskID(dev, pri_node)
4366 cfg.Update(instance)
4367 # we can now remove the temp minors, as the new values are
4368 # written to the config file (and therefore stable)
4369 self.cfg.ReleaseDRBDMinors(instance.name)
4371 # and now perform the drbd attach
4372 info("attaching primary drbds to new secondary (standalone => connected)")
4374 for idx, dev in enumerate(instance.disks):
4375 info("attaching primary drbd for disk/%d to new secondary node" % idx)
4376 # since the attach is smart, it's enough to 'find' the device,
4377 # it will automatically activate the network, if the physical_id
4379 cfg.SetDiskID(dev, pri_node)
4380 logging.debug("Disk to attach: %s", dev)
4381 if not self.rpc.call_blockdev_find(pri_node, dev):
4382 warning("can't attach drbd disk/%d to new secondary!" % idx,
4383 "please do a gnt-instance info to see the status of disks")
4385 # this can fail as the old devices are degraded and _WaitForSync
4386 # does a combined result over all disks, so we don't check its return value
4388 self.proc.LogStep(5, steps_total, "sync devices")
4389 _WaitForSync(self, instance, unlock=True)
4391 # so check manually all the devices
4392 for idx, (dev, old_lvs, _) in iv_names.iteritems():
4393 cfg.SetDiskID(dev, pri_node)
4394 is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4396 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4398 self.proc.LogStep(6, steps_total, "removing old storage")
4399 for idx, (dev, old_lvs, _) in iv_names.iteritems():
4400 info("remove logical volumes for disk/%d" % idx)
4402 cfg.SetDiskID(lv, old_node)
4403 if not self.rpc.call_blockdev_remove(old_node, lv):
4404 warning("Can't remove LV on old secondary",
4405 hint="Cleanup stale volumes by hand")
4407 def Exec(self, feedback_fn):
4408 """Execute disk replacement.
4410 This dispatches the disk replacement to the appropriate handler.
4413 instance = self.instance
4415 # Activate the instance disks if we're replacing them on a down instance
4416 if instance.status == "down":
4417 _StartInstanceDisks(self, instance, True)
4419 if instance.disk_template == constants.DT_DRBD8:
4420 if self.op.remote_node is None:
4421 fn = self._ExecD8DiskOnly
4423 fn = self._ExecD8Secondary
4425 raise errors.ProgrammerError("Unhandled disk replacement case")
4427 ret = fn(feedback_fn)
4429 # Deactivate the instance disks if we're replacing them on a down instance
4430 if instance.status == "down":
4431 _SafeShutdownInstanceDisks(self, instance)
4436 class LUGrowDisk(LogicalUnit):
4437 """Grow a disk of an instance.
4441 HTYPE = constants.HTYPE_INSTANCE
4442 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4445 def ExpandNames(self):
4446 self._ExpandAndLockInstance()
4447 self.needed_locks[locking.LEVEL_NODE] = []
4448 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4450 def DeclareLocks(self, level):
4451 if level == locking.LEVEL_NODE:
4452 self._LockInstancesNodes()
4454 def BuildHooksEnv(self):
4457 This runs on the master, the primary and all the secondaries.
4461 "DISK": self.op.disk,
4462 "AMOUNT": self.op.amount,
4464 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4466 self.cfg.GetMasterNode(),
4467 self.instance.primary_node,
4471 def CheckPrereq(self):
4472 """Check prerequisites.
4474 This checks that the instance is in the cluster.
4477 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4478 assert instance is not None, \
4479 "Cannot retrieve locked instance %s" % self.op.instance_name
4481 self.instance = instance
4483 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4484 raise errors.OpPrereqError("Instance's disk layout does not support"
4487 self.disk = instance.FindDisk(self.op.disk)
4489 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4490 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4491 instance.hypervisor)
4492 for node in nodenames:
4493 info = nodeinfo.get(node, None)
4495 raise errors.OpPrereqError("Cannot get current information"
4496 " from node '%s'" % node)
4497 vg_free = info.get('vg_free', None)
4498 if not isinstance(vg_free, int):
4499 raise errors.OpPrereqError("Can't compute free disk space on"
4501 if self.op.amount > info['vg_free']:
4502 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4503 " %d MiB available, %d MiB required" %
4504 (node, info['vg_free'], self.op.amount))
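# For reference, each nodeinfo entry is a dict as returned by call_node_info;
# an illustrative (made-up) value would be:
#   {'vg_size': 20480, 'vg_free': 10240, 'memory_total': 4096,
#    'memory_dom0': 512, 'memory_free': 2048, 'cpu_total': 2}
# Only 'vg_free' is consumed here; the memory fields are used elsewhere
# (e.g. by the IAllocator data collection).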
4506 def Exec(self, feedback_fn):
4507 """Execute disk grow.
4510 instance = self.instance
4512 for node in (instance.secondary_nodes + (instance.primary_node,)):
4513 self.cfg.SetDiskID(disk, node)
4514 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4515 if (not result or not isinstance(result, (list, tuple)) or
4517 raise errors.OpExecError("grow request failed on node %s" % node)
4519 raise errors.OpExecError("grow request failed on node %s: %s" %
4521 disk.RecordGrow(self.op.amount)
4522 self.cfg.Update(instance)
4523 if self.op.wait_for_sync:
4524 disk_abort = not _WaitForSync(self, instance)
4526 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4527 " status.\nPlease check the instance.")
4530 class LUQueryInstanceData(NoHooksLU):
4531 """Query runtime instance data.
4534 _OP_REQP = ["instances", "static"]
4537 def ExpandNames(self):
4538 self.needed_locks = {}
4539 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4541 if not isinstance(self.op.instances, list):
4542 raise errors.OpPrereqError("Invalid argument type 'instances'")
4544 if self.op.instances:
4545 self.wanted_names = []
4546 for name in self.op.instances:
4547 full_name = self.cfg.ExpandInstanceName(name)
4548 if full_name is None:
4549 raise errors.OpPrereqError("Instance '%s' not known" %
4550 self.op.instance_name)
4551 self.wanted_names.append(full_name)
4552 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4554 self.wanted_names = None
4555 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4557 self.needed_locks[locking.LEVEL_NODE] = []
4558 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4560 def DeclareLocks(self, level):
4561 if level == locking.LEVEL_NODE:
4562 self._LockInstancesNodes()
4564 def CheckPrereq(self):
4565 """Check prerequisites.
4567 This only checks the optional instance list against the existing names.
4570 if self.wanted_names is None:
4571 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4573 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4574 in self.wanted_names]
4577 def _ComputeDiskStatus(self, instance, snode, dev):
4578 """Compute block device status.
4581 static = self.op.static
4583 self.cfg.SetDiskID(dev, instance.primary_node)
4584 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4588 if dev.dev_type in constants.LDS_DRBD:
4589 # we change the snode then (otherwise we use the one passed in)
4590 if dev.logical_id[0] == instance.primary_node:
4591 snode = dev.logical_id[1]
4593 snode = dev.logical_id[0]
4595 if snode and not static:
4596 self.cfg.SetDiskID(dev, snode)
4597 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4602 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4603 for child in dev.children]
4608 "iv_name": dev.iv_name,
4609 "dev_type": dev.dev_type,
4610 "logical_id": dev.logical_id,
4611 "physical_id": dev.physical_id,
4612 "pstatus": dev_pstatus,
4613 "sstatus": dev_sstatus,
4614 "children": dev_children,
4620 def Exec(self, feedback_fn):
4621 """Gather and return data"""
4624 cluster = self.cfg.GetClusterInfo()
4626 for instance in self.wanted_instances:
4627 if not self.op.static:
4628 remote_info = self.rpc.call_instance_info(instance.primary_node,
4630 instance.hypervisor)
4631 if remote_info and "state" in remote_info:
4634 remote_state = "down"
4637 if instance.status == "down":
4638 config_state = "down"
4642 disks = [self._ComputeDiskStatus(instance, None, device)
4643 for device in instance.disks]
4646 "name": instance.name,
4647 "config_state": config_state,
4648 "run_state": remote_state,
4649 "pnode": instance.primary_node,
4650 "snodes": instance.secondary_nodes,
4652 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4654 "hypervisor": instance.hypervisor,
4655 "network_port": instance.network_port,
4656 "hv_instance": instance.hvparams,
4657 "hv_actual": cluster.FillHV(instance),
4658 "be_instance": instance.beparams,
4659 "be_actual": cluster.FillBE(instance),
4662 result[instance.name] = idict
4667 class LUSetInstanceParams(LogicalUnit):
4668 """Modifies an instances's parameters.
4671 HPATH = "instance-modify"
4672 HTYPE = constants.HTYPE_INSTANCE
4673 _OP_REQP = ["instance_name"]
4676 def CheckArguments(self):
4677 if not hasattr(self.op, 'nics'):
4679 if not hasattr(self.op, 'disks'):
4681 if not hasattr(self.op, 'beparams'):
4682 self.op.beparams = {}
4683 if not hasattr(self.op, 'hvparams'):
4684 self.op.hvparams = {}
4685 self.op.force = getattr(self.op, "force", False)
4686 if not (self.op.nics or self.op.disks or
4687 self.op.hvparams or self.op.beparams):
4688 raise errors.OpPrereqError("No changes submitted")
4690 for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4691 val = self.op.beparams.get(item, None)
4695 except ValueError, err:
4696 raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4697 self.op.beparams[item] = val
4700 for disk_op, disk_dict in self.op.disks:
4701 if disk_op == constants.DDM_REMOVE:
4704 elif disk_op == constants.DDM_ADD:
4707 if not isinstance(disk_op, int):
4708 raise errors.OpPrereqError("Invalid disk index")
4709 if disk_op == constants.DDM_ADD:
4710 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
4711 if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
4712 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
4713 size = disk_dict.get('size', None)
4715 raise errors.OpPrereqError("Required disk parameter size missing")
4718 except ValueError, err:
4719 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
4721 disk_dict['size'] = size
4723 # modification of disk
4724 if 'size' in disk_dict:
4725 raise errors.OpPrereqError("Disk size change not possible, use"
4728 if disk_addremove > 1:
4729 raise errors.OpPrereqError("Only one disk add or remove operation"
4730 " supported at a time")
4734 for nic_op, nic_dict in self.op.nics:
4735 if nic_op == constants.DDM_REMOVE:
4738 elif nic_op == constants.DDM_ADD:
4741 if not isinstance(nic_op, int):
4742 raise errors.OpPrereqError("Invalid nic index")
4744 # nic_dict should be a dict
4745 nic_ip = nic_dict.get('ip', None)
4746 if nic_ip is not None:
4747 if nic_ip.lower() == "none":
4748 nic_dict['ip'] = None
4750 if not utils.IsValidIP(nic_ip):
4751 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
4752 # we can only check None bridges and assign the default one
4753 nic_bridge = nic_dict.get('bridge', None)
4754 if nic_bridge is None:
4755 nic_dict['bridge'] = self.cfg.GetDefBridge()
4756 # but we can validate MACs
4757 nic_mac = nic_dict.get('mac', None)
4758 if nic_mac is not None:
4759 if self.cfg.IsMacInUse(nic_mac):
4760 raise errors.OpPrereqError("MAC address %s already in use"
4761 " in cluster" % nic_mac)
4762 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4763 if not utils.IsValidMac(nic_mac):
4764 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
4765 if nic_addremove > 1:
4766 raise errors.OpPrereqError("Only one NIC add or remove operation"
4767 " supported at a time")
4769 def ExpandNames(self):
4770 self._ExpandAndLockInstance()
4771 self.needed_locks[locking.LEVEL_NODE] = []
4772 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4774 def DeclareLocks(self, level):
4775 if level == locking.LEVEL_NODE:
4776 self._LockInstancesNodes()
4778 def BuildHooksEnv(self):
4781 This runs on the master, primary and secondaries.
4785 if constants.BE_MEMORY in self.be_new:
4786 args['memory'] = self.be_new[constants.BE_MEMORY]
4787 if constants.BE_VCPUS in self.be_new:
4788 args['vcpus'] = self.be_new[constants.BE_VCPUS]
4789 # FIXME: readd disk/nic changes
4790 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4791 nl = [self.cfg.GetMasterNode(),
4792 self.instance.primary_node] + list(self.instance.secondary_nodes)
4795 def CheckPrereq(self):
4796 """Check prerequisites.
4798 This only checks the instance list against the existing names.
4801 force = self.force = self.op.force
4803 # checking the new params on the primary/secondary nodes
4805 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4806 assert self.instance is not None, \
4807 "Cannot retrieve locked instance %s" % self.op.instance_name
4808 pnode = self.instance.primary_node
4810 nodelist.extend(instance.secondary_nodes)
4812 # hvparams processing
4813 if self.op.hvparams:
4814 i_hvdict = copy.deepcopy(instance.hvparams)
4815 for key, val in self.op.hvparams.iteritems():
4823 cluster = self.cfg.GetClusterInfo()
4824 hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4827 hypervisor.GetHypervisor(
4828 instance.hypervisor).CheckParameterSyntax(hv_new)
4829 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4830 self.hv_new = hv_new # the new actual values
4831 self.hv_inst = i_hvdict # the new dict (without defaults)
4833 self.hv_new = self.hv_inst = {}
4835 # beparams processing
4836 if self.op.beparams:
4837 i_bedict = copy.deepcopy(instance.beparams)
4838 for key, val in self.op.beparams.iteritems():
4846 cluster = self.cfg.GetClusterInfo()
4847 be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4849 self.be_new = be_new # the new actual values
4850 self.be_inst = i_bedict # the new dict (without defaults)
4852 self.be_new = self.be_inst = {}
4856 if constants.BE_MEMORY in self.op.beparams and not self.force:
4857 mem_check_list = [pnode]
4858 if be_new[constants.BE_AUTO_BALANCE]:
4859 # either we changed auto_balance to yes or it was from before
4860 mem_check_list.extend(instance.secondary_nodes)
4861 instance_info = self.rpc.call_instance_info(pnode, instance.name,
4862 instance.hypervisor)
4863 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4864 instance.hypervisor)
4866 if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4867 # Assume the primary node is unreachable and go ahead
4868 self.warn.append("Can't get info from primary node %s" % pnode)
4871 current_mem = instance_info['memory']
4873 # Assume instance not running
4874 # (there is a slight race condition here, but it's not very probable,
4875 # and we have no other way to check)
4877 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4878 nodeinfo[pnode]['memory_free'])
4880 raise errors.OpPrereqError("This change will prevent the instance"
4881 " from starting, due to %d MB of memory"
4882 " missing on its primary node" % miss_mem)
4884 if be_new[constants.BE_AUTO_BALANCE]:
4885 for node in instance.secondary_nodes:
4886 if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4887 self.warn.append("Can't get info from secondary node %s" % node)
4888 elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4889 self.warn.append("Not enough memory to failover instance to"
4890 " secondary node %s" % node)
4893 for nic_op, nic_dict in self.op.nics:
4894 if nic_op == constants.DDM_REMOVE:
4895 if not instance.nics:
4896 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
4898 if nic_op != constants.DDM_ADD:
4900 if nic_op < 0 or nic_op >= len(instance.nics):
4901 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
4903 (nic_op, len(instance.nics)))
4904 nic_bridge = nic_dict.get('bridge', None)
4905 if nic_bridge is not None:
4906 if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
4907 msg = ("Bridge '%s' doesn't exist on one of"
4908 " the instance nodes" % nic_bridge)
4910 self.warn.append(msg)
4912 raise errors.OpPrereqError(msg)
4915 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
4916 raise errors.OpPrereqError("Disk operations not supported for"
4917 " diskless instances")
4918 for disk_op, disk_dict in self.op.disks:
4919 if disk_op == constants.DDM_REMOVE:
4920 if len(instance.disks) == 1:
4921 raise errors.OpPrereqError("Cannot remove the last disk of"
4923 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
4924 ins_l = ins_l[pnode]
4925 if not type(ins_l) is list:
4926 raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
4927 if instance.name in ins_l:
4928 raise errors.OpPrereqError("Instance is running, can't remove"
4931 if (disk_op == constants.DDM_ADD and
4932 len(instance.nics) >= constants.MAX_DISKS):
4933 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
4934 " add more" % constants.MAX_DISKS)
4935 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
4937 if disk_op < 0 or disk_op >= len(instance.disks):
4938 raise errors.OpPrereqError("Invalid disk index %s, valid values"
4940 (disk_op, len(instance.disks)))
4944 def Exec(self, feedback_fn):
4945 """Modifies an instance.
4947 All parameters take effect only at the next restart of the instance.
4950 # Process here the warnings from CheckPrereq, as we don't have a
4951 # feedback_fn there.
4952 for warn in self.warn:
4953 feedback_fn("WARNING: %s" % warn)
4956 instance = self.instance
4958 for disk_op, disk_dict in self.op.disks:
4959 if disk_op == constants.DDM_REMOVE:
4960 # remove the last disk
4961 device = instance.disks.pop()
4962 device_idx = len(instance.disks)
4963 for node, disk in device.ComputeNodeTree(instance.primary_node):
4964 self.cfg.SetDiskID(disk, node)
4965 if not self.rpc.call_blockdev_remove(node, disk):
4966 self.proc.LogWarning("Could not remove disk/%d on node %s,"
4967 " continuing anyway", device_idx, node)
4968 result.append(("disk/%d" % device_idx, "remove"))
4969 elif disk_op == constants.DDM_ADD:
4971 if instance.disk_template == constants.DT_FILE:
4972 file_driver, file_path = instance.disks[0].logical_id
4973 file_path = os.path.dirname(file_path)
4975 file_driver = file_path = None
4976 disk_idx_base = len(instance.disks)
4977 new_disk = _GenerateDiskTemplate(self,
4978 instance.disk_template,
4979 instance, instance.primary_node,
4980 instance.secondary_nodes,
4985 new_disk.mode = disk_dict['mode']
4986 instance.disks.append(new_disk)
4987 info = _GetInstanceInfoText(instance)
4989 logging.info("Creating volume %s for instance %s",
4990 new_disk.iv_name, instance.name)
4991 # Note: this needs to be kept in sync with _CreateDisks
4993 for secondary_node in instance.secondary_nodes:
4994 if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
4995 new_disk, False, info):
4996 self.LogWarning("Failed to create volume %s (%s) on"
4997 " secondary node %s!",
4998 new_disk.iv_name, new_disk, secondary_node)
5000 if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5001 instance, new_disk, info):
5002 self.LogWarning("Failed to create volume %s on primary!",
5004 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5005 (new_disk.size, new_disk.mode)))
5007 # change a given disk
5008 instance.disks[disk_op].mode = disk_dict['mode']
5009 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5011 for nic_op, nic_dict in self.op.nics:
5012 if nic_op == constants.DDM_REMOVE:
5013 # remove the last nic
5014 del instance.nics[-1]
5015 result.append(("nic.%d" % len(instance.nics), "remove"))
5016 elif nic_op == constants.DDM_ADD:
5018 if 'mac' not in nic_dict:
5019 mac = constants.VALUE_GENERATE
5021 mac = nic_dict['mac']
5022 if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5023 mac = self.cfg.GenerateMAC()
5024 new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5025 bridge=nic_dict.get('bridge', None))
5026 instance.nics.append(new_nic)
5027 result.append(("nic.%d" % (len(instance.nics) - 1),
5028 "add:mac=%s,ip=%s,bridge=%s" %
5029 (new_nic.mac, new_nic.ip, new_nic.bridge)))
5031 # change a given nic
5032 for key in 'mac', 'ip', 'bridge':
5034 setattr(instance.nics[nic_op], key, nic_dict[key])
5035 result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5038 if self.op.hvparams:
5039 instance.hvparams = self.hv_new
5040 for key, val in self.op.hvparams.iteritems():
5041 result.append(("hv/%s" % key, val))
5044 if self.op.beparams:
5045 instance.beparams = self.be_inst
5046 for key, val in self.op.beparams.iteritems():
5047 result.append(("be/%s" % key, val))
5049 self.cfg.Update(instance)
5054 class LUQueryExports(NoHooksLU):
5055 """Query the exports list
5058 _OP_REQP = ['nodes']
5061 def ExpandNames(self):
5062 self.needed_locks = {}
5063 self.share_locks[locking.LEVEL_NODE] = 1
5064 if not self.op.nodes:
5065 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5067 self.needed_locks[locking.LEVEL_NODE] = \
5068 _GetWantedNodes(self, self.op.nodes)
5070 def CheckPrereq(self):
5071 """Check prerequisites.
5074 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5076 def Exec(self, feedback_fn):
5077 """Compute the list of all the exported system images.
5080 @return: a dictionary with the structure node->(export-list)
5081 where export-list is a list of the instances exported on
5085 return self.rpc.call_export_list(self.nodes)
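# Illustrative (made-up) return value:
#   {"node1.example.com": ["instance1.example.com"],
#    "node2.example.com": []}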
5088 class LUExportInstance(LogicalUnit):
5089 """Export an instance to an image in the cluster.
5092 HPATH = "instance-export"
5093 HTYPE = constants.HTYPE_INSTANCE
5094 _OP_REQP = ["instance_name", "target_node", "shutdown"]
5097 def ExpandNames(self):
5098 self._ExpandAndLockInstance()
5099 # FIXME: lock only instance primary and destination node
5101 # Sad but true, for now we have to lock all nodes, as we don't know where
5102 # the previous export might be, and in this LU we search for it and
5103 # remove it from its current node. In the future we could fix this by:
5104 # - making a tasklet to search (share-lock all), then create the new one,
5105 # then one to remove, after
5106 # - removing the removal operation altogether
5107 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5109 def DeclareLocks(self, level):
5110 """Last minute lock declaration."""
5111 # All nodes are locked anyway, so nothing to do here.
5113 def BuildHooksEnv(self):
5116 This will run on the master, primary node and target node.
5120 "EXPORT_NODE": self.op.target_node,
5121 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5123 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5124 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5125 self.op.target_node]
5128 def CheckPrereq(self):
5129 """Check prerequisites.
5131 This checks that the instance and node names are valid.
5134 instance_name = self.op.instance_name
5135 self.instance = self.cfg.GetInstanceInfo(instance_name)
5136 assert self.instance is not None, \
5137 "Cannot retrieve locked instance %s" % self.op.instance_name
5139 self.dst_node = self.cfg.GetNodeInfo(
5140 self.cfg.ExpandNodeName(self.op.target_node))
5142 assert self.dst_node is not None, \
5143 "Cannot retrieve locked node %s" % self.op.target_node
5145 # instance disk type verification
5146 for disk in self.instance.disks:
5147 if disk.dev_type == constants.LD_FILE:
5148 raise errors.OpPrereqError("Export not supported for instances with"
5149 " file-based disks")
5151 def Exec(self, feedback_fn):
5152 """Export an instance to an image in the cluster.
5155 instance = self.instance
5156 dst_node = self.dst_node
5157 src_node = instance.primary_node
5158 if self.op.shutdown:
5159 # shutdown the instance, but not the disks
5160 if not self.rpc.call_instance_shutdown(src_node, instance):
5161 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5162 (instance.name, src_node))
5164 vgname = self.cfg.GetVGName()
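# The export proceeds in roughly four phases, visible in the calls below:
# snapshot every disk on the primary node, copy each snapshot to the target
# node, remove the snapshots again, then write the export metadata on the
# target and prune older exports of the same instance from the other nodes.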
5169 for disk in instance.disks:
5170 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5171 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5173 if not new_dev_name:
5174 self.LogWarning("Could not snapshot block device %s on node %s",
5175 disk.logical_id[1], src_node)
5176 snap_disks.append(False)
5178 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5179 logical_id=(vgname, new_dev_name),
5180 physical_id=(vgname, new_dev_name),
5181 iv_name=disk.iv_name)
5182 snap_disks.append(new_dev)
5185 if self.op.shutdown and instance.status == "up":
5186 if not self.rpc.call_instance_start(src_node, instance, None):
5187 _ShutdownInstanceDisks(self, instance)
5188 raise errors.OpExecError("Could not start instance")
5190 # TODO: check for size
5192 cluster_name = self.cfg.GetClusterName()
5193 for idx, dev in enumerate(snap_disks):
5195 if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5196 instance, cluster_name, idx):
5197 self.LogWarning("Could not export block device %s from node %s to"
5198 " node %s", dev.logical_id[1], src_node,
5200 if not self.rpc.call_blockdev_remove(src_node, dev):
5201 self.LogWarning("Could not remove snapshot block device %s from node"
5202 " %s", dev.logical_id[1], src_node)
5204 if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5205 self.LogWarning("Could not finalize export for instance %s on node %s",
5206 instance.name, dst_node.name)
5208 nodelist = self.cfg.GetNodeList()
5209 nodelist.remove(dst_node.name)
5211 # on one-node clusters nodelist will be empty after the removal
5212 # if we proceed the backup would be removed because OpQueryExports
5213 # substitutes an empty list with the full cluster node list.
5215 exportlist = self.rpc.call_export_list(nodelist)
5216 for node in exportlist:
5217 if instance.name in exportlist[node]:
5218 if not self.rpc.call_export_remove(node, instance.name):
5219 self.LogWarning("Could not remove older export for instance %s"
5220 " on node %s", instance.name, node)
5223 class LURemoveExport(NoHooksLU):
5224 """Remove exports related to the named instance.
5227 _OP_REQP = ["instance_name"]
5230 def ExpandNames(self):
5231 self.needed_locks = {}
5232 # We need all nodes to be locked in order for RemoveExport to work, but we
5233 # don't need to lock the instance itself, as nothing will happen to it (and
5234 # we can remove exports also for a removed instance)
5235 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5237 def CheckPrereq(self):
5238 """Check prerequisites.
5242 def Exec(self, feedback_fn):
5243 """Remove any export.
5246 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5247 # If the instance was not found we'll try with the name that was passed in.
5248 # This will only work if it was an FQDN, though.
5250 if not instance_name:
5252 instance_name = self.op.instance_name
5254 exportlist = self.rpc.call_export_list(self.acquired_locks[
5255 locking.LEVEL_NODE])
5257 for node in exportlist:
5258 if instance_name in exportlist[node]:
5260 if not self.rpc.call_export_remove(node, instance_name):
5261 logging.error("Could not remove export for instance %s"
5262 " on node %s", instance_name, node)
5264 if fqdn_warn and not found:
5265 feedback_fn("Export not found. If trying to remove an export belonging"
5266 " to a deleted instance please use its Fully Qualified"
5270 class TagsLU(NoHooksLU):
5273 This is an abstract class which is the parent of all the other tags LUs.
5277 def ExpandNames(self):
5278 self.needed_locks = {}
5279 if self.op.kind == constants.TAG_NODE:
5280 name = self.cfg.ExpandNodeName(self.op.name)
5282 raise errors.OpPrereqError("Invalid node name (%s)" %
5285 self.needed_locks[locking.LEVEL_NODE] = name
5286 elif self.op.kind == constants.TAG_INSTANCE:
5287 name = self.cfg.ExpandInstanceName(self.op.name)
5289 raise errors.OpPrereqError("Invalid instance name (%s)" %
5292 self.needed_locks[locking.LEVEL_INSTANCE] = name
5294 def CheckPrereq(self):
5295 """Check prerequisites.
5298 if self.op.kind == constants.TAG_CLUSTER:
5299 self.target = self.cfg.GetClusterInfo()
5300 elif self.op.kind == constants.TAG_NODE:
5301 self.target = self.cfg.GetNodeInfo(self.op.name)
5302 elif self.op.kind == constants.TAG_INSTANCE:
5303 self.target = self.cfg.GetInstanceInfo(self.op.name)
5305 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5309 class LUGetTags(TagsLU):
5310 """Returns the tags of a given object.
5313 _OP_REQP = ["kind", "name"]
5316 def Exec(self, feedback_fn):
5317 """Returns the tag list.
5320 return list(self.target.GetTags())
5323 class LUSearchTags(NoHooksLU):
5324 """Searches the tags for a given pattern.
5327 _OP_REQP = ["pattern"]
5330 def ExpandNames(self):
5331 self.needed_locks = {}
5333 def CheckPrereq(self):
5334 """Check prerequisites.
5336 This checks the pattern passed for validity by compiling it.
5340 self.re = re.compile(self.op.pattern)
5341 except re.error, err:
5342 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5343 (self.op.pattern, err))
5345 def Exec(self, feedback_fn):
5346 """Returns the tag list.
5350 tgts = [("/cluster", cfg.GetClusterInfo())]
5351 ilist = cfg.GetAllInstancesInfo().values()
5352 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5353 nlist = cfg.GetAllNodesInfo().values()
5354 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5356 for path, target in tgts:
5357 for tag in target.GetTags():
5358 if self.re.search(tag):
5359 results.append((path, tag))
5363 class LUAddTags(TagsLU):
5364 """Sets a tag on a given object.
5367 _OP_REQP = ["kind", "name", "tags"]
5370 def CheckPrereq(self):
5371 """Check prerequisites.
5373 This checks the type and length of the tag name and value.
5376 TagsLU.CheckPrereq(self)
5377 for tag in self.op.tags:
5378 objects.TaggableObject.ValidateTag(tag)
5380 def Exec(self, feedback_fn):
5385 for tag in self.op.tags:
5386 self.target.AddTag(tag)
5387 except errors.TagError, err:
5388 raise errors.OpExecError("Error while setting tag: %s" % str(err))
5390 self.cfg.Update(self.target)
5391 except errors.ConfigurationError:
5392 raise errors.OpRetryError("There has been a modification to the"
5393 " config file and the operation has been"
5394 " aborted. Please retry.")
5397 class LUDelTags(TagsLU):
5398 """Delete a list of tags from a given object.
5401 _OP_REQP = ["kind", "name", "tags"]
5404 def CheckPrereq(self):
5405 """Check prerequisites.
5407 This checks that we have the given tag.
5410 TagsLU.CheckPrereq(self)
5411 for tag in self.op.tags:
5412 objects.TaggableObject.ValidateTag(tag)
5413 del_tags = frozenset(self.op.tags)
5414 cur_tags = self.target.GetTags()
5415 if not del_tags <= cur_tags:
5416 diff_tags = del_tags - cur_tags
5417 diff_names = ["'%s'" % tag for tag in diff_tags]
5419 raise errors.OpPrereqError("Tag(s) %s not found" %
5420 (",".join(diff_names)))
5422 def Exec(self, feedback_fn):
5423 """Remove the tag from the object.
5426 for tag in self.op.tags:
5427 self.target.RemoveTag(tag)
5429 self.cfg.Update(self.target)
5430 except errors.ConfigurationError:
5431 raise errors.OpRetryError("There has been a modification to the"
5432 " config file and the operation has been"
5433 " aborted. Please retry.")
5436 class LUTestDelay(NoHooksLU):
5437 """Sleep for a specified amount of time.
5439 This LU sleeps on the master and/or nodes for a specified amount of time.
5443 _OP_REQP = ["duration", "on_master", "on_nodes"]
5446 def ExpandNames(self):
5447 """Expand names and set required locks.
5449 This expands the node list, if any.
5452 self.needed_locks = {}
5453 if self.op.on_nodes:
5454 # _GetWantedNodes can be used here, but is not always appropriate to use
5455 # this way in ExpandNames. Check the LogicalUnit.ExpandNames docstring for more information.
5457 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5458 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5460 def CheckPrereq(self):
5461 """Check prerequisites.
5465 def Exec(self, feedback_fn):
5466 """Do the actual sleep.
5469 if self.op.on_master:
5470 if not utils.TestDelay(self.op.duration):
5471 raise errors.OpExecError("Error during master delay test")
5472 if self.op.on_nodes:
5473 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5475 raise errors.OpExecError("Complete failure from rpc call")
5476 for node, node_result in result.items():
5478 raise errors.OpExecError("Failure during rpc call to node %s,"
5479 " result: %s" % (node, node_result))
5482 class IAllocator(object):
5483 """IAllocator framework.
5485 An IAllocator instance has three sets of attributes:
5486 - cfg that is needed to query the cluster
5487 - input data (all members of the _KEYS class attribute are required)
5488 - four buffer attributes (in_text, in_data, out_text, out_data), that represent the
5489 input (to the external script) in text and data structure format,
5490 and the output from it, again in two formats
5491 - the result variables from the script (success, info, nodes) for
5496 "mem_size", "disks", "disk_template",
5497 "os", "tags", "nics", "vcpus", "hypervisor",
5503 def __init__(self, lu, mode, name, **kwargs):
5505 # init buffer variables
5506 self.in_text = self.out_text = self.in_data = self.out_data = None
5507 # init all input fields so that pylint is happy
5510 self.mem_size = self.disks = self.disk_template = None
5511 self.os = self.tags = self.nics = self.vcpus = None
5512 self.relocate_from = None
5514 self.required_nodes = None
5515 # init result fields
5516 self.success = self.info = self.nodes = None
5517 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5518 keyset = self._ALLO_KEYS
5519 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5520 keyset = self._RELO_KEYS
5522 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5523 " IAllocator" % self.mode)
5525 if key not in keyset:
5526 raise errors.ProgrammerError("Invalid input parameter '%s' to"
5527 " IAllocator" % key)
5528 setattr(self, key, kwargs[key])
5530 if key not in kwargs:
5531 raise errors.ProgrammerError("Missing input parameter '%s' to"
5532 " IAllocator" % key)
5533 self._BuildInputData()
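# Typical driver code in an LU (cf. LUReplaceDisks._RunAllocator above):
#
#   ial = IAllocator(self, mode=constants.IALLOCATOR_MODE_RELOC,
#                    name=self.op.instance_name,
#                    relocate_from=[self.sec_node])
#   ial.Run(self.op.iallocator)
#   if not ial.success:
#     raise errors.OpPrereqError(...)
#   new_node = ial.nodes[0]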
5535 def _ComputeClusterData(self):
5536 """Compute the generic allocator input data.
5538 This is the data that is independent of the actual operation.
5542 cluster_info = cfg.GetClusterInfo()
5546 "cluster_name": cfg.GetClusterName(),
5547 "cluster_tags": list(cluster_info.GetTags()),
5548 "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5549 # we don't have job IDs
5551 iinfo = cfg.GetAllInstancesInfo().values()
5552 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
5556 node_list = cfg.GetNodeList()
5558 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5559 hypervisor = self.hypervisor
5560 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5561 hypervisor = cfg.GetInstanceInfo(self.name).hypervisor
5563 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5565 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
5566 cluster_info.enabled_hypervisors)
5567 for nname in node_list:
5568 ninfo = cfg.GetNodeInfo(nname)
5569 if nname not in node_data or not isinstance(node_data[nname], dict):
5570 raise errors.OpExecError("Can't get data for node %s" % nname)
5571 remote_info = node_data[nname]
5572 for attr in ['memory_total', 'memory_free', 'memory_dom0',
5573 'vg_size', 'vg_free', 'cpu_total']:
5574 if attr not in remote_info:
5575 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5578 remote_info[attr] = int(remote_info[attr])
5579 except ValueError, err:
5580 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5581 " %s" % (nname, attr, str(err)))
5582 # compute memory used by primary instances
5583 i_p_mem = i_p_up_mem = 0
5584 for iinfo, beinfo in i_list:
5585 if iinfo.primary_node == nname:
5586 i_p_mem += beinfo[constants.BE_MEMORY]
5587 if iinfo.name not in node_iinfo[nname]:
5590 i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
5591 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
5592 remote_info['memory_free'] -= max(0, i_mem_diff)
5594 if iinfo.status == "up":
5595 i_p_up_mem += beinfo[constants.BE_MEMORY]
5597 # compute memory used by instances
5599 "tags": list(ninfo.GetTags()),
5600 "total_memory": remote_info['memory_total'],
5601 "reserved_memory": remote_info['memory_dom0'],
5602 "free_memory": remote_info['memory_free'],
5603 "i_pri_memory": i_p_mem,
5604 "i_pri_up_memory": i_p_up_mem,
5605 "total_disk": remote_info['vg_size'],
5606 "free_disk": remote_info['vg_free'],
5607 "primary_ip": ninfo.primary_ip,
5608 "secondary_ip": ninfo.secondary_ip,
5609 "total_cpus": remote_info['cpu_total'],
5611 node_results[nname] = pnr
5612 data["nodes"] = node_results
5616 for iinfo, beinfo in i_list:
5617 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5618 for n in iinfo.nics]
5620 "tags": list(iinfo.GetTags()),
5621 "should_run": iinfo.status == "up",
5622 "vcpus": beinfo[constants.BE_VCPUS],
5623 "memory": beinfo[constants.BE_MEMORY],
5625 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5627 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5628 "disk_template": iinfo.disk_template,
5629 "hypervisor": iinfo.hypervisor,
5631 instance_data[iinfo.name] = pir
5633 data["instances"] = instance_data
5637 def _AddNewInstance(self):
5638 """Add new instance data to allocator structure.
5640 This in combination with _ComputeClusterData will create the
5641 correct structure needed as input for the allocator.
5643 The checks for the completeness of the opcode must have already been
5648 if len(self.disks) != 2:
5649 raise errors.OpExecError("Only two-disk configurations supported")
5651 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
5653 if self.disk_template in constants.DTS_NET_MIRROR:
5654 self.required_nodes = 2
5656 self.required_nodes = 1
5660 "disk_template": self.disk_template,
5663 "vcpus": self.vcpus,
5664 "memory": self.mem_size,
5665 "disks": self.disks,
5666 "disk_space_total": disk_space,
5668 "required_nodes": self.required_nodes,
5670 data["request"] = request
5672 def _AddRelocateInstance(self):
5673 """Add relocate instance data to allocator structure.
5675 This in combination with _ComputeClusterData will create the
5676 correct structure needed as input for the allocator.
5678 The checks for the completeness of the opcode must have already been
5682 instance = self.lu.cfg.GetInstanceInfo(self.name)
5683 if instance is None:
5684 raise errors.ProgrammerError("Unknown instance '%s' passed to"
5685 " IAllocator" % self.name)
5687 if instance.disk_template not in constants.DTS_NET_MIRROR:
5688 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5690 if len(instance.secondary_nodes) != 1:
5691 raise errors.OpPrereqError("Instance does not have exactly one secondary node")
5693 self.required_nodes = 1
5694 disk_sizes = [{'size': disk.size} for disk in instance.disks]
5695 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
5700 "disk_space_total": disk_space,
5701 "required_nodes": self.required_nodes,
5702 "relocate_from": self.relocate_from,
5704 self.in_data["request"] = request
5706 def _BuildInputData(self):
5707 """Build input data structures.
5710 self._ComputeClusterData()
5712 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5713 self._AddNewInstance()
5715 self._AddRelocateInstance()
5717 self.in_text = serializer.Dump(self.in_data)
5719 def Run(self, name, validate=True, call_fn=None):
5720 """Run an instance allocator and return the results.
5724 call_fn = self.lu.rpc.call_iallocator_runner
5727 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5729 if not isinstance(result, (list, tuple)) or len(result) != 4:
5730 raise errors.OpExecError("Invalid result from master iallocator runner")
5732 rcode, stdout, stderr, fail = result
5734 if rcode == constants.IARUN_NOTFOUND:
5735 raise errors.OpExecError("Can't find allocator '%s'" % name)
5736 elif rcode == constants.IARUN_FAILURE:
5737 raise errors.OpExecError("Instance allocator call failed: %s,"
5738 " output: %s" % (fail, stdout+stderr))
5739 self.out_text = stdout
5741 self._ValidateResult()
5743 def _ValidateResult(self):
5744 """Process the allocator results.
5746 This will process and if successful save the result in
5747 self.out_data and the other parameters.
5751 rdict = serializer.Load(self.out_text)
5752 except Exception, err:
5753 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5755 if not isinstance(rdict, dict):
5756 raise errors.OpExecError("Can't parse iallocator results: not a dict")
5758 for key in "success", "info", "nodes":
5759 if key not in rdict:
5760 raise errors.OpExecError("Can't parse iallocator results:"
5761 " missing key '%s'" % key)
5762 setattr(self, key, rdict[key])
5764 if not isinstance(rdict["nodes"], list):
5765 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5767 self.out_data = rdict
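# A minimal well-formed reply from an allocator script looks like this
# (illustrative values):
#   {"success": true, "info": "allocation successful",
#    "nodes": ["node2.example.com", "node3.example.com"]}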
5770 class LUTestAllocator(NoHooksLU):
5771 """Run allocator tests.
5773 This LU runs the allocator tests
5776 _OP_REQP = ["direction", "mode", "name"]
5778 def CheckPrereq(self):
5779 """Check prerequisites.
5781 This checks the opcode parameters depending on the test direction and mode.
5784 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5785 for attr in ["name", "mem_size", "disks", "disk_template",
5786 "os", "tags", "nics", "vcpus"]:
5787 if not hasattr(self.op, attr):
5788 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5790 iname = self.cfg.ExpandInstanceName(self.op.name)
5791 if iname is not None:
5792 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5794 if not isinstance(self.op.nics, list):
5795 raise errors.OpPrereqError("Invalid parameter 'nics'")
5796 for row in self.op.nics:
5797 if (not isinstance(row, dict) or
5800 "bridge" not in row):
5801 raise errors.OpPrereqError("Invalid contents of the"
5802 " 'nics' parameter")
5803 if not isinstance(self.op.disks, list):
5804 raise errors.OpPrereqError("Invalid parameter 'disks'")
5805 if len(self.op.disks) != 2:
5806 raise errors.OpPrereqError("Only two-disk configurations supported")
5807 for row in self.op.disks:
5808 if (not isinstance(row, dict) or
5809 "size" not in row or
5810 not isinstance(row["size"], int) or
5811 "mode" not in row or
5812 row["mode"] not in ['r', 'w']):
5813 raise errors.OpPrereqError("Invalid contents of the"
5814 " 'disks' parameter")
5815 if self.op.hypervisor is None:
5816 self.op.hypervisor = self.cfg.GetHypervisorType()
5817 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5818 if not hasattr(self.op, "name"):
5819 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5820 fname = self.cfg.ExpandInstanceName(self.op.name)
5822 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5824 self.op.name = fname
5825 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5827 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5830 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5831 if not hasattr(self.op, "allocator") or self.op.allocator is None:
5832 raise errors.OpPrereqError("Missing allocator name")
5833 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5834 raise errors.OpPrereqError("Wrong allocator test '%s'" %
5837 def Exec(self, feedback_fn):
5838 """Run the allocator test.
5841 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5842 ial = IAllocator(self,
5845 mem_size=self.op.mem_size,
5846 disks=self.op.disks,
5847 disk_template=self.op.disk_template,
5851 vcpus=self.op.vcpus,
5852 hypervisor=self.op.hypervisor,
5855 ial = IAllocator(self,
5858 relocate_from=list(self.relocate_from),
5861 if self.op.direction == constants.IALLOCATOR_DIR_IN:
5862 result = ial.in_text
5864 ial.Run(self.op.allocator, validate=False)
5865 result = ial.out_text