4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
49 class LogicalUnit(object):
50 """Logical Unit base class.
52 Subclasses must follow these rules:
53 - implement ExpandNames
54 - implement CheckPrereq
56 - implement BuildHooksEnv
57 - redefine HPATH and HTYPE
58 - optionally redefine their run requirements:
59 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
61 Note that all commands require root permissions.
69 def __init__(self, processor, op, context, rpc):
70 """Constructor for LogicalUnit.
72 This needs to be overridden in derived classes in order to check op
78 self.cfg = context.cfg
79 self.context = context
81 # Dicts used to declare locking needs to mcpu
82 self.needed_locks = None
83 self.acquired_locks = {}
84 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86 self.remove_locks = {}
87 # Used to force good behavior when calling helper functions
88 self.recalculate_locks = {}
91 self.LogWarning = processor.LogWarning
92 self.LogInfo = processor.LogInfo
94 for attr_name in self._OP_REQP:
95 attr_val = getattr(op, attr_name, None)
97 raise errors.OpPrereqError("Required parameter '%s' missing" %
102 """Returns the SshRunner object
106 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
109 ssh = property(fget=__GetSSH)
111 def CheckArguments(self):
112 """Check syntactic validity for the opcode arguments.
114 This method is for doing a simple syntactic check and ensuring the
115 validity of opcode parameters, without any cluster-related
116 checks. While the same can be accomplished in ExpandNames and/or
117 CheckPrereq, doing these separately is better because:
119 - ExpandNames is left as purely a lock-related function
120 - CheckPrereq is run after we have acquired locks (and possible
123 The function is allowed to change the self.op attribute so that
124 later methods can no longer worry about missing parameters.
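A minimal sketch of what an implementation could look like (the "force"
flag is a hypothetical opcode parameter, used here only for illustration)::

  if not hasattr(self.op, "force"):
    self.op.force = False
  if not isinstance(self.op.force, bool):
    raise errors.OpPrereqError("Invalid 'force' parameter")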
129 def ExpandNames(self):
130 """Expand names for this LU.
132 This method is called before starting to execute the opcode, and it should
133 update all the parameters of the opcode to their canonical form (e.g. a
134 short node name must be fully expanded after this method has successfully
135 completed). This way locking, hooks, logging, etc. can work correctly.
137 LUs which implement this method must also populate the self.needed_locks
138 member, as a dict with lock levels as keys, and a list of needed lock names
141 - use an empty dict if you don't need any lock
142 - if you don't need any lock at a particular level omit that level
143 - don't put anything for the BGL level
144 - if you want all locks at a level use locking.ALL_SET as a value
146 If you need to share locks (rather than acquire them exclusively) at one
147 level you can modify self.share_locks, setting a true value (usually 1) for
148 that level. By default locks are not shared.
152 # Acquire all nodes and one instance
153 self.needed_locks = {
154 locking.LEVEL_NODE: locking.ALL_SET,
155 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
157 # Acquire just two nodes
158 self.needed_locks = {
159 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
162 self.needed_locks = {} # No, you can't leave it to the default value None
165 # The implementation of this method is mandatory only if the new LU is
166 # concurrent, so that old LUs don't need to be changed all at the same
169 self.needed_locks = {} # Exclusive LUs don't need locks.
171 raise NotImplementedError
173 def DeclareLocks(self, level):
174 """Declare LU locking needs for a level
176 While most LUs can just declare their locking needs at ExpandNames time,
177 sometimes there's the need to calculate some locks after having acquired
178 the ones before. This function is called just before acquiring locks at a
179 particular level, but after acquiring the ones at lower levels, and permits
180 such calculations. It can be used to modify self.needed_locks, and by
181 default it does nothing.
183 This function is only called if you have something already set in
184 self.needed_locks for the level.
186 @param level: Locking level which is going to be locked
187 @type level: member of ganeti.locking.LEVELS
191 def CheckPrereq(self):
192 """Check prerequisites for this LU.
194 This method should check that the prerequisites for the execution
195 of this LU are fulfilled. It can do internode communication, but
196 it should be idempotent - no cluster or system changes are
199 The method should raise errors.OpPrereqError in case something is
200 not fulfilled. Its return value is ignored.
202 This method should also update all the parameters of the opcode to
203 their canonical form if it hasn't been done by ExpandNames before.
206 raise NotImplementedError
208 def Exec(self, feedback_fn):
211 This method should implement the actual work. It should raise
212 errors.OpExecError for failures that are somewhat dealt with in
216 raise NotImplementedError
218 def BuildHooksEnv(self):
219 """Build hooks environment for this LU.
221 This method should return a three-element tuple consisting of: a dict
222 containing the environment that will be used for running the
223 specific hook for this LU, a list of node names on which the hook
224 should run before the execution, and a list of node names on which
225 the hook should run after the execution.
227 The keys of the dict must not have the 'GANETI_' prefix, as this will
228 be handled in the hooks runner. Also note additional keys will be
229 added by the hooks runner. If the LU doesn't define any
230 environment, an empty dict (and not None) should be returned.
232 No nodes should be returned as an empty list (and not None).
234 Note that if the HPATH for a LU class is None, this function will
238 raise NotImplementedError
240 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241 """Notify the LU about the results of its hooks.
243 This method is called every time a hooks phase is executed, and notifies
244 the Logical Unit about the hooks' result. The LU can then use it to alter
245 its result based on the hooks. By default the method does nothing and the
246 previous result is passed back unchanged but any LU can define it if it
247 wants to use the local cluster hook-scripts somehow.
249 @param phase: one of L{constants.HOOKS_PHASE_POST} or
250 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251 @param hook_results: the results of the multi-node hooks rpc call
252 @param feedback_fn: function used to send feedback back to the caller
253 @param lu_result: the previous Exec result this LU had, or None
255 @return: the new Exec result, based on the previous result
261 def _ExpandAndLockInstance(self):
262 """Helper function to expand and lock an instance.
264 Many LUs that work on an instance take its name in self.op.instance_name
265 and need to expand it and then declare the expanded name for locking. This
266 function does it, and then updates self.op.instance_name to the expanded
267 name. It also initializes needed_locks as a dict, if this hasn't been done
271 if self.needed_locks is None:
272 self.needed_locks = {}
274 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275 "_ExpandAndLockInstance called with instance-level locks set"
276 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277 if expanded_name is None:
278 raise errors.OpPrereqError("Instance '%s' not known" %
279 self.op.instance_name)
280 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281 self.op.instance_name = expanded_name
283 def _LockInstancesNodes(self, primary_only=False):
284 """Helper function to declare instances' nodes for locking.
286 This function should be called after locking one or more instances to lock
287 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288 with all primary or secondary nodes for instances already locked and
289 present in self.needed_locks[locking.LEVEL_INSTANCE].
291 It should be called from DeclareLocks, and for safety only works if
292 self.recalculate_locks[locking.LEVEL_NODE] is set.
294 In the future it may grow parameters to just lock some instance's nodes, or
295 to just lock primaries or secondary nodes, if needed.
297 It should be called in DeclareLocks in a way similar to::
299 if level == locking.LEVEL_NODE:
300 self._LockInstancesNodes()
302 @type primary_only: boolean
303 @param primary_only: only lock primary nodes of locked instances
306 assert locking.LEVEL_NODE in self.recalculate_locks, \
307 "_LockInstancesNodes helper function called with no nodes to recalculate"
309 # TODO: check if we really have been called with the instance locks held
311 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312 # future we might want to have different behaviors depending on the value
313 # of self.recalculate_locks[locking.LEVEL_NODE]
315 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316 instance = self.context.cfg.GetInstanceInfo(instance_name)
317 wanted_nodes.append(instance.primary_node)
319 wanted_nodes.extend(instance.secondary_nodes)
321 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
326 del self.recalculate_locks[locking.LEVEL_NODE]
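# Illustrative sketch (not an actual LU in this module): how a hypothetical
# per-instance LU would combine _ExpandAndLockInstance and _LockInstancesNodes
# to lock first the instance and then its nodes. The class name and the
# REQ_BGL value are assumptions made only for this example.
#
#   class _LUExampleInstanceOp(LogicalUnit):
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self._ExpandAndLockInstance()
#       # node locks will be computed in DeclareLocks, once the instance
#       # lock is declared
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()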
329 class NoHooksLU(LogicalUnit):
330 """Simple LU which runs no hooks.
332 This LU is intended as a parent for other LogicalUnits which will
333 run no hooks, in order to reduce duplicate code.
340 def _GetWantedNodes(lu, nodes):
341 """Returns list of checked and expanded node names.
343 @type lu: L{LogicalUnit}
344 @param lu: the logical unit on whose behalf we execute
346 @param nodes: list of node names or None for all nodes
348 @return: the list of nodes, sorted
349 @raise errors.OpPrereqError: if the nodes parameter is wrong type
352 if not isinstance(nodes, list):
353 raise errors.OpPrereqError("Invalid argument type 'nodes'")
356 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357 " non-empty list of nodes whose name is to be expanded.")
361 node = lu.cfg.ExpandNodeName(name)
363 raise errors.OpPrereqError("No such node name '%s'" % name)
366 return utils.NiceSort(wanted)
369 def _GetWantedInstances(lu, instances):
370 """Returns list of checked and expanded instance names.
372 @type lu: L{LogicalUnit}
373 @param lu: the logical unit on whose behalf we execute
374 @type instances: list
375 @param instances: list of instance names or None for all instances
377 @return: the list of instances, sorted
378 @raise errors.OpPrereqError: if the instances parameter is wrong type
379 @raise errors.OpPrereqError: if any of the passed instances is not found
382 if not isinstance(instances, list):
383 raise errors.OpPrereqError("Invalid argument type 'instances'")
388 for name in instances:
389 instance = lu.cfg.ExpandInstanceName(name)
391 raise errors.OpPrereqError("No such instance name '%s'" % name)
392 wanted.append(instance)
395 wanted = lu.cfg.GetInstanceList()
396 return utils.NiceSort(wanted)
399 def _CheckOutputFields(static, dynamic, selected):
400 """Checks whether all selected fields are valid.
402 @type static: L{utils.FieldSet}
403 @param static: static fields set
404 @type dynamic: L{utils.FieldSet}
405 @param dynamic: dynamic fields set
412 delta = f.NonMatching(selected)
414 raise errors.OpPrereqError("Unknown output fields selected: %s"
418 def _CheckBooleanOpField(op, name):
419 """Validates boolean opcode parameters.
421 This will ensure that an opcode parameter is either a boolean value,
422 or None (but that it always exists).
425 val = getattr(op, name, None)
426 if not (val is None or isinstance(val, bool)):
427 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
429 setattr(op, name, val)
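# Typical use from an LU's argument checking (illustrative only; 'readd' is a
# boolean opcode flag consumed by LUAddNode below):
#
#   _CheckBooleanOpField(self.op, "readd")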
432 def _CheckNodeOnline(lu, node):
433 """Ensure that a given node is online.
435 @param lu: the LU on behalf of which we make the check
436 @param node: the node to check
437 @raise errors.OpPrereqError: if the node is offline
440 if lu.cfg.GetNodeInfo(node).offline:
441 raise errors.OpPrereqError("Can't use offline node %s" % node)
444 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
445 memory, vcpus, nics):
446 """Builds instance related env variables for hooks
448 This builds the hook environment from individual variables.
451 @param name: the name of the instance
452 @type primary_node: string
453 @param primary_node: the name of the instance's primary node
454 @type secondary_nodes: list
455 @param secondary_nodes: list of secondary nodes as strings
456 @type os_type: string
457 @param os_type: the name of the instance's OS
458 @type status: boolean
459 @param status: the should_run status of the instance
461 @param memory: the memory size of the instance
463 @param vcpus: the count of VCPUs the instance has
465 @param nics: list of tuples (ip, bridge, mac) representing
466 the NICs the instance has
468 @return: the hook environment for this instance
477 "INSTANCE_NAME": name,
478 "INSTANCE_PRIMARY": primary_node,
479 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
480 "INSTANCE_OS_TYPE": os_type,
481 "INSTANCE_STATUS": str_status,
482 "INSTANCE_MEMORY": memory,
483 "INSTANCE_VCPUS": vcpus,
487 nic_count = len(nics)
488 for idx, (ip, bridge, mac) in enumerate(nics):
491 env["INSTANCE_NIC%d_IP" % idx] = ip
492 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
493 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
497 env["INSTANCE_NIC_COUNT"] = nic_count
502 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
503 """Builds instance related env variables for hooks from an object.
505 @type lu: L{LogicalUnit}
506 @param lu: the logical unit on whose behalf we execute
507 @type instance: L{objects.Instance}
508 @param instance: the instance for which we should build the
511 @param override: dictionary with key/values that will override
514 @return: the hook environment dictionary
517 bep = lu.cfg.GetClusterInfo().FillBE(instance)
519 'name': instance.name,
520 'primary_node': instance.primary_node,
521 'secondary_nodes': instance.secondary_nodes,
522 'os_type': instance.os,
523 'status': instance.admin_up,
524 'memory': bep[constants.BE_MEMORY],
525 'vcpus': bep[constants.BE_VCPUS],
526 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
529 args.update(override)
530 return _BuildInstanceHookEnv(**args)
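# Example of the environment the two helpers above would produce for a
# hypothetical single-NIC instance (only the keys built in the code above are
# shown, and the values are illustrative; the 'GANETI_' prefix is added later
# by the hooks runner):
#
#   {
#     "INSTANCE_NAME": "instance1.example.tld",
#     "INSTANCE_PRIMARY": "node1.example.tld",
#     "INSTANCE_SECONDARIES": "node2.example.tld",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 128,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_IP": "192.0.2.10",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33",
#   }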
533 def _AdjustCandidatePool(lu):
534 """Adjust the candidate pool after node operations.
537 mod_list = lu.cfg.MaintainCandidatePool()
539 lu.LogInfo("Promoted nodes to master candidate role: %s",
540 ", ".join(node.name for node in mod_list))
541 for name in mod_list:
542 lu.context.ReaddNode(name)
543 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
545 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
549 def _CheckInstanceBridgesExist(lu, instance):
550 """Check that the brigdes needed by an instance exist.
553 # check bridges existance
554 brlist = [nic.bridge for nic in instance.nics]
555 result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
558 raise errors.OpPrereqError("One or more target bridges %s does not"
559 " exist on destination node '%s'" %
560 (brlist, instance.primary_node))
563 class LUDestroyCluster(NoHooksLU):
564 """Logical unit for destroying the cluster.
569 def CheckPrereq(self):
570 """Check prerequisites.
572 This checks whether the cluster is empty.
574 Any errors are signalled by raising errors.OpPrereqError.
577 master = self.cfg.GetMasterNode()
579 nodelist = self.cfg.GetNodeList()
580 if len(nodelist) != 1 or nodelist[0] != master:
581 raise errors.OpPrereqError("There are still %d node(s) in"
582 " this cluster." % (len(nodelist) - 1))
583 instancelist = self.cfg.GetInstanceList()
585 raise errors.OpPrereqError("There are still %d instance(s) in"
586 " this cluster." % len(instancelist))
588 def Exec(self, feedback_fn):
589 """Destroys the cluster.
592 master = self.cfg.GetMasterNode()
593 result = self.rpc.call_node_stop_master(master, False)
596 raise errors.OpExecError("Could not disable the master role")
597 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
598 utils.CreateBackup(priv_key)
599 utils.CreateBackup(pub_key)
603 class LUVerifyCluster(LogicalUnit):
604 """Verifies the cluster status.
607 HPATH = "cluster-verify"
608 HTYPE = constants.HTYPE_CLUSTER
609 _OP_REQP = ["skip_checks"]
612 def ExpandNames(self):
613 self.needed_locks = {
614 locking.LEVEL_NODE: locking.ALL_SET,
615 locking.LEVEL_INSTANCE: locking.ALL_SET,
617 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
619 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
620 node_result, feedback_fn, master_files,
622 """Run multiple tests against a node.
626 - compares ganeti version
627 - checks vg existence and size > 20G
628 - checks config file checksum
629 - checks ssh to other nodes
631 @type nodeinfo: L{objects.Node}
632 @param nodeinfo: the node to check
633 @param file_list: required list of files
634 @param local_cksum: dictionary of local files and their checksums
635 @param node_result: the results from the node
636 @param feedback_fn: function used to accumulate results
637 @param master_files: list of files that only masters should have
638 @param drbd_map: the used DRBD minors for this node, in
639 the form of minor: (instance, must_exist), which correspond to instances
640 and their running status
645 # main result, node_result should be a non-empty dict
646 if not node_result or not isinstance(node_result, dict):
647 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
650 # compares ganeti version
651 local_version = constants.PROTOCOL_VERSION
652 remote_version = node_result.get('version', None)
653 if not remote_version:
654 feedback_fn(" - ERROR: connection to %s failed" % (node))
657 if local_version != remote_version:
658 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
659 (local_version, node, remote_version))
662 # checks vg existence and size > 20G
665 vglist = node_result.get(constants.NV_VGLIST, None)
667 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
671 vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
672 constants.MIN_VG_SIZE)
674 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
677 # checks config file checksum
679 remote_cksum = node_result.get(constants.NV_FILELIST, None)
680 if not isinstance(remote_cksum, dict):
682 feedback_fn(" - ERROR: node hasn't returned file checksum data")
684 for file_name in file_list:
685 node_is_mc = nodeinfo.master_candidate
686 must_have_file = file_name not in master_files
687 if file_name not in remote_cksum:
688 if node_is_mc or must_have_file:
690 feedback_fn(" - ERROR: file '%s' missing" % file_name)
691 elif remote_cksum[file_name] != local_cksum[file_name]:
692 if node_is_mc or must_have_file:
694 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
696 # not candidate and this is not a must-have file
698 feedback_fn(" - ERROR: non master-candidate has old/wrong file"
701 # all good, except non-master/non-must have combination
702 if not node_is_mc and not must_have_file:
703 feedback_fn(" - ERROR: file '%s' should not exist on non master"
704 " candidates" % file_name)
708 if constants.NV_NODELIST not in node_result:
710 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
712 if node_result[constants.NV_NODELIST]:
714 for node in node_result[constants.NV_NODELIST]:
715 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
716 (node, node_result[constants.NV_NODELIST][node]))
718 if constants.NV_NODENETTEST not in node_result:
720 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
722 if node_result[constants.NV_NODENETTEST]:
724 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
726 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
727 (node, node_result[constants.NV_NODENETTEST][node]))
729 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
730 if isinstance(hyp_result, dict):
731 for hv_name, hv_result in hyp_result.iteritems():
732 if hv_result is not None:
733 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
734 (hv_name, hv_result))
736 # check used drbd list
737 used_minors = node_result.get(constants.NV_DRBDLIST, [])
738 for minor, (iname, must_exist) in drbd_map.items():
739 if minor not in used_minors and must_exist:
740 feedback_fn(" - ERROR: drbd minor %d of instance %s is not active" %
743 for minor in used_minors:
744 if minor not in drbd_map:
745 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" % minor)
750 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
751 node_instance, feedback_fn, n_offline):
752 """Verify an instance.
754 This function checks to see if the required block devices are
755 available on the instance's node.
760 node_current = instanceconfig.primary_node
763 instanceconfig.MapLVsByNode(node_vol_should)
765 for node in node_vol_should:
766 if node in n_offline:
767 # ignore missing volumes on offline nodes
769 for volume in node_vol_should[node]:
770 if node not in node_vol_is or volume not in node_vol_is[node]:
771 feedback_fn(" - ERROR: volume %s missing on node %s" %
775 if instanceconfig.admin_up:
776 if ((node_current not in node_instance or
777 not instance in node_instance[node_current]) and
778 node_current not in n_offline):
779 feedback_fn(" - ERROR: instance %s not running on node %s" %
780 (instance, node_current))
783 for node in node_instance:
784 if (not node == node_current):
785 if instance in node_instance[node]:
786 feedback_fn(" - ERROR: instance %s should not run on node %s" %
792 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
793 """Verify if there are any unknown volumes in the cluster.
795 The .os, .swap and backup volumes are ignored. All other volumes are
801 for node in node_vol_is:
802 for volume in node_vol_is[node]:
803 if node not in node_vol_should or volume not in node_vol_should[node]:
804 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
809 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
810 """Verify the list of running instances.
812 This checks what instances are running but unknown to the cluster.
816 for node in node_instance:
817 for runninginstance in node_instance[node]:
818 if runninginstance not in instancelist:
819 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
820 (runninginstance, node))
824 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
825 """Verify N+1 Memory Resilience.
827 Check that if one single node dies we can still start all the instances it
833 for node, nodeinfo in node_info.iteritems():
834 # This code checks that every node which is now listed as secondary has
835 # enough memory to host all instances it is supposed to, should a single
836 # other node in the cluster fail.
837 # FIXME: not ready for failover to an arbitrary node
838 # FIXME: does not support file-backed instances
839 # WARNING: we currently take into account down instances as well as up
840 # ones, considering that even if they're down someone might want to start
841 # them even in the event of a node failure.
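# A sketch of the 'sinst-by-pnode' shape the loop below consumes (names are
# hypothetical): for a node that is secondary for three instances, two with
# primary node1 and one with primary node2:
#
#   nodeinfo['sinst-by-pnode'] == {
#     "node1.example.tld": ["inst1", "inst2"],
#     "node2.example.tld": ["inst3"],
#   }
#
# For each primary, the loop sums the BE_MEMORY of its instances and checks
# that this node's free memory ('mfree') could absorb them should that
# primary fail.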
842 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
844 for instance in instances:
845 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
846 if bep[constants.BE_AUTO_BALANCE]:
847 needed_mem += bep[constants.BE_MEMORY]
848 if nodeinfo['mfree'] < needed_mem:
849 feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
850 " failovers should node %s fail" % (node, prinode))
854 def CheckPrereq(self):
855 """Check prerequisites.
857 Transform the list of checks we're going to skip into a set and check that
858 all its members are valid.
861 self.skip_set = frozenset(self.op.skip_checks)
862 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
863 raise errors.OpPrereqError("Invalid checks to be skipped specified")
865 def BuildHooksEnv(self):
868 Cluster-Verify hooks just run in the post phase and their failure makes
869 the output be logged in the verify output and the verification to fail.
872 all_nodes = self.cfg.GetNodeList()
873 # TODO: populate the environment with useful information for verify hooks
875 return env, [], all_nodes
877 def Exec(self, feedback_fn):
878 """Verify integrity of cluster, performing various test on nodes.
882 feedback_fn("* Verifying global settings")
883 for msg in self.cfg.VerifyConfig():
884 feedback_fn(" - ERROR: %s" % msg)
886 vg_name = self.cfg.GetVGName()
887 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
888 nodelist = utils.NiceSort(self.cfg.GetNodeList())
889 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
890 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
891 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
892 for iname in instancelist)
893 i_non_redundant = [] # Non redundant instances
894 i_non_a_balanced = [] # Non auto-balanced instances
895 n_offline = [] # List of offline nodes
901 # FIXME: verify OS list
903 master_files = [constants.CLUSTER_CONF_FILE]
905 file_names = ssconf.SimpleStore().GetFileList()
906 file_names.append(constants.SSL_CERT_FILE)
907 file_names.append(constants.RAPI_CERT_FILE)
908 file_names.extend(master_files)
910 local_checksums = utils.FingerprintFiles(file_names)
912 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
913 node_verify_param = {
914 constants.NV_FILELIST: file_names,
915 constants.NV_NODELIST: [node.name for node in nodeinfo
916 if not node.offline],
917 constants.NV_HYPERVISOR: hypervisors,
918 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
919 node.secondary_ip) for node in nodeinfo
920 if not node.offline],
921 constants.NV_LVLIST: vg_name,
922 constants.NV_INSTANCELIST: hypervisors,
923 constants.NV_VGLIST: None,
924 constants.NV_VERSION: None,
925 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
926 constants.NV_DRBDLIST: None,
928 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
929 self.cfg.GetClusterName())
931 cluster = self.cfg.GetClusterInfo()
932 master_node = self.cfg.GetMasterNode()
933 all_drbd_map = self.cfg.ComputeDRBDMap()
935 for node_i in nodeinfo:
937 nresult = all_nvinfo[node].data
940 feedback_fn("* Skipping offline node %s" % (node,))
941 n_offline.append(node)
944 if node == master_node:
946 elif node_i.master_candidate:
947 ntype = "master candidate"
950 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
952 if all_nvinfo[node].failed or not isinstance(nresult, dict):
953 feedback_fn(" - ERROR: connection to %s failed" % (node,))
958 for minor, instance in all_drbd_map[node].items():
959 instance = instanceinfo[instance]
960 node_drbd[minor] = (instance.name, instance.admin_up)
961 result = self._VerifyNode(node_i, file_names, local_checksums,
962 nresult, feedback_fn, master_files,
966 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
967 if isinstance(lvdata, basestring):
968 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
969 (node, lvdata.encode('string_escape')))
971 node_volume[node] = {}
972 elif not isinstance(lvdata, dict):
973 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
977 node_volume[node] = lvdata
980 idata = nresult.get(constants.NV_INSTANCELIST, None)
981 if not isinstance(idata, list):
982 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
987 node_instance[node] = idata
990 nodeinfo = nresult.get(constants.NV_HVINFO, None)
991 if not isinstance(nodeinfo, dict):
992 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
998 "mfree": int(nodeinfo['memory_free']),
999 "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
1002 # dictionary holding all instances this node is secondary for,
1003 # grouped by their primary node. Each key is a cluster node, and each
1004 # value is a list of instances which have the key as primary and the
1005 # current node as secondary. This is handy to calculate N+1 memory
1006 # availability if you can only failover from a primary to its
1008 "sinst-by-pnode": {},
1011 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
1015 node_vol_should = {}
1017 for instance in instancelist:
1018 feedback_fn("* Verifying instance %s" % instance)
1019 inst_config = instanceinfo[instance]
1020 result = self._VerifyInstance(instance, inst_config, node_volume,
1021 node_instance, feedback_fn, n_offline)
1023 inst_nodes_offline = []
1025 inst_config.MapLVsByNode(node_vol_should)
1027 instance_cfg[instance] = inst_config
1029 pnode = inst_config.primary_node
1030 if pnode in node_info:
1031 node_info[pnode]['pinst'].append(instance)
1032 elif pnode not in n_offline:
1033 feedback_fn(" - ERROR: instance %s, connection to primary node"
1034 " %s failed" % (instance, pnode))
1037 if pnode in n_offline:
1038 inst_nodes_offline.append(pnode)
1040 # If the instance is non-redundant we cannot survive losing its primary
1041 # node, so we are not N+1 compliant. On the other hand we have no disk
1042 # templates with more than one secondary so that situation is not well
1044 # FIXME: does not support file-backed instances
1045 if len(inst_config.secondary_nodes) == 0:
1046 i_non_redundant.append(instance)
1047 elif len(inst_config.secondary_nodes) > 1:
1048 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1051 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1052 i_non_a_balanced.append(instance)
1054 for snode in inst_config.secondary_nodes:
1055 if snode in node_info:
1056 node_info[snode]['sinst'].append(instance)
1057 if pnode not in node_info[snode]['sinst-by-pnode']:
1058 node_info[snode]['sinst-by-pnode'][pnode] = []
1059 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1060 elif snode not in n_offline:
1061 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1062 " %s failed" % (instance, snode))
1064 if snode in n_offline:
1065 inst_nodes_offline.append(snode)
1067 if inst_nodes_offline:
1068 # warn that the instance lives on offline nodes, and set bad=True
1069 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1070 ", ".join(inst_nodes_offline))
1073 feedback_fn("* Verifying orphan volumes")
1074 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1078 feedback_fn("* Verifying remaining instances")
1079 result = self._VerifyOrphanInstances(instancelist, node_instance,
1083 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1084 feedback_fn("* Verifying N+1 Memory redundancy")
1085 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1088 feedback_fn("* Other Notes")
1090 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1091 % len(i_non_redundant))
1093 if i_non_a_balanced:
1094 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1095 % len(i_non_a_balanced))
1098 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1102 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1103 """Analize the post-hooks' result
1105 This method analyses the hook result, handles it, and sends some
1106 nicely-formatted feedback back to the user.
1108 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1109 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1110 @param hooks_results: the results of the multi-node hooks rpc call
1111 @param feedback_fn: function used to send feedback back to the caller
1112 @param lu_result: previous Exec result
1113 @return: the new Exec result, based on the previous result
1117 # We only really run POST phase hooks, and are only interested in
1119 if phase == constants.HOOKS_PHASE_POST:
1120 # Used to change hooks' output to proper indentation
1121 indent_re = re.compile('^', re.M)
1122 feedback_fn("* Hooks Results")
1123 if not hooks_results:
1124 feedback_fn(" - ERROR: general communication failure")
1127 for node_name in hooks_results:
1128 show_node_header = True
1129 res = hooks_results[node_name]
1130 if res.failed or res.data is False or not isinstance(res.data, list):
1132 # no need to warn or set fail return value
1134 feedback_fn(" Communication failure in hooks execution")
1137 for script, hkr, output in res.data:
1138 if hkr == constants.HKR_FAIL:
1139 # The node header is only shown once, if there are
1140 # failing hooks on that node
1141 if show_node_header:
1142 feedback_fn(" Node %s:" % node_name)
1143 show_node_header = False
1144 feedback_fn(" ERROR: Script %s failed, output:" % script)
1145 output = indent_re.sub(' ', output)
1146 feedback_fn("%s" % output)
1152 class LUVerifyDisks(NoHooksLU):
1153 """Verifies the cluster disks status.
1159 def ExpandNames(self):
1160 self.needed_locks = {
1161 locking.LEVEL_NODE: locking.ALL_SET,
1162 locking.LEVEL_INSTANCE: locking.ALL_SET,
1164 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1166 def CheckPrereq(self):
1167 """Check prerequisites.
1169 This has no prerequisites.
1174 def Exec(self, feedback_fn):
1175 """Verify integrity of cluster disks.
1178 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1180 vg_name = self.cfg.GetVGName()
1181 nodes = utils.NiceSort(self.cfg.GetNodeList())
1182 instances = [self.cfg.GetInstanceInfo(name)
1183 for name in self.cfg.GetInstanceList()]
1186 for inst in instances:
1188 if (not inst.admin_up or
1189 inst.disk_template not in constants.DTS_NET_MIRROR):
1191 inst.MapLVsByNode(inst_lvs)
1192 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1193 for node, vol_list in inst_lvs.iteritems():
1194 for vol in vol_list:
1195 nv_dict[(node, vol)] = inst
1200 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1205 lvs = node_lvs[node]
1208 self.LogWarning("Connection to node %s failed: %s" %
1212 if isinstance(lvs, basestring):
1213 logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1214 res_nlvm[node] = lvs
1215 elif not isinstance(lvs, dict):
1216 logging.warning("Connection to node %s failed or invalid data"
1218 res_nodes.append(node)
1221 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1222 inst = nv_dict.pop((node, lv_name), None)
1223 if (not lv_online and inst is not None
1224 and inst.name not in res_instances):
1225 res_instances.append(inst.name)
1227 # any leftover items in nv_dict are missing LVs, let's arrange the
1229 for key, inst in nv_dict.iteritems():
1230 if inst.name not in res_missing:
1231 res_missing[inst.name] = []
1232 res_missing[inst.name].append(key)
1237 class LURenameCluster(LogicalUnit):
1238 """Rename the cluster.
1241 HPATH = "cluster-rename"
1242 HTYPE = constants.HTYPE_CLUSTER
1245 def BuildHooksEnv(self):
1250 "OP_TARGET": self.cfg.GetClusterName(),
1251 "NEW_NAME": self.op.name,
1253 mn = self.cfg.GetMasterNode()
1254 return env, [mn], [mn]
1256 def CheckPrereq(self):
1257 """Verify that the passed name is a valid one.
1260 hostname = utils.HostInfo(self.op.name)
1262 new_name = hostname.name
1263 self.ip = new_ip = hostname.ip
1264 old_name = self.cfg.GetClusterName()
1265 old_ip = self.cfg.GetMasterIP()
1266 if new_name == old_name and new_ip == old_ip:
1267 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1268 " cluster has changed")
1269 if new_ip != old_ip:
1270 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1271 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1272 " reachable on the network. Aborting." %
1275 self.op.name = new_name
1277 def Exec(self, feedback_fn):
1278 """Rename the cluster.
1281 clustername = self.op.name
1284 # shutdown the master IP
1285 master = self.cfg.GetMasterNode()
1286 result = self.rpc.call_node_stop_master(master, False)
1287 if result.failed or not result.data:
1288 raise errors.OpExecError("Could not disable the master role")
1291 cluster = self.cfg.GetClusterInfo()
1292 cluster.cluster_name = clustername
1293 cluster.master_ip = ip
1294 self.cfg.Update(cluster)
1296 # update the known hosts file
1297 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1298 node_list = self.cfg.GetNodeList()
1300 node_list.remove(master)
1303 result = self.rpc.call_upload_file(node_list,
1304 constants.SSH_KNOWN_HOSTS_FILE)
1305 for to_node, to_result in result.iteritems():
1306 if to_result.failed or not to_result.data:
1307 logging.error("Copy of file %s to node %s failed",
1308 constants.SSH_KNOWN_HOSTS_FILE, to_node)
1311 result = self.rpc.call_node_start_master(master, False)
1312 if result.failed or not result.data:
1313 self.LogWarning("Could not re-enable the master role on"
1314 " the master, please restart manually.")
1317 def _RecursiveCheckIfLVMBased(disk):
1318 """Check if the given disk or its children are lvm-based.
1320 @type disk: L{objects.Disk}
1321 @param disk: the disk to check
1323 @return: boolean indicating whether a LD_LV dev_type was found or not
1327 for chdisk in disk.children:
1328 if _RecursiveCheckIfLVMBased(chdisk):
1330 return disk.dev_type == constants.LD_LV
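# Minimal sketch of the recursion above (the objects.Disk keyword arguments
# and the LD_DRBD8 constant are assumptions made only for this illustration):
#
#   lv_data = objects.Disk(dev_type=constants.LD_LV, size=1024, children=[])
#   lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128, children=[])
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, size=1024,
#                       children=[lv_data, lv_meta])
#   _RecursiveCheckIfLVMBased(drbd)     # True, via the LD_LV children
#   _RecursiveCheckIfLVMBased(lv_data)  # True, it is itself LD_LV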
1333 class LUSetClusterParams(LogicalUnit):
1334 """Change the parameters of the cluster.
1337 HPATH = "cluster-modify"
1338 HTYPE = constants.HTYPE_CLUSTER
1342 def CheckParameters(self):
1346 if not hasattr(self.op, "candidate_pool_size"):
1347 self.op.candidate_pool_size = None
1348 if self.op.candidate_pool_size is not None:
1350 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1351 except ValueError, err:
1352 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1354 if self.op.candidate_pool_size < 1:
1355 raise errors.OpPrereqError("At least one master candidate needed")
1357 def ExpandNames(self):
1358 # FIXME: in the future maybe other cluster params won't require checking on
1359 # all nodes to be modified.
1360 self.needed_locks = {
1361 locking.LEVEL_NODE: locking.ALL_SET,
1363 self.share_locks[locking.LEVEL_NODE] = 1
1365 def BuildHooksEnv(self):
1370 "OP_TARGET": self.cfg.GetClusterName(),
1371 "NEW_VG_NAME": self.op.vg_name,
1373 mn = self.cfg.GetMasterNode()
1374 return env, [mn], [mn]
1376 def CheckPrereq(self):
1377 """Check prerequisites.
1379 This checks whether the given params don't conflict and
1380 if the given volume group is valid.
1383 # FIXME: This only works because there is only one parameter that can be
1384 # changed or removed.
1385 if self.op.vg_name is not None and not self.op.vg_name:
1386 instances = self.cfg.GetAllInstancesInfo().values()
1387 for inst in instances:
1388 for disk in inst.disks:
1389 if _RecursiveCheckIfLVMBased(disk):
1390 raise errors.OpPrereqError("Cannot disable lvm storage while"
1391 " lvm-based instances exist")
1393 node_list = self.acquired_locks[locking.LEVEL_NODE]
1395 # if vg_name not None, checks given volume group on all nodes
1397 vglist = self.rpc.call_vg_list(node_list)
1398 for node in node_list:
1399 if vglist[node].failed:
1400 # ignoring down node
1401 self.LogWarning("Node %s unreachable/error, ignoring" % node)
1403 vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1405 constants.MIN_VG_SIZE)
1407 raise errors.OpPrereqError("Error on node '%s': %s" %
1410 self.cluster = cluster = self.cfg.GetClusterInfo()
1411 # validate beparams changes
1412 if self.op.beparams:
1413 utils.CheckBEParams(self.op.beparams)
1414 self.new_beparams = cluster.FillDict(
1415 cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1417 # hypervisor list/parameters
1418 self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1419 if self.op.hvparams:
1420 if not isinstance(self.op.hvparams, dict):
1421 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1422 for hv_name, hv_dict in self.op.hvparams.items():
1423 if hv_name not in self.new_hvparams:
1424 self.new_hvparams[hv_name] = hv_dict
1426 self.new_hvparams[hv_name].update(hv_dict)
1428 if self.op.enabled_hypervisors is not None:
1429 self.hv_list = self.op.enabled_hypervisors
1431 self.hv_list = cluster.enabled_hypervisors
1433 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1434 # either the enabled list has changed, or the parameters have, validate
1435 for hv_name, hv_params in self.new_hvparams.items():
1436 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1437 (self.op.enabled_hypervisors and
1438 hv_name in self.op.enabled_hypervisors)):
1439 # either this is a new hypervisor, or its parameters have changed
1440 hv_class = hypervisor.GetHypervisor(hv_name)
1441 hv_class.CheckParameterSyntax(hv_params)
1442 _CheckHVParams(self, node_list, hv_name, hv_params)
1444 def Exec(self, feedback_fn):
1445 """Change the parameters of the cluster.
1448 if self.op.vg_name is not None:
1449 if self.op.vg_name != self.cfg.GetVGName():
1450 self.cfg.SetVGName(self.op.vg_name)
1452 feedback_fn("Cluster LVM configuration already in desired"
1453 " state, not changing")
1454 if self.op.hvparams:
1455 self.cluster.hvparams = self.new_hvparams
1456 if self.op.enabled_hypervisors is not None:
1457 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1458 if self.op.beparams:
1459 self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1460 if self.op.candidate_pool_size is not None:
1461 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1463 self.cfg.Update(self.cluster)
1465 # we want to update nodes after the cluster so that if any errors
1466 # happen, we have recorded and saved the cluster info
1467 if self.op.candidate_pool_size is not None:
1468 _AdjustCandidatePool(self)
1471 class LURedistributeConfig(NoHooksLU):
1472 """Force the redistribution of cluster configuration.
1474 This is a very simple LU.
1480 def ExpandNames(self):
1481 self.needed_locks = {
1482 locking.LEVEL_NODE: locking.ALL_SET,
1484 self.share_locks[locking.LEVEL_NODE] = 1
1486 def CheckPrereq(self):
1487 """Check prerequisites.
1491 def Exec(self, feedback_fn):
1492 """Redistribute the configuration.
1495 self.cfg.Update(self.cfg.GetClusterInfo())
1498 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1499 """Sleep and poll for an instance's disk to sync.
1502 if not instance.disks:
1506 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1508 node = instance.primary_node
1510 for dev in instance.disks:
1511 lu.cfg.SetDiskID(dev, node)
1517 cumul_degraded = False
1518 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1519 if rstats.failed or not rstats.data:
1520 lu.LogWarning("Can't get any data from node %s", node)
1523 raise errors.RemoteError("Can't contact node %s for mirror data,"
1524 " aborting." % node)
1527 rstats = rstats.data
1529 for i, mstat in enumerate(rstats):
1531 lu.LogWarning("Can't compute data for node %s/%s",
1532 node, instance.disks[i].iv_name)
1534 # we ignore the ldisk parameter
1535 perc_done, est_time, is_degraded, _ = mstat
1536 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1537 if perc_done is not None:
1539 if est_time is not None:
1540 rem_time = "%d estimated seconds remaining" % est_time
1543 rem_time = "no time estimate"
1544 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1545 (instance.disks[i].iv_name, perc_done, rem_time))
1549 time.sleep(min(60, max_time))
1552 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1553 return not cumul_degraded
1556 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1557 """Check that mirrors are not degraded.
1559 The ldisk parameter, if True, will change the test from the
1560 is_degraded attribute (which represents overall non-ok status for
1561 the device(s)) to the ldisk (representing the local storage status).
1564 lu.cfg.SetDiskID(dev, node)
1571 if on_primary or dev.AssembleOnSecondary():
1572 rstats = lu.rpc.call_blockdev_find(node, dev)
1573 if rstats.failed or not rstats.data:
1574 logging.warning("Node %s: disk degraded, not found or node down", node)
1577 result = result and (not rstats.data[idx])
1579 for child in dev.children:
1580 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1585 class LUDiagnoseOS(NoHooksLU):
1586 """Logical unit for OS diagnose/query.
1589 _OP_REQP = ["output_fields", "names"]
1591 _FIELDS_STATIC = utils.FieldSet()
1592 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1594 def ExpandNames(self):
1596 raise errors.OpPrereqError("Selective OS query not supported")
1598 _CheckOutputFields(static=self._FIELDS_STATIC,
1599 dynamic=self._FIELDS_DYNAMIC,
1600 selected=self.op.output_fields)
1602 # Lock all nodes, in shared mode
1603 self.needed_locks = {}
1604 self.share_locks[locking.LEVEL_NODE] = 1
1605 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1607 def CheckPrereq(self):
1608 """Check prerequisites.
1613 def _DiagnoseByOS(node_list, rlist):
1614 """Remaps a per-node return list into an a per-os per-node dictionary
1616 @param node_list: a list with the names of all nodes
1617 @param rlist: a map with node names as keys and OS objects as values
1620 @return: a dictionary with osnames as keys and as values another map, with
1621 nodes as keys and lists of OS objects as values, e.g.::
1623 {"debian-etch": {"node1": [<object>,...],
1624 "node2": [<object>,]}
1629 for node_name, nr in rlist.iteritems():
1630 if nr.failed or not nr.data:
1632 for os_obj in nr.data:
1633 if os_obj.name not in all_os:
1634 # build a list of nodes for this os containing empty lists
1635 # for each node in node_list
1636 all_os[os_obj.name] = {}
1637 for nname in node_list:
1638 all_os[os_obj.name][nname] = []
1639 all_os[os_obj.name][node_name].append(os_obj)
1642 def Exec(self, feedback_fn):
1643 """Compute the list of OSes.
1646 node_list = self.acquired_locks[locking.LEVEL_NODE]
1647 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1648 if node in node_list]
1649 node_data = self.rpc.call_os_diagnose(valid_nodes)
1650 if node_data == False:
1651 raise errors.OpExecError("Can't gather the list of OSes")
1652 pol = self._DiagnoseByOS(valid_nodes, node_data)
1654 for os_name, os_data in pol.iteritems():
1656 for field in self.op.output_fields:
1659 elif field == "valid":
1660 val = utils.all([osl and osl[0] for osl in os_data.values()])
1661 elif field == "node_status":
1663 for node_name, nos_list in os_data.iteritems():
1664 val[node_name] = [(v.status, v.path) for v in nos_list]
1666 raise errors.ParameterError(field)
1673 class LURemoveNode(LogicalUnit):
1674 """Logical unit for removing a node.
1677 HPATH = "node-remove"
1678 HTYPE = constants.HTYPE_NODE
1679 _OP_REQP = ["node_name"]
1681 def BuildHooksEnv(self):
1684 This doesn't run on the target node in the pre phase as a failed
1685 node would then be impossible to remove.
1689 "OP_TARGET": self.op.node_name,
1690 "NODE_NAME": self.op.node_name,
1692 all_nodes = self.cfg.GetNodeList()
1693 all_nodes.remove(self.op.node_name)
1694 return env, all_nodes, all_nodes
1696 def CheckPrereq(self):
1697 """Check prerequisites.
1700 - the node exists in the configuration
1701 - it does not have primary or secondary instances
1702 - it's not the master
1704 Any errors are signalled by raising errors.OpPrereqError.
1707 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1709 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1711 instance_list = self.cfg.GetInstanceList()
1713 masternode = self.cfg.GetMasterNode()
1714 if node.name == masternode:
1715 raise errors.OpPrereqError("Node is the master node,"
1716 " you need to failover first.")
1718 for instance_name in instance_list:
1719 instance = self.cfg.GetInstanceInfo(instance_name)
1720 if node.name in instance.all_nodes:
1721 raise errors.OpPrereqError("Instance %s is still running on the node,"
1722 " please remove first." % instance_name)
1723 self.op.node_name = node.name
1726 def Exec(self, feedback_fn):
1727 """Removes the node from the cluster.
1731 logging.info("Stopping the node daemon and removing configs from node %s",
1734 self.context.RemoveNode(node.name)
1736 self.rpc.call_node_leave_cluster(node.name)
1738 # Promote nodes to master candidate as needed
1739 _AdjustCandidatePool(self)
1742 class LUQueryNodes(NoHooksLU):
1743 """Logical unit for querying nodes.
1746 _OP_REQP = ["output_fields", "names"]
1748 _FIELDS_DYNAMIC = utils.FieldSet(
1750 "mtotal", "mnode", "mfree",
1755 _FIELDS_STATIC = utils.FieldSet(
1756 "name", "pinst_cnt", "sinst_cnt",
1757 "pinst_list", "sinst_list",
1758 "pip", "sip", "tags",
1765 def ExpandNames(self):
1766 _CheckOutputFields(static=self._FIELDS_STATIC,
1767 dynamic=self._FIELDS_DYNAMIC,
1768 selected=self.op.output_fields)
1770 self.needed_locks = {}
1771 self.share_locks[locking.LEVEL_NODE] = 1
1774 self.wanted = _GetWantedNodes(self, self.op.names)
1776 self.wanted = locking.ALL_SET
1778 self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1780 # if we don't request only static fields, we need to lock the nodes
1781 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1784 def CheckPrereq(self):
1785 """Check prerequisites.
1788 # The validation of the node list is done in _GetWantedNodes,
1789 # if non-empty, and if empty, there's no validation to do
1792 def Exec(self, feedback_fn):
1793 """Computes the list of nodes and their attributes.
1796 all_info = self.cfg.GetAllNodesInfo()
1798 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1799 elif self.wanted != locking.ALL_SET:
1800 nodenames = self.wanted
1801 missing = set(nodenames).difference(all_info.keys())
1803 raise errors.OpExecError(
1804 "Some nodes were removed before retrieving their data: %s" % missing)
1806 nodenames = all_info.keys()
1808 nodenames = utils.NiceSort(nodenames)
1809 nodelist = [all_info[name] for name in nodenames]
1811 # begin data gathering
1815 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1816 self.cfg.GetHypervisorType())
1817 for name in nodenames:
1818 nodeinfo = node_data[name]
1819 if not nodeinfo.failed and nodeinfo.data:
1820 nodeinfo = nodeinfo.data
1821 fn = utils.TryConvert
1823 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1824 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1825 "mfree": fn(int, nodeinfo.get('memory_free', None)),
1826 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1827 "dfree": fn(int, nodeinfo.get('vg_free', None)),
1828 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1829 "bootid": nodeinfo.get('bootid', None),
1832 live_data[name] = {}
1834 live_data = dict.fromkeys(nodenames, {})
1836 node_to_primary = dict([(name, set()) for name in nodenames])
1837 node_to_secondary = dict([(name, set()) for name in nodenames])
1839 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1840 "sinst_cnt", "sinst_list"))
1841 if inst_fields & frozenset(self.op.output_fields):
1842 instancelist = self.cfg.GetInstanceList()
1844 for instance_name in instancelist:
1845 inst = self.cfg.GetInstanceInfo(instance_name)
1846 if inst.primary_node in node_to_primary:
1847 node_to_primary[inst.primary_node].add(inst.name)
1848 for secnode in inst.secondary_nodes:
1849 if secnode in node_to_secondary:
1850 node_to_secondary[secnode].add(inst.name)
1852 master_node = self.cfg.GetMasterNode()
1854 # end data gathering
1857 for node in nodelist:
1859 for field in self.op.output_fields:
1862 elif field == "pinst_list":
1863 val = list(node_to_primary[node.name])
1864 elif field == "sinst_list":
1865 val = list(node_to_secondary[node.name])
1866 elif field == "pinst_cnt":
1867 val = len(node_to_primary[node.name])
1868 elif field == "sinst_cnt":
1869 val = len(node_to_secondary[node.name])
1870 elif field == "pip":
1871 val = node.primary_ip
1872 elif field == "sip":
1873 val = node.secondary_ip
1874 elif field == "tags":
1875 val = list(node.GetTags())
1876 elif field == "serial_no":
1877 val = node.serial_no
1878 elif field == "master_candidate":
1879 val = node.master_candidate
1880 elif field == "master":
1881 val = node.name == master_node
1882 elif field == "offline":
1884 elif self._FIELDS_DYNAMIC.Matches(field):
1885 val = live_data[node.name].get(field, None)
1887 raise errors.ParameterError(field)
1888 node_output.append(val)
1889 output.append(node_output)
1894 class LUQueryNodeVolumes(NoHooksLU):
1895 """Logical unit for getting volumes on node(s).
1898 _OP_REQP = ["nodes", "output_fields"]
1900 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1901 _FIELDS_STATIC = utils.FieldSet("node")
1903 def ExpandNames(self):
1904 _CheckOutputFields(static=self._FIELDS_STATIC,
1905 dynamic=self._FIELDS_DYNAMIC,
1906 selected=self.op.output_fields)
1908 self.needed_locks = {}
1909 self.share_locks[locking.LEVEL_NODE] = 1
1910 if not self.op.nodes:
1911 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1913 self.needed_locks[locking.LEVEL_NODE] = \
1914 _GetWantedNodes(self, self.op.nodes)
1916 def CheckPrereq(self):
1917 """Check prerequisites.
1919 This checks that the fields required are valid output fields.
1922 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1924 def Exec(self, feedback_fn):
1925 """Computes the list of nodes and their attributes.
1928 nodenames = self.nodes
1929 volumes = self.rpc.call_node_volumes(nodenames)
1931 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1932 in self.cfg.GetInstanceList()]
1934 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1937 for node in nodenames:
1938 if node not in volumes or volumes[node].failed or not volumes[node].data:
1941 node_vols = volumes[node].data[:]
1942 node_vols.sort(key=lambda vol: vol['dev'])
1944 for vol in node_vols:
1946 for field in self.op.output_fields:
1949 elif field == "phys":
1953 elif field == "name":
1955 elif field == "size":
1956 val = int(float(vol['size']))
1957 elif field == "instance":
1959 if node not in lv_by_node[inst]:
1961 if vol['name'] in lv_by_node[inst][node]:
1967 raise errors.ParameterError(field)
1968 node_output.append(str(val))
1970 output.append(node_output)
1975 class LUAddNode(LogicalUnit):
1976 """Logical unit for adding node to the cluster.
1980 HTYPE = constants.HTYPE_NODE
1981 _OP_REQP = ["node_name"]
1983 def BuildHooksEnv(self):
1986 This will run on all nodes before, and on all nodes + the new node after.
1990 "OP_TARGET": self.op.node_name,
1991 "NODE_NAME": self.op.node_name,
1992 "NODE_PIP": self.op.primary_ip,
1993 "NODE_SIP": self.op.secondary_ip,
1995 nodes_0 = self.cfg.GetNodeList()
1996 nodes_1 = nodes_0 + [self.op.node_name, ]
1997 return env, nodes_0, nodes_1
1999 def CheckPrereq(self):
2000 """Check prerequisites.
2003 - the new node is not already in the config
2005 - its parameters (single/dual homed) match the cluster
2007 Any errors are signalled by raising errors.OpPrereqError.
2010 node_name = self.op.node_name
2013 dns_data = utils.HostInfo(node_name)
2015 node = dns_data.name
2016 primary_ip = self.op.primary_ip = dns_data.ip
2017 secondary_ip = getattr(self.op, "secondary_ip", None)
2018 if secondary_ip is None:
2019 secondary_ip = primary_ip
2020 if not utils.IsValidIP(secondary_ip):
2021 raise errors.OpPrereqError("Invalid secondary IP given")
2022 self.op.secondary_ip = secondary_ip
2024 node_list = cfg.GetNodeList()
2025 if not self.op.readd and node in node_list:
2026 raise errors.OpPrereqError("Node %s is already in the configuration" %
2028 elif self.op.readd and node not in node_list:
2029 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2031 for existing_node_name in node_list:
2032 existing_node = cfg.GetNodeInfo(existing_node_name)
2034 if self.op.readd and node == existing_node_name:
2035 if (existing_node.primary_ip != primary_ip or
2036 existing_node.secondary_ip != secondary_ip):
2037 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2038 " address configuration as before")
2041 if (existing_node.primary_ip == primary_ip or
2042 existing_node.secondary_ip == primary_ip or
2043 existing_node.primary_ip == secondary_ip or
2044 existing_node.secondary_ip == secondary_ip):
2045 raise errors.OpPrereqError("New node ip address(es) conflict with"
2046 " existing node %s" % existing_node.name)
2048 # check that the type of the node (single versus dual homed) is the
2049 # same as for the master
2050 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2051 master_singlehomed = myself.secondary_ip == myself.primary_ip
2052 newbie_singlehomed = secondary_ip == primary_ip
2053 if master_singlehomed != newbie_singlehomed:
2054 if master_singlehomed:
2055 raise errors.OpPrereqError("The master has no private ip but the"
2056 " new node has one")
2058 raise errors.OpPrereqError("The master has a private ip but the"
2059 " new node doesn't have one")
2061 # check reachability
2062 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2063 raise errors.OpPrereqError("Node not reachable by ping")
2065 if not newbie_singlehomed:
2066 # check reachability from my secondary ip to newbie's secondary ip
2067 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2068 source=myself.secondary_ip):
2069 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2070 " based ping to noded port")
2072 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2073 mc_now, _ = self.cfg.GetMasterCandidateStats()
2074 master_candidate = mc_now < cp_size
2076 self.new_node = objects.Node(name=node,
2077 primary_ip=primary_ip,
2078 secondary_ip=secondary_ip,
2079 master_candidate=master_candidate,
2082 def Exec(self, feedback_fn):
2083 """Adds the new node to the cluster.
2086 new_node = self.new_node
2087 node = new_node.name
2089 # check connectivity
2090 result = self.rpc.call_version([node])[node]
2093 if constants.PROTOCOL_VERSION == result.data:
2094 logging.info("Communication to node %s fine, sw version %s match",
2097 raise errors.OpExecError("Version mismatch master version %s,"
2098 " node version %s" %
2099 (constants.PROTOCOL_VERSION, result.data))
2101 raise errors.OpExecError("Cannot get version from the new node")
2104 logging.info("Copy ssh key to node %s", node)
2105 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2107 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2108 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2114 keyarray.append(f.read())
2118 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2120 keyarray[3], keyarray[4], keyarray[5])
2122 if result.failed or not result.data:
2123 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2125 # Add node to our /etc/hosts, and add key to known_hosts
2126 utils.AddHostToEtcHosts(new_node.name)
2128 if new_node.secondary_ip != new_node.primary_ip:
2129 result = self.rpc.call_node_has_ip_address(new_node.name,
2130 new_node.secondary_ip)
2131 if result.failed or not result.data:
2132 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2133 " you gave (%s). Please fix and re-run this"
2134 " command." % new_node.secondary_ip)
2136 node_verify_list = [self.cfg.GetMasterNode()]
2137 node_verify_param = {
2139 # TODO: do a node-net-test as well?
2142 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2143 self.cfg.GetClusterName())
2144 for verifier in node_verify_list:
2145 if result[verifier].failed or not result[verifier].data:
2146 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2147 " for remote verification" % verifier)
2148 if result[verifier].data['nodelist']:
2149 for failed in result[verifier].data['nodelist']:
2150 feedback_fn("ssh/hostname verification failed %s -> %s" %
2151 (verifier, result[verifier].data['nodelist'][failed]))
2152 raise errors.OpExecError("ssh/hostname verification failed.")
2154 # Distribute updated /etc/hosts and known_hosts to all nodes,
2155 # including the node just added
2156 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2157 dist_nodes = self.cfg.GetNodeList()
2158 if not self.op.readd:
2159 dist_nodes.append(node)
2160 if myself.name in dist_nodes:
2161 dist_nodes.remove(myself.name)
2163 logging.debug("Copying hosts and known_hosts to all nodes")
2164 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2165 result = self.rpc.call_upload_file(dist_nodes, fname)
2166 for to_node, to_result in result.iteritems():
2167 if to_result.failed or not to_result.data:
2168 logging.error("Copy of file %s to node %s failed", fname, to_node)
2171 if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2172 to_copy.append(constants.VNC_PASSWORD_FILE)
2173 for fname in to_copy:
2174 result = self.rpc.call_upload_file([node], fname)
2175 if result[node].failed or not result[node]:
2176 logging.error("Could not copy file %s to node %s", fname, node)
2179 self.context.ReaddNode(new_node)
2181 self.context.AddNode(new_node)
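# Usage sketch (illustrative, not part of the original module): node addition
# is requested through an opcode; the processor then runs ExpandNames,
# CheckPrereq, the hooks built in BuildHooksEnv and finally Exec above.  The
# node name is hypothetical.
#
#   op = opcodes.OpAddNode(node_name="node4.example.com", readd=False)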
2184 class LUSetNodeParams(LogicalUnit):
2185 """Modifies the parameters of a node.
2188 HPATH = "node-modify"
2189 HTYPE = constants.HTYPE_NODE
2190 _OP_REQP = ["node_name"]
2193 def CheckArguments(self):
2194 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2195 if node_name is None:
2196 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2197 self.op.node_name = node_name
2198 _CheckBooleanOpField(self.op, 'master_candidate')
2199 _CheckBooleanOpField(self.op, 'offline')
2200 if self.op.master_candidate is None and self.op.offline is None:
2201 raise errors.OpPrereqError("Please pass at least one modification")
2202 if self.op.offline == True and self.op.master_candidate == True:
2203 raise errors.OpPrereqError("Can't set the node into offline and"
2204 " master_candidate at the same time")
2206 def ExpandNames(self):
2207 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2209 def BuildHooksEnv(self):
2212 This runs on the master node.
2216 "OP_TARGET": self.op.node_name,
2217 "MASTER_CANDIDATE": str(self.op.master_candidate),
2218 "OFFLINE": str(self.op.offline),
2220 nl = [self.cfg.GetMasterNode(),
2224 def CheckPrereq(self):
2225 """Check prerequisites.
2227 This checks that the node exists and that the requested flag changes are valid.
2230 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2232 if ((self.op.master_candidate == False or self.op.offline == True)
2233 and node.master_candidate):
2234 # we will demote the node from master_candidate
2235 if self.op.node_name == self.cfg.GetMasterNode():
2236 raise errors.OpPrereqError("The master node has to be a"
2237 " master candidate and online")
2238 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2239 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2240 if num_candidates <= cp_size:
2241 msg = ("Not enough master candidates (desired"
2242 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2244 self.LogWarning(msg)
2246 raise errors.OpPrereqError(msg)
2248 if (self.op.master_candidate == True and node.offline and
2249 not self.op.offline == False):
2250 raise errors.OpPrereqError("Can't set an offline node to"
2251 " master_candidate")
2255 def Exec(self, feedback_fn):
2263 if self.op.offline is not None:
2264 node.offline = self.op.offline
2265 result.append(("offline", str(self.op.offline)))
2266 if self.op.offline == True and node.master_candidate:
2267 node.master_candidate = False
2268 result.append(("master_candidate", "auto-demotion due to offline"))
2270 if self.op.master_candidate is not None:
2271 node.master_candidate = self.op.master_candidate
2272 result.append(("master_candidate", str(self.op.master_candidate)))
2273 if self.op.master_candidate == False:
2274 rrc = self.rpc.call_node_demote_from_mc(node.name)
2275 if (rrc.failed or not isinstance(rrc.data, (tuple, list))
2276 or len(rrc.data) != 2):
2277 self.LogWarning("Node rpc error: %s" % rrc.error)
2278 elif not rrc.data[0]:
2279 self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
2281 # this will trigger configuration file update, if needed
2282 self.cfg.Update(node)
2283 # this will trigger job queue propagation or cleanup
2284 if self.op.node_name != self.cfg.GetMasterNode():
2285 self.context.ReaddNode(node)
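# Illustrative note (not part of the original module): the offline and
# master_candidate flags interact -- offlining a node auto-demotes it from
# master candidacy (see Exec above), and demotions are checked against
# candidate_pool_size in CheckPrereq.  A hypothetical invocation:
#
#   op = opcodes.OpSetNodeParams(node_name="node2.example.com",
#                                master_candidate=False)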
2290 class LUQueryClusterInfo(NoHooksLU):
2291 """Query cluster configuration.
2297 def ExpandNames(self):
2298 self.needed_locks = {}
2300 def CheckPrereq(self):
2301 """No prerequsites needed for this LU.
2306 def Exec(self, feedback_fn):
2307 """Return cluster config.
2310 cluster = self.cfg.GetClusterInfo()
2312 "software_version": constants.RELEASE_VERSION,
2313 "protocol_version": constants.PROTOCOL_VERSION,
2314 "config_version": constants.CONFIG_VERSION,
2315 "os_api_version": constants.OS_API_VERSION,
2316 "export_version": constants.EXPORT_VERSION,
2317 "architecture": (platform.architecture()[0], platform.machine()),
2318 "name": cluster.cluster_name,
2319 "master": cluster.master_node,
2320 "default_hypervisor": cluster.default_hypervisor,
2321 "enabled_hypervisors": cluster.enabled_hypervisors,
2322 "hvparams": cluster.hvparams,
2323 "beparams": cluster.beparams,
2324 "candidate_pool_size": cluster.candidate_pool_size,
2330 class LUQueryConfigValues(NoHooksLU):
2331 """Return configuration values.
2336 _FIELDS_DYNAMIC = utils.FieldSet()
2337 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2339 def ExpandNames(self):
2340 self.needed_locks = {}
2342 _CheckOutputFields(static=self._FIELDS_STATIC,
2343 dynamic=self._FIELDS_DYNAMIC,
2344 selected=self.op.output_fields)
2346 def CheckPrereq(self):
2347 """No prerequisites.
2352 def Exec(self, feedback_fn):
2353 """Dump a representation of the cluster config to the standard output.
2357 for field in self.op.output_fields:
2358 if field == "cluster_name":
2359 entry = self.cfg.GetClusterName()
2360 elif field == "master_node":
2361 entry = self.cfg.GetMasterNode()
2362 elif field == "drain_flag":
2363 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2365 raise errors.ParameterError(field)
2366 values.append(entry)
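# Usage sketch (illustrative, not part of the original module): only the
# static fields declared above can be requested, and Exec returns the values
# in the order they were asked for, e.g.:
#
#   op = opcodes.OpQueryConfigValues(output_fields=["cluster_name",
#                                                   "drain_flag"])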
2370 class LUActivateInstanceDisks(NoHooksLU):
2371 """Bring up an instance's disks.
2374 _OP_REQP = ["instance_name"]
2377 def ExpandNames(self):
2378 self._ExpandAndLockInstance()
2379 self.needed_locks[locking.LEVEL_NODE] = []
2380 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2382 def DeclareLocks(self, level):
2383 if level == locking.LEVEL_NODE:
2384 self._LockInstancesNodes()
2386 def CheckPrereq(self):
2387 """Check prerequisites.
2389 This checks that the instance is in the cluster.
2392 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2393 assert self.instance is not None, \
2394 "Cannot retrieve locked instance %s" % self.op.instance_name
2395 _CheckNodeOnline(self, self.instance.primary_node)
2397 def Exec(self, feedback_fn):
2398 """Activate the disks.
2401 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2403 raise errors.OpExecError("Cannot activate block devices")
2408 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2409 """Prepare the block devices for an instance.
2411 This sets up the block devices on all nodes.
2413 @type lu: L{LogicalUnit}
2414 @param lu: the logical unit on whose behalf we execute
2415 @type instance: L{objects.Instance}
2416 @param instance: the instance for whose disks we assemble
2417 @type ignore_secondaries: boolean
2418 @param ignore_secondaries: if true, errors on secondary nodes
2419 won't result in an error return from the function
2420 @return: a tuple of (disks_ok, device_info), where device_info is a list of
2421 (host, instance_visible_name, node_visible_name)
2422 with the mapping from node devices to instance devices
2427 iname = instance.name
2428 # With the two passes mechanism we try to reduce the window of
2429 # opportunity for the race condition of switching DRBD to primary
2430 # before handshaking occurred, but we do not eliminate it
2432 # The proper fix would be to wait (with some limits) until the
2433 # connection has been made and drbd transitions from WFConnection
2434 # into any other network-connected state (Connected, SyncTarget,
2437 # 1st pass, assemble on all nodes in secondary mode
2438 for inst_disk in instance.disks:
2439 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2440 lu.cfg.SetDiskID(node_disk, node)
2441 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2442 if result.failed or not result:
2443 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2444 " (is_primary=False, pass=1)",
2445 inst_disk.iv_name, node)
2446 if not ignore_secondaries:
2449 # FIXME: race condition on drbd migration to primary
2451 # 2nd pass, do only the primary node
2452 for inst_disk in instance.disks:
2453 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2454 if node != instance.primary_node:
2456 lu.cfg.SetDiskID(node_disk, node)
2457 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2458 if result.failed or not result:
2459 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2460 " (is_primary=True, pass=2)",
2461 inst_disk.iv_name, node)
2463 device_info.append((instance.primary_node, inst_disk.iv_name, result.data))
2465 # leave the disks configured for the primary node
2466 # this is a workaround that would be fixed better by
2467 # improving the logical/physical id handling
2468 for disk in instance.disks:
2469 lu.cfg.SetDiskID(disk, instance.primary_node)
2471 return disks_ok, device_info
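# Usage sketch (illustrative, not part of the original module): callers check
# the returned status and can report the node-visible device names, as
# LUActivateInstanceDisks.Exec does above:
#
#   disks_ok, device_info = _AssembleInstanceDisks(self, self.instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   for node, iv_name, dev_path in device_info:
#     feedback_fn("  - disk %s on %s mapped to %s" % (iv_name, node, dev_path))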
2474 def _StartInstanceDisks(lu, instance, force):
2475 """Start the disks of an instance.
2478 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2479 ignore_secondaries=force)
2481 _ShutdownInstanceDisks(lu, instance)
2482 if force is not None and not force:
2483 lu.proc.LogWarning("", hint="If the message above refers to a"
2485 " you can retry the operation using '--force'.")
2486 raise errors.OpExecError("Disk consistency error")
2489 class LUDeactivateInstanceDisks(NoHooksLU):
2490 """Shutdown an instance's disks.
2493 _OP_REQP = ["instance_name"]
2496 def ExpandNames(self):
2497 self._ExpandAndLockInstance()
2498 self.needed_locks[locking.LEVEL_NODE] = []
2499 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2501 def DeclareLocks(self, level):
2502 if level == locking.LEVEL_NODE:
2503 self._LockInstancesNodes()
2505 def CheckPrereq(self):
2506 """Check prerequisites.
2508 This checks that the instance is in the cluster.
2511 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2512 assert self.instance is not None, \
2513 "Cannot retrieve locked instance %s" % self.op.instance_name
2515 def Exec(self, feedback_fn):
2516 """Deactivate the disks
2519 instance = self.instance
2520 _SafeShutdownInstanceDisks(self, instance)
2523 def _SafeShutdownInstanceDisks(lu, instance):
2524 """Shutdown block devices of an instance.
2526 This function checks whether the instance is running before calling
2527 _ShutdownInstanceDisks.
2530 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2531 [instance.hypervisor])
2532 ins_l = ins_l[instance.primary_node]
2533 if ins_l.failed or not isinstance(ins_l.data, list):
2534 raise errors.OpExecError("Can't contact node '%s'" %
2535 instance.primary_node)
2537 if instance.name in ins_l.data:
2538 raise errors.OpExecError("Instance is running, can't shutdown"
2541 _ShutdownInstanceDisks(lu, instance)
2544 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2545 """Shutdown block devices of an instance.
2547 This does the shutdown on all nodes of the instance.
2549 If ignore_primary is false, errors on the primary node are taken into
2550 account and the shutdown is reported as failed.
2554 for disk in instance.disks:
2555 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2556 lu.cfg.SetDiskID(top_disk, node)
2557 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2558 if result.failed or not result.data:
2559 logging.error("Could not shutdown block device %s on node %s",
2561 if not ignore_primary or node != instance.primary_node:
2566 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2567 """Checks if a node has enough free memory.
2569 This function checks if a given node has the needed amount of free
2570 memory. In case the node has less memory or we cannot get the
2571 information from the node, this function raises an OpPrereqError
2574 @type lu: C{LogicalUnit}
2575 @param lu: a logical unit from which we get configuration data
2577 @param node: the node to check
2578 @type reason: C{str}
2579 @param reason: string to use in the error message
2580 @type requested: C{int}
2581 @param requested: the amount of memory in MiB to check for
2582 @type hypervisor_name: C{str}
2583 @param hypervisor_name: the hypervisor to ask for memory stats
2584 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2585 we cannot check the node
2588 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2589 nodeinfo[node].Raise()
2590 free_mem = nodeinfo[node].data.get('memory_free')
2591 if not isinstance(free_mem, int):
2592 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2593 " was '%s'" % (node, free_mem))
2594 if requested > free_mem:
2595 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2596 " needed %s MiB, available %s MiB" %
2597 (node, reason, requested, free_mem))
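# Usage sketch (illustrative, not part of the original module): callers derive
# the memory requirement from the instance's filled-in backend parameters, as
# LUStartupInstance.CheckPrereq does below:
#
#   bep = self.cfg.GetClusterInfo().FillBE(instance)
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)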
2600 class LUStartupInstance(LogicalUnit):
2601 """Starts an instance.
2604 HPATH = "instance-start"
2605 HTYPE = constants.HTYPE_INSTANCE
2606 _OP_REQP = ["instance_name", "force"]
2609 def ExpandNames(self):
2610 self._ExpandAndLockInstance()
2612 def BuildHooksEnv(self):
2615 This runs on master, primary and secondary nodes of the instance.
2619 "FORCE": self.op.force,
2621 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2622 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2625 def CheckPrereq(self):
2626 """Check prerequisites.
2628 This checks that the instance is in the cluster.
2631 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2632 assert self.instance is not None, \
2633 "Cannot retrieve locked instance %s" % self.op.instance_name
2635 _CheckNodeOnline(self, instance.primary_node)
2637 bep = self.cfg.GetClusterInfo().FillBE(instance)
2638 # check bridges existence
2639 _CheckInstanceBridgesExist(self, instance)
2641 _CheckNodeFreeMemory(self, instance.primary_node,
2642 "starting instance %s" % instance.name,
2643 bep[constants.BE_MEMORY], instance.hypervisor)
2645 def Exec(self, feedback_fn):
2646 """Start the instance.
2649 instance = self.instance
2650 force = self.op.force
2651 extra_args = getattr(self.op, "extra_args", "")
2653 self.cfg.MarkInstanceUp(instance.name)
2655 node_current = instance.primary_node
2657 _StartInstanceDisks(self, instance, force)
2659 result = self.rpc.call_instance_start(node_current, instance, extra_args)
2660 msg = result.RemoteFailMsg()
2662 _ShutdownInstanceDisks(self, instance)
2663 raise errors.OpExecError("Could not start instance: %s" % msg)
2666 class LURebootInstance(LogicalUnit):
2667 """Reboot an instance.
2670 HPATH = "instance-reboot"
2671 HTYPE = constants.HTYPE_INSTANCE
2672 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2675 def ExpandNames(self):
2676 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2677 constants.INSTANCE_REBOOT_HARD,
2678 constants.INSTANCE_REBOOT_FULL]:
2679 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2680 (constants.INSTANCE_REBOOT_SOFT,
2681 constants.INSTANCE_REBOOT_HARD,
2682 constants.INSTANCE_REBOOT_FULL))
2683 self._ExpandAndLockInstance()
2685 def BuildHooksEnv(self):
2688 This runs on master, primary and secondary nodes of the instance.
2692 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2694 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2695 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2698 def CheckPrereq(self):
2699 """Check prerequisites.
2701 This checks that the instance is in the cluster.
2704 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2705 assert self.instance is not None, \
2706 "Cannot retrieve locked instance %s" % self.op.instance_name
2708 _CheckNodeOnline(self, instance.primary_node)
2710 # check bridges existence
2711 _CheckInstanceBridgesExist(self, instance)
2713 def Exec(self, feedback_fn):
2714 """Reboot the instance.
2717 instance = self.instance
2718 ignore_secondaries = self.op.ignore_secondaries
2719 reboot_type = self.op.reboot_type
2720 extra_args = getattr(self.op, "extra_args", "")
2722 node_current = instance.primary_node
2724 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2725 constants.INSTANCE_REBOOT_HARD]:
2726 result = self.rpc.call_instance_reboot(node_current, instance,
2727 reboot_type, extra_args)
2728 if result.failed or not result.data:
2729 raise errors.OpExecError("Could not reboot instance")
2731 if not self.rpc.call_instance_shutdown(node_current, instance):
2732 raise errors.OpExecError("could not shutdown instance for full reboot")
2733 _ShutdownInstanceDisks(self, instance)
2734 _StartInstanceDisks(self, instance, ignore_secondaries)
2735 result = self.rpc.call_instance_start(node_current, instance, extra_args)
2736 msg = result.RemoteFailMsg()
2738 _ShutdownInstanceDisks(self, instance)
2739 raise errors.OpExecError("Could not start instance for"
2740 " full reboot: %s" % msg)
2742 self.cfg.MarkInstanceUp(instance.name)
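# Illustrative note (not part of the original module): SOFT and HARD reboots
# are delegated to the node daemon via call_instance_reboot, while FULL does
# an explicit shutdown, disk re-assembly and start from the master side.  A
# hypothetical invocation:
#
#   op = opcodes.OpRebootInstance(instance_name="web1.example.com",
#                                 reboot_type=constants.INSTANCE_REBOOT_FULL,
#                                 ignore_secondaries=False)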
2745 class LUShutdownInstance(LogicalUnit):
2746 """Shutdown an instance.
2749 HPATH = "instance-stop"
2750 HTYPE = constants.HTYPE_INSTANCE
2751 _OP_REQP = ["instance_name"]
2754 def ExpandNames(self):
2755 self._ExpandAndLockInstance()
2757 def BuildHooksEnv(self):
2760 This runs on master, primary and secondary nodes of the instance.
2763 env = _BuildInstanceHookEnvByObject(self, self.instance)
2764 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2767 def CheckPrereq(self):
2768 """Check prerequisites.
2770 This checks that the instance is in the cluster.
2773 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2774 assert self.instance is not None, \
2775 "Cannot retrieve locked instance %s" % self.op.instance_name
2776 _CheckNodeOnline(self, self.instance.primary_node)
2778 def Exec(self, feedback_fn):
2779 """Shutdown the instance.
2782 instance = self.instance
2783 node_current = instance.primary_node
2784 self.cfg.MarkInstanceDown(instance.name)
2785 result = self.rpc.call_instance_shutdown(node_current, instance)
2786 if result.failed or not result.data:
2787 self.proc.LogWarning("Could not shutdown instance")
2789 _ShutdownInstanceDisks(self, instance)
2792 class LUReinstallInstance(LogicalUnit):
2793 """Reinstall an instance.
2796 HPATH = "instance-reinstall"
2797 HTYPE = constants.HTYPE_INSTANCE
2798 _OP_REQP = ["instance_name"]
2801 def ExpandNames(self):
2802 self._ExpandAndLockInstance()
2804 def BuildHooksEnv(self):
2807 This runs on master, primary and secondary nodes of the instance.
2810 env = _BuildInstanceHookEnvByObject(self, self.instance)
2811 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2814 def CheckPrereq(self):
2815 """Check prerequisites.
2817 This checks that the instance is in the cluster and is not running.
2820 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2821 assert instance is not None, \
2822 "Cannot retrieve locked instance %s" % self.op.instance_name
2823 _CheckNodeOnline(self, instance.primary_node)
2825 if instance.disk_template == constants.DT_DISKLESS:
2826 raise errors.OpPrereqError("Instance '%s' has no disks" %
2827 self.op.instance_name)
2828 if instance.admin_up:
2829 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2830 self.op.instance_name)
2831 remote_info = self.rpc.call_instance_info(instance.primary_node,
2833 instance.hypervisor)
2834 if remote_info.failed or remote_info.data:
2835 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2836 (self.op.instance_name,
2837 instance.primary_node))
2839 self.op.os_type = getattr(self.op, "os_type", None)
2840 if self.op.os_type is not None:
2842 pnode = self.cfg.GetNodeInfo(
2843 self.cfg.ExpandNodeName(instance.primary_node))
2845 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2847 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2849 if not isinstance(result.data, objects.OS):
2850 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2851 " primary node" % self.op.os_type)
2853 self.instance = instance
2855 def Exec(self, feedback_fn):
2856 """Reinstall the instance.
2859 inst = self.instance
2861 if self.op.os_type is not None:
2862 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2863 inst.os = self.op.os_type
2864 self.cfg.Update(inst)
2866 _StartInstanceDisks(self, inst, None)
2868 feedback_fn("Running the instance OS create scripts...")
2869 result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2870 msg = result.RemoteFailMsg()
2872 raise errors.OpExecError("Could not install OS for instance %s"
2874 (inst.name, inst.primary_node, msg))
2876 _ShutdownInstanceDisks(self, inst)
2879 class LURenameInstance(LogicalUnit):
2880 """Rename an instance.
2883 HPATH = "instance-rename"
2884 HTYPE = constants.HTYPE_INSTANCE
2885 _OP_REQP = ["instance_name", "new_name"]
2887 def BuildHooksEnv(self):
2890 This runs on master, primary and secondary nodes of the instance.
2893 env = _BuildInstanceHookEnvByObject(self, self.instance)
2894 env["INSTANCE_NEW_NAME"] = self.op.new_name
2895 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2898 def CheckPrereq(self):
2899 """Check prerequisites.
2901 This checks that the instance is in the cluster and is not running.
2904 instance = self.cfg.GetInstanceInfo(
2905 self.cfg.ExpandInstanceName(self.op.instance_name))
2906 if instance is None:
2907 raise errors.OpPrereqError("Instance '%s' not known" %
2908 self.op.instance_name)
2909 _CheckNodeOnline(self, instance.primary_node)
2911 if instance.admin_up:
2912 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2913 self.op.instance_name)
2914 remote_info = self.rpc.call_instance_info(instance.primary_node,
2916 instance.hypervisor)
2918 if remote_info.data:
2919 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2920 (self.op.instance_name,
2921 instance.primary_node))
2922 self.instance = instance
2924 # new name verification
2925 name_info = utils.HostInfo(self.op.new_name)
2927 self.op.new_name = new_name = name_info.name
2928 instance_list = self.cfg.GetInstanceList()
2929 if new_name in instance_list:
2930 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2933 if not getattr(self.op, "ignore_ip", False):
2934 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2935 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2936 (name_info.ip, new_name))
2939 def Exec(self, feedback_fn):
2940 """Reinstall the instance.
2943 inst = self.instance
2944 old_name = inst.name
2946 if inst.disk_template == constants.DT_FILE:
2947 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2949 self.cfg.RenameInstance(inst.name, self.op.new_name)
2950 # Change the instance lock. This is definitely safe while we hold the BGL
2951 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2952 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2954 # re-read the instance from the configuration after rename
2955 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2957 if inst.disk_template == constants.DT_FILE:
2958 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2959 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2960 old_file_storage_dir,
2961 new_file_storage_dir)
2964 raise errors.OpExecError("Could not connect to node '%s' to rename"
2965 " directory '%s' to '%s' (but the instance"
2966 " has been renamed in Ganeti)" % (
2967 inst.primary_node, old_file_storage_dir,
2968 new_file_storage_dir))
2970 if not result.data[0]:
2971 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2972 " (but the instance has been renamed in"
2973 " Ganeti)" % (old_file_storage_dir,
2974 new_file_storage_dir))
2976 _StartInstanceDisks(self, inst, None)
2978 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2980 msg = result.RemoteFailMsg()
2982 msg = ("Could not run OS rename script for instance %s on node %s"
2983 " (but the instance has been renamed in Ganeti): %s" %
2984 (inst.name, inst.primary_node, msg))
2985 self.proc.LogWarning(msg)
2987 _ShutdownInstanceDisks(self, inst)
2990 class LURemoveInstance(LogicalUnit):
2991 """Remove an instance.
2994 HPATH = "instance-remove"
2995 HTYPE = constants.HTYPE_INSTANCE
2996 _OP_REQP = ["instance_name", "ignore_failures"]
2999 def ExpandNames(self):
3000 self._ExpandAndLockInstance()
3001 self.needed_locks[locking.LEVEL_NODE] = []
3002 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3004 def DeclareLocks(self, level):
3005 if level == locking.LEVEL_NODE:
3006 self._LockInstancesNodes()
3008 def BuildHooksEnv(self):
3011 This runs on master, primary and secondary nodes of the instance.
3014 env = _BuildInstanceHookEnvByObject(self, self.instance)
3015 nl = [self.cfg.GetMasterNode()]
3018 def CheckPrereq(self):
3019 """Check prerequisites.
3021 This checks that the instance is in the cluster.
3024 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3025 assert self.instance is not None, \
3026 "Cannot retrieve locked instance %s" % self.op.instance_name
3028 def Exec(self, feedback_fn):
3029 """Remove the instance.
3032 instance = self.instance
3033 logging.info("Shutting down instance %s on node %s",
3034 instance.name, instance.primary_node)
3036 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3037 if result.failed or not result.data:
3038 if self.op.ignore_failures:
3039 feedback_fn("Warning: can't shutdown instance")
3041 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3042 (instance.name, instance.primary_node))
3044 logging.info("Removing block devices for instance %s", instance.name)
3046 if not _RemoveDisks(self, instance):
3047 if self.op.ignore_failures:
3048 feedback_fn("Warning: can't remove instance's disks")
3050 raise errors.OpExecError("Can't remove instance's disks")
3052 logging.info("Removing instance %s out of cluster config", instance.name)
3054 self.cfg.RemoveInstance(instance.name)
3055 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3058 class LUQueryInstances(NoHooksLU):
3059 """Logical unit for querying instances.
3062 _OP_REQP = ["output_fields", "names"]
3064 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3065 "admin_state", "admin_ram",
3066 "disk_template", "ip", "mac", "bridge",
3067 "sda_size", "sdb_size", "vcpus", "tags",
3068 "network_port", "beparams",
3069 "(disk).(size)/([0-9]+)",
3071 "(nic).(mac|ip|bridge)/([0-9]+)",
3072 "(nic).(macs|ips|bridges)",
3073 "(disk|nic).(count)",
3074 "serial_no", "hypervisor", "hvparams",] +
3076 for name in constants.HVS_PARAMETERS] +
3078 for name in constants.BES_PARAMETERS])
3079 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3082 def ExpandNames(self):
3083 _CheckOutputFields(static=self._FIELDS_STATIC,
3084 dynamic=self._FIELDS_DYNAMIC,
3085 selected=self.op.output_fields)
3087 self.needed_locks = {}
3088 self.share_locks[locking.LEVEL_INSTANCE] = 1
3089 self.share_locks[locking.LEVEL_NODE] = 1
3092 self.wanted = _GetWantedInstances(self, self.op.names)
3094 self.wanted = locking.ALL_SET
3096 self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3098 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3099 self.needed_locks[locking.LEVEL_NODE] = []
3100 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3102 def DeclareLocks(self, level):
3103 if level == locking.LEVEL_NODE and self.do_locking:
3104 self._LockInstancesNodes()
3106 def CheckPrereq(self):
3107 """Check prerequisites.
3112 def Exec(self, feedback_fn):
3113 """Computes the list of nodes and their attributes.
3116 all_info = self.cfg.GetAllInstancesInfo()
3118 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3119 elif self.wanted != locking.ALL_SET:
3120 instance_names = self.wanted
3121 missing = set(instance_names).difference(all_info.keys())
3123 raise errors.OpExecError(
3124 "Some instances were removed before retrieving their data: %s"
3127 instance_names = all_info.keys()
3129 instance_names = utils.NiceSort(instance_names)
3130 instance_list = [all_info[iname] for iname in instance_names]
3132 # begin data gathering
3134 nodes = frozenset([inst.primary_node for inst in instance_list])
3135 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3141 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3143 result = node_data[name]
3145 # offline nodes will be in both lists
3146 off_nodes.append(name)
3148 bad_nodes.append(name)
3151 live_data.update(result.data)
3152 # else no instance is alive
3154 live_data = dict([(name, {}) for name in instance_names])
3156 # end data gathering
3161 for instance in instance_list:
3163 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3164 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3165 for field in self.op.output_fields:
3166 st_match = self._FIELDS_STATIC.Matches(field)
3171 elif field == "pnode":
3172 val = instance.primary_node
3173 elif field == "snodes":
3174 val = list(instance.secondary_nodes)
3175 elif field == "admin_state":
3176 val = instance.admin_up
3177 elif field == "oper_state":
3178 if instance.primary_node in bad_nodes:
3181 val = bool(live_data.get(instance.name))
3182 elif field == "status":
3183 if instance.primary_node in off_nodes:
3184 val = "ERROR_nodeoffline"
3185 elif instance.primary_node in bad_nodes:
3186 val = "ERROR_nodedown"
3188 running = bool(live_data.get(instance.name))
3190 if instance.admin_up:
3195 if instance.admin_up:
3199 elif field == "oper_ram":
3200 if instance.primary_node in bad_nodes:
3202 elif instance.name in live_data:
3203 val = live_data[instance.name].get("memory", "?")
3206 elif field == "disk_template":
3207 val = instance.disk_template
3209 val = instance.nics[0].ip
3210 elif field == "bridge":
3211 val = instance.nics[0].bridge
3212 elif field == "mac":
3213 val = instance.nics[0].mac
3214 elif field == "sda_size" or field == "sdb_size":
3215 idx = ord(field[2]) - ord('a')
3217 val = instance.FindDisk(idx).size
3218 except errors.OpPrereqError:
3220 elif field == "tags":
3221 val = list(instance.GetTags())
3222 elif field == "serial_no":
3223 val = instance.serial_no
3224 elif field == "network_port":
3225 val = instance.network_port
3226 elif field == "hypervisor":
3227 val = instance.hypervisor
3228 elif field == "hvparams":
3230 elif (field.startswith(HVPREFIX) and
3231 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3232 val = i_hv.get(field[len(HVPREFIX):], None)
3233 elif field == "beparams":
3235 elif (field.startswith(BEPREFIX) and
3236 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3237 val = i_be.get(field[len(BEPREFIX):], None)
3238 elif st_match and st_match.groups():
3239 # matches a variable list
3240 st_groups = st_match.groups()
3241 if st_groups and st_groups[0] == "disk":
3242 if st_groups[1] == "count":
3243 val = len(instance.disks)
3244 elif st_groups[1] == "sizes":
3245 val = [disk.size for disk in instance.disks]
3246 elif st_groups[1] == "size":
3248 val = instance.FindDisk(st_groups[2]).size
3249 except errors.OpPrereqError:
3252 assert False, "Unhandled disk parameter"
3253 elif st_groups[0] == "nic":
3254 if st_groups[1] == "count":
3255 val = len(instance.nics)
3256 elif st_groups[1] == "macs":
3257 val = [nic.mac for nic in instance.nics]
3258 elif st_groups[1] == "ips":
3259 val = [nic.ip for nic in instance.nics]
3260 elif st_groups[1] == "bridges":
3261 val = [nic.bridge for nic in instance.nics]
3264 nic_idx = int(st_groups[2])
3265 if nic_idx >= len(instance.nics):
3268 if st_groups[1] == "mac":
3269 val = instance.nics[nic_idx].mac
3270 elif st_groups[1] == "ip":
3271 val = instance.nics[nic_idx].ip
3272 elif st_groups[1] == "bridge":
3273 val = instance.nics[nic_idx].bridge
3275 assert False, "Unhandled NIC parameter"
3277 assert False, "Unhandled variable parameter"
3279 raise errors.ParameterError(field)
3286 class LUFailoverInstance(LogicalUnit):
3287 """Failover an instance.
3290 HPATH = "instance-failover"
3291 HTYPE = constants.HTYPE_INSTANCE
3292 _OP_REQP = ["instance_name", "ignore_consistency"]
3295 def ExpandNames(self):
3296 self._ExpandAndLockInstance()
3297 self.needed_locks[locking.LEVEL_NODE] = []
3298 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3300 def DeclareLocks(self, level):
3301 if level == locking.LEVEL_NODE:
3302 self._LockInstancesNodes()
3304 def BuildHooksEnv(self):
3307 This runs on master, primary and secondary nodes of the instance.
3311 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3313 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3314 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3317 def CheckPrereq(self):
3318 """Check prerequisites.
3320 This checks that the instance is in the cluster.
3323 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3324 assert self.instance is not None, \
3325 "Cannot retrieve locked instance %s" % self.op.instance_name
3327 bep = self.cfg.GetClusterInfo().FillBE(instance)
3328 if instance.disk_template not in constants.DTS_NET_MIRROR:
3329 raise errors.OpPrereqError("Instance's disk layout is not"
3330 " network mirrored, cannot failover.")
3332 secondary_nodes = instance.secondary_nodes
3333 if not secondary_nodes:
3334 raise errors.ProgrammerError("no secondary node but using "
3335 "a mirrored disk template")
3337 target_node = secondary_nodes[0]
3338 _CheckNodeOnline(self, target_node)
3339 # check memory requirements on the secondary node
3340 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3341 instance.name, bep[constants.BE_MEMORY],
3342 instance.hypervisor)
3344 # check bridge existence
3345 brlist = [nic.bridge for nic in instance.nics]
3346 result = self.rpc.call_bridges_exist(target_node, brlist)
3349 raise errors.OpPrereqError("One or more target bridges %s does not"
3350 " exist on destination node '%s'" %
3351 (brlist, target_node))
3353 def Exec(self, feedback_fn):
3354 """Failover an instance.
3356 The failover is done by shutting it down on its present node and
3357 starting it on the secondary.
3360 instance = self.instance
3362 source_node = instance.primary_node
3363 target_node = instance.secondary_nodes[0]
3365 feedback_fn("* checking disk consistency between source and target")
3366 for dev in instance.disks:
3367 # for drbd, these are drbd over lvm
3368 if not _CheckDiskConsistency(self, dev, target_node, False):
3369 if instance.admin_up and not self.op.ignore_consistency:
3370 raise errors.OpExecError("Disk %s is degraded on target node,"
3371 " aborting failover." % dev.iv_name)
3373 feedback_fn("* shutting down instance on source node")
3374 logging.info("Shutting down instance %s on node %s",
3375 instance.name, source_node)
3377 result = self.rpc.call_instance_shutdown(source_node, instance)
3378 if result.failed or not result.data:
3379 if self.op.ignore_consistency:
3380 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3382 " anyway. Please make sure node %s is down",
3383 instance.name, source_node, source_node)
3385 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3386 (instance.name, source_node))
3388 feedback_fn("* deactivating the instance's disks on source node")
3389 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3390 raise errors.OpExecError("Can't shut down the instance's disks.")
3392 instance.primary_node = target_node
3393 # distribute new instance config to the other nodes
3394 self.cfg.Update(instance)
3396 # Only start the instance if it's marked as up
3397 if instance.admin_up:
3398 feedback_fn("* activating the instance's disks on target node")
3399 logging.info("Starting instance %s on node %s",
3400 instance.name, target_node)
3402 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3403 ignore_secondaries=True)
3405 _ShutdownInstanceDisks(self, instance)
3406 raise errors.OpExecError("Can't activate the instance's disks")
3408 feedback_fn("* starting the instance on the target node")
3409 result = self.rpc.call_instance_start(target_node, instance, None)
3410 msg = result.RemoteFailMsg()
3412 _ShutdownInstanceDisks(self, instance)
3413 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3414 (instance.name, target_node, msg))
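# Usage sketch (illustrative, not part of the original module): failover only
# makes sense for network-mirrored disk templates; ignore_consistency allows
# proceeding even when the target's disks are degraded.  The instance name is
# hypothetical:
#
#   op = opcodes.OpFailoverInstance(instance_name="db1.example.com",
#                                   ignore_consistency=False)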
3417 class LUMigrateInstance(LogicalUnit):
3418 """Migrate an instance.
3420 This is migration without shutting the instance down, as opposed to
3421 failover, which is done with a shutdown.
3424 HPATH = "instance-migrate"
3425 HTYPE = constants.HTYPE_INSTANCE
3426 _OP_REQP = ["instance_name", "live", "cleanup"]
3430 def ExpandNames(self):
3431 self._ExpandAndLockInstance()
3432 self.needed_locks[locking.LEVEL_NODE] = []
3433 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3435 def DeclareLocks(self, level):
3436 if level == locking.LEVEL_NODE:
3437 self._LockInstancesNodes()
3439 def BuildHooksEnv(self):
3442 This runs on master, primary and secondary nodes of the instance.
3445 env = _BuildInstanceHookEnvByObject(self, self.instance)
3446 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3449 def CheckPrereq(self):
3450 """Check prerequisites.
3452 This checks that the instance is in the cluster.
3455 instance = self.cfg.GetInstanceInfo(
3456 self.cfg.ExpandInstanceName(self.op.instance_name))
3457 if instance is None:
3458 raise errors.OpPrereqError("Instance '%s' not known" %
3459 self.op.instance_name)
3461 if instance.disk_template != constants.DT_DRBD8:
3462 raise errors.OpPrereqError("Instance's disk layout is not"
3463 " drbd8, cannot migrate.")
3465 secondary_nodes = instance.secondary_nodes
3466 if not secondary_nodes:
3467 raise errors.ProgrammerError("no secondary node but using "
3468 "drbd8 disk template")
3470 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3472 target_node = secondary_nodes[0]
3473 # check memory requirements on the secondary node
3474 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3475 instance.name, i_be[constants.BE_MEMORY],
3476 instance.hypervisor)
3478 # check bridge existence
3479 brlist = [nic.bridge for nic in instance.nics]
3480 result = self.rpc.call_bridges_exist(target_node, brlist)
3481 if result.failed or not result.data:
3482 raise errors.OpPrereqError("One or more target bridges %s does not"
3483 " exist on destination node '%s'" %
3484 (brlist, target_node))
3486 if not self.op.cleanup:
3487 result = self.rpc.call_instance_migratable(instance.primary_node,
3489 msg = result.RemoteFailMsg()
3491 raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3494 self.instance = instance
3496 def _WaitUntilSync(self):
3497 """Poll with custom rpc for disk sync.
3499 This uses our own step-based rpc call.
3502 self.feedback_fn("* wait until resync is done")
3506 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3508 self.instance.disks)
3510 for node, nres in result.items():
3511 msg = nres.RemoteFailMsg()
3513 raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3515 node_done, node_percent = nres.data[1]
3516 all_done = all_done and node_done
3517 if node_percent is not None:
3518 min_percent = min(min_percent, node_percent)
3520 if min_percent < 100:
3521 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3524 def _EnsureSecondary(self, node):
3525 """Demote a node to secondary.
3528 self.feedback_fn("* switching node %s to secondary mode" % node)
3530 for dev in self.instance.disks:
3531 self.cfg.SetDiskID(dev, node)
3533 result = self.rpc.call_blockdev_close(node, self.instance.name,
3534 self.instance.disks)
3535 msg = result.RemoteFailMsg()
3537 raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3538 " error %s" % (node, msg))
3540 def _GoStandalone(self):
3541 """Disconnect from the network.
3544 self.feedback_fn("* changing into standalone mode")
3545 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3546 self.instance.disks)
3547 for node, nres in result.items():
3548 msg = nres.RemoteFailMsg()
3550 raise errors.OpExecError("Cannot disconnect disks node %s,"
3551 " error %s" % (node, msg))
3553 def _GoReconnect(self, multimaster):
3554 """Reconnect to the network.
3560 msg = "single-master"
3561 self.feedback_fn("* changing disks into %s mode" % msg)
3562 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3563 self.instance.disks,
3564 self.instance.name, multimaster)
3565 for node, nres in result.items():
3566 msg = nres.RemoteFailMsg()
3568 raise errors.OpExecError("Cannot change disks config on node %s,"
3569 " error: %s" % (node, msg))
3571 def _ExecCleanup(self):
3572 """Try to cleanup after a failed migration.
3574 The cleanup is done by:
3575 - check that the instance is running only on one node
3576 (and update the config if needed)
3577 - change disks on its secondary node to secondary
3578 - wait until disks are fully synchronized
3579 - disconnect from the network
3580 - change disks into single-master mode
3581 - wait again until disks are fully synchronized
3584 instance = self.instance
3585 target_node = self.target_node
3586 source_node = self.source_node
3588 # check running on only one node
3589 self.feedback_fn("* checking where the instance actually runs"
3590 " (if this hangs, the hypervisor might be in"
3592 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3593 for node, result in ins_l.items():
3595 if not isinstance(result.data, list):
3596 raise errors.OpExecError("Can't contact node '%s'" % node)
3598 runningon_source = instance.name in ins_l[source_node].data
3599 runningon_target = instance.name in ins_l[target_node].data
3601 if runningon_source and runningon_target:
3602 raise errors.OpExecError("Instance seems to be running on two nodes,"
3603 " or the hypervisor is confused. You will have"
3604 " to ensure manually that it runs only on one"
3605 " and restart this operation.")
3607 if not (runningon_source or runningon_target):
3608 raise errors.OpExecError("Instance does not seem to be running at all."
3609 " In this case, it's safer to repair by"
3610 " running 'gnt-instance stop' to ensure disk"
3611 " shutdown, and then restarting it.")
3613 if runningon_target:
3614 # the migration has actually succeeded, we need to update the config
3615 self.feedback_fn("* instance running on secondary node (%s),"
3616 " updating config" % target_node)
3617 instance.primary_node = target_node
3618 self.cfg.Update(instance)
3619 demoted_node = source_node
3621 self.feedback_fn("* instance confirmed to be running on its"
3622 " primary node (%s)" % source_node)
3623 demoted_node = target_node
3625 self._EnsureSecondary(demoted_node)
3627 self._WaitUntilSync()
3628 except errors.OpExecError:
3629 # we ignore errors here, since if the device is standalone, it
3630 # won't be able to sync
3632 self._GoStandalone()
3633 self._GoReconnect(False)
3634 self._WaitUntilSync()
3636 self.feedback_fn("* done")
3638 def _RevertDiskStatus(self):
3639 """Try to revert the disk status after a failed migration.
3642 target_node = self.target_node
3644 self._EnsureSecondary(target_node)
3645 self._GoStandalone()
3646 self._GoReconnect(False)
3647 self._WaitUntilSync()
3648 except errors.OpExecError, err:
3649 self.LogWarning("Migration failed and I can't reconnect the"
3650 " drives: error '%s'\n"
3651 "Please look and recover the instance status" %
3654 def _AbortMigration(self):
3655 """Call the hypervisor code to abort a started migration.
3658 instance = self.instance
3659 target_node = self.target_node
3660 migration_info = self.migration_info
3662 abort_result = self.rpc.call_finalize_migration(target_node,
3666 abort_msg = abort_result.RemoteFailMsg()
3668 logging.error("Aborting migration failed on target node %s: %s" %
3669 (target_node, abort_msg))
3670 # Don't raise an exception here, as we still have to try to revert the
3671 # disk status, even if this step failed.
3673 def _ExecMigration(self):
3674 """Migrate an instance.
3676 The migrate is done by:
3677 - change the disks into dual-master mode
3678 - wait until disks are fully synchronized again
3679 - migrate the instance
3680 - change disks on the new secondary node (the old primary) to secondary
3681 - wait until disks are fully synchronized
3682 - change disks into single-master mode
3685 instance = self.instance
3686 target_node = self.target_node
3687 source_node = self.source_node
3689 self.feedback_fn("* checking disk consistency between source and target")
3690 for dev in instance.disks:
3691 if not _CheckDiskConsistency(self, dev, target_node, False):
3692 raise errors.OpExecError("Disk %s is degraded or not fully"
3693 " synchronized on target node,"
3694 " aborting migrate." % dev.iv_name)
3696 # First get the migration information from the remote node
3697 result = self.rpc.call_migration_info(source_node, instance)
3698 msg = result.RemoteFailMsg()
3700 log_err = ("Failed fetching source migration information from %s: %s" %
3702 logging.error(log_err)
3703 raise errors.OpExecError(log_err)
3705 self.migration_info = migration_info = result.data[1]
3707 # Then switch the disks to master/master mode
3708 self._EnsureSecondary(target_node)
3709 self._GoStandalone()
3710 self._GoReconnect(True)
3711 self._WaitUntilSync()
3713 self.feedback_fn("* preparing %s to accept the instance" % target_node)
3714 result = self.rpc.call_accept_instance(target_node,
3717 self.nodes_ip[target_node])
3719 msg = result.RemoteFailMsg()
3721 logging.error("Instance pre-migration failed, trying to revert"
3722 " disk status: %s", msg)
3723 self._AbortMigration()
3724 self._RevertDiskStatus()
3725 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3726 (instance.name, msg))
3728 self.feedback_fn("* migrating instance to %s" % target_node)
3730 result = self.rpc.call_instance_migrate(source_node, instance,
3731 self.nodes_ip[target_node],
3733 msg = result.RemoteFailMsg()
3735 logging.error("Instance migration failed, trying to revert"
3736 " disk status: %s", msg)
3737 self._AbortMigration()
3738 self._RevertDiskStatus()
3739 raise errors.OpExecError("Could not migrate instance %s: %s" %
3740 (instance.name, msg))
3743 instance.primary_node = target_node
3744 # distribute new instance config to the other nodes
3745 self.cfg.Update(instance)
3747 result = self.rpc.call_finalize_migration(target_node,
3751 msg = result.RemoteFailMsg()
3753 logging.error("Instance migration succeeded, but finalization failed:"
3755 raise errors.OpExecError("Could not finalize instance migration: %s" %
3758 self._EnsureSecondary(source_node)
3759 self._WaitUntilSync()
3760 self._GoStandalone()
3761 self._GoReconnect(False)
3762 self._WaitUntilSync()
3764 self.feedback_fn("* done")
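# Illustrative summary (not part of the original module): the DRBD state
# transitions above are driven entirely through the helpers defined earlier,
# roughly:
#
#   self._EnsureSecondary(target_node)  # demote target's disks to secondary
#   self._GoStandalone()                # disconnect both sides from the net
#   self._GoReconnect(True)             # reconnect in dual-master mode
#   self._WaitUntilSync()               # wait for resync before migrating
#
# and, once the instance runs on the new primary, the mirror sequence ending
# with _GoReconnect(False) to return to single-master mode.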
3766 def Exec(self, feedback_fn):
3767 """Perform the migration.
3770 self.feedback_fn = feedback_fn
3772 self.source_node = self.instance.primary_node
3773 self.target_node = self.instance.secondary_nodes[0]
3774 self.all_nodes = [self.source_node, self.target_node]
3776 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3777 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3780 return self._ExecCleanup()
3782 return self._ExecMigration()
3785 def _CreateBlockDev(lu, node, instance, device, force_create,
3787 """Create a tree of block devices on a given node.
3789 If this device type has to be created on secondaries, create it and
3790 all its children.
3792 If not, just recurse to children keeping the same 'force' value.
3794 @param lu: the lu on whose behalf we execute
3795 @param node: the node on which to create the device
3796 @type instance: L{objects.Instance}
3797 @param instance: the instance which owns the device
3798 @type device: L{objects.Disk}
3799 @param device: the device to create
3800 @type force_create: boolean
3801 @param force_create: whether to force creation of this device; this
3802 will be changed to True whenever we find a device which has
3803 CreateOnSecondary() attribute
3804 @param info: the extra 'metadata' we should attach to the device
3805 (this will be represented as a LVM tag)
3806 @type force_open: boolean
3807 @param force_open: this parameter will be passed to the
3808 L{backend.CreateBlockDevice} function where it specifies
3809 whether we run on primary or not, and it affects both
3810 the child assembly and the device's own Open() execution
3813 if device.CreateOnSecondary():
3817 for child in device.children:
3818 _CreateBlockDev(lu, node, instance, child, force_create,
3821 if not force_create:
3824 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
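# Usage sketch (illustrative, not part of the original module): _CreateDisks
# below invokes this for every (node, disk) pair, forcing creation only on the
# primary node:
#
#   for node in instance.all_nodes:
#     f_create = node == pnode
#     _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)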
3827 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3828 """Create a single block device on a given node.
3830 This will not recurse over children of the device, so they must be
3831 created in advance.
3833 @param lu: the lu on whose behalf we execute
3834 @param node: the node on which to create the device
3835 @type instance: L{objects.Instance}
3836 @param instance: the instance which owns the device
3837 @type device: L{objects.Disk}
3838 @param device: the device to create
3839 @param info: the extra 'metadata' we should attach to the device
3840 (this will be represented as a LVM tag)
3841 @type force_open: boolean
3842 @param force_open: this parameter will be passed to the
3843 L{backend.CreateBlockDevice} function where it specifies
3844 whether we run on primary or not, and it affects both
3845 the child assembly and the device's own Open() execution
3848 lu.cfg.SetDiskID(device, node)
3849 result = lu.rpc.call_blockdev_create(node, device, device.size,
3850 instance.name, force_open, info)
3851 msg = result.RemoteFailMsg()
3853 raise errors.OpExecError("Can't create block device %s on"
3854 " node %s for instance %s: %s" %
3855 (device, node, instance.name, msg))
3856 if device.physical_id is None:
3857 device.physical_id = result.data[1]
3860 def _GenerateUniqueNames(lu, exts):
3861 """Generate a suitable LV name.
3863 This will generate a logical volume name for the given instance.
3868 new_id = lu.cfg.GenerateUniqueID()
3869 results.append("%s%s" % (new_id, val))
3873 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3875 """Generate a drbd8 device complete with its children.
3878 port = lu.cfg.AllocatePort()
3879 vgname = lu.cfg.GetVGName()
3880 shared_secret = lu.cfg.GenerateDRBDSecret()
3881 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3882 logical_id=(vgname, names[0]))
3883 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3884 logical_id=(vgname, names[1]))
3885 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3886 logical_id=(primary, secondary, port,
3889 children=[dev_data, dev_meta],
3894 def _GenerateDiskTemplate(lu, template_name,
3895 instance_name, primary_node,
3896 secondary_nodes, disk_info,
3897 file_storage_dir, file_driver,
3899 """Generate the entire disk layout for a given template type.
3902 #TODO: compute space requirements
3904 vgname = lu.cfg.GetVGName()
3905 disk_count = len(disk_info)
3907 if template_name == constants.DT_DISKLESS:
3909 elif template_name == constants.DT_PLAIN:
3910 if len(secondary_nodes) != 0:
3911 raise errors.ProgrammerError("Wrong template configuration")
3913 names = _GenerateUniqueNames(lu, [".disk%d" % i
3914 for i in range(disk_count)])
3915 for idx, disk in enumerate(disk_info):
3916 disk_index = idx + base_index
3917 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3918 logical_id=(vgname, names[idx]),
3919 iv_name="disk/%d" % disk_index,
3921 disks.append(disk_dev)
3922 elif template_name == constants.DT_DRBD8:
3923 if len(secondary_nodes) != 1:
3924 raise errors.ProgrammerError("Wrong template configuration")
3925 remote_node = secondary_nodes[0]
3926 minors = lu.cfg.AllocateDRBDMinor(
3927 [primary_node, remote_node] * len(disk_info), instance_name)
3930 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
3931 for i in range(disk_count)]):
3932 names.append(lv_prefix + "_data")
3933 names.append(lv_prefix + "_meta")
3934 for idx, disk in enumerate(disk_info):
3935 disk_index = idx + base_index
3936 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3937 disk["size"], names[idx*2:idx*2+2],
3938 "disk/%d" % disk_index,
3939 minors[idx*2], minors[idx*2+1])
3940 disk_dev.mode = disk["mode"]
3941 disks.append(disk_dev)
3942 elif template_name == constants.DT_FILE:
3943 if len(secondary_nodes) != 0:
3944 raise errors.ProgrammerError("Wrong template configuration")
3946 for idx, disk in enumerate(disk_info):
3947 disk_index = idx + base_index
3948 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3949 iv_name="disk/%d" % disk_index,
3950 logical_id=(file_driver,
3951 "%s/disk%d" % (file_storage_dir,
3954 disks.append(disk_dev)
3956 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3960 def _GetInstanceInfoText(instance):
3961 """Compute that text that should be added to the disk's metadata.
3964 return "originstname+%s" % instance.name
3967 def _CreateDisks(lu, instance):
3968 """Create all disks for an instance.
3970 This abstracts away some work from AddInstance.
3972 @type lu: L{LogicalUnit}
3973 @param lu: the logical unit on whose behalf we execute
3974 @type instance: L{objects.Instance}
3975 @param instance: the instance whose disks we should create
3977 @return: the success of the creation
3980 info = _GetInstanceInfoText(instance)
3981 pnode = instance.primary_node
3983 if instance.disk_template == constants.DT_FILE:
3984 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3985 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
3987 if result.failed or not result.data:
3988 raise errors.OpExecError("Could not connect to node '%s'" % pnode)
3990 if not result.data[0]:
3991 raise errors.OpExecError("Failed to create directory '%s'" %
3994 # Note: this needs to be kept in sync with adding of disks in
3995 # LUSetInstanceParams
3996 for device in instance.disks:
3997 logging.info("Creating volume %s for instance %s",
3998 device.iv_name, instance.name)
4000 for node in instance.all_nodes:
4001 f_create = node == pnode
4002 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
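# Note: f_create above is true only on the primary node, so both force_create
# and force_open are passed as True there; on the other nodes _CreateBlockDev
# itself decides which devices (e.g. DRBD components) must also be created.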
4005 def _RemoveDisks(lu, instance):
4006 """Remove all disks for an instance.
4008 This abstracts away some work from `AddInstance()` and
4009 `RemoveInstance()`. Note that in case some of the devices couldn't
4010 be removed, the removal will continue with the other ones (compare
4011 with `_CreateDisks()`).
4013 @type lu: L{LogicalUnit}
4014 @param lu: the logical unit on whose behalf we execute
4015 @type instance: L{objects.Instance}
4016 @param instance: the instance whose disks we should remove
4018 @return: the success of the removal
4021 logging.info("Removing block devices for instance %s", instance.name)
4024 for device in instance.disks:
4025 for node, disk in device.ComputeNodeTree(instance.primary_node):
4026 lu.cfg.SetDiskID(disk, node)
4027 result = lu.rpc.call_blockdev_remove(node, disk)
4028 if result.failed or not result.data:
4029 lu.proc.LogWarning("Could not remove block device %s on node %s,"
4030 " continuing anyway", device.iv_name, node)
4033 if instance.disk_template == constants.DT_FILE:
4034 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4035 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4037 if result.failed or not result.data:
4038 logging.error("Could not remove directory '%s'", file_storage_dir)
4044 def _ComputeDiskSize(disk_template, disks):
4045 """Compute disk size requirements in the volume group
4048 # Required free disk space as a function of disk and swap space
4050 constants.DT_DISKLESS: None,
4051 constants.DT_PLAIN: sum(d["size"] for d in disks),
4052 # 128 MB are added for drbd metadata for each disk
4053 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4054 constants.DT_FILE: None,
4057 if disk_template not in req_size_dict:
4058 raise errors.ProgrammerError("Disk template '%s' size requirement"
4059 " is unknown" % disk_template)
4061 return req_size_dict[disk_template]
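# Worked example (illustrative sizes): a DRBD8 instance with disks of 1024 MiB
# and 2048 MiB needs (1024 + 128) + (2048 + 128) = 3328 MiB of free space in
# the volume group, while file-based and diskless instances need none (None).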
4064 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4065 """Hypervisor parameter validation.
4067 This function abstracts the hypervisor parameter validation to be
4068 used in both instance create and instance modify.
4070 @type lu: L{LogicalUnit}
4071 @param lu: the logical unit for which we check
4072 @type nodenames: list
4073 @param nodenames: the list of nodes on which we should check
4074 @type hvname: string
4075 @param hvname: the name of the hypervisor we should use
4076 @type hvparams: dict
4077 @param hvparams: the parameters which we need to check
4078 @raise errors.OpPrereqError: if the parameters are not valid
4081 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4084 for node in nodenames:
4089 if not info.data or not isinstance(info.data, (tuple, list)):
4090 raise errors.OpPrereqError("Cannot get current information"
4091 " from node '%s' (%s)" % (node, info.data))
4092 if not info.data[0]:
4093 raise errors.OpPrereqError("Hypervisor parameter validation failed:"
4094 " %s" % info.data[1])
4097 class LUCreateInstance(LogicalUnit):
4098 """Create an instance.
4101 HPATH = "instance-add"
4102 HTYPE = constants.HTYPE_INSTANCE
4103 _OP_REQP = ["instance_name", "disks", "disk_template",
4105 "wait_for_sync", "ip_check", "nics",
4106 "hvparams", "beparams"]
4109 def _ExpandNode(self, node):
4110 """Expands and checks one node name.
4113 node_full = self.cfg.ExpandNodeName(node)
4114 if node_full is None:
4115 raise errors.OpPrereqError("Unknown node %s" % node)
4118 def ExpandNames(self):
4119 """ExpandNames for CreateInstance.
4121 Figure out the right locks for instance creation.
4124 self.needed_locks = {}
4126 # set optional parameters to none if they don't exist
4127 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4128 if not hasattr(self.op, attr):
4129 setattr(self.op, attr, None)
4131 # cheap checks, mostly valid constants given
4133 # verify creation mode
4134 if self.op.mode not in (constants.INSTANCE_CREATE,
4135 constants.INSTANCE_IMPORT):
4136 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4139 # disk template and mirror node verification
4140 if self.op.disk_template not in constants.DISK_TEMPLATES:
4141 raise errors.OpPrereqError("Invalid disk template name")
4143 if self.op.hypervisor is None:
4144 self.op.hypervisor = self.cfg.GetHypervisorType()
4146 cluster = self.cfg.GetClusterInfo()
4147 enabled_hvs = cluster.enabled_hypervisors
4148 if self.op.hypervisor not in enabled_hvs:
4149 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4150 " cluster (%s)" % (self.op.hypervisor,
4151 ",".join(enabled_hvs)))
4153 # check hypervisor parameter syntax (locally)
4155 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4157 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4158 hv_type.CheckParameterSyntax(filled_hvp)
4160 # fill and remember the beparams dict
4161 utils.CheckBEParams(self.op.beparams)
4162 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4165 #### instance parameters check
4167 # instance name verification
4168 hostname1 = utils.HostInfo(self.op.instance_name)
4169 self.op.instance_name = instance_name = hostname1.name
4171 # this is just a preventive check, but someone might still add this
4172 # instance in the meantime, and creation will fail at lock-add time
4173 if instance_name in self.cfg.GetInstanceList():
4174 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4177 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4181 for nic in self.op.nics:
4182 # ip validity checks
4183 ip = nic.get("ip", None)
4184 if ip is None or ip.lower() == "none":
4186 elif ip.lower() == constants.VALUE_AUTO:
4187 nic_ip = hostname1.ip
4189 if not utils.IsValidIP(ip):
4190 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4191 " like a valid IP" % ip)
4194 # MAC address verification
4195 mac = nic.get("mac", constants.VALUE_AUTO)
4196 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4197 if not utils.IsValidMac(mac.lower()):
4198 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4200 # bridge verification
4201 bridge = nic.get("bridge", None)
4203 bridge = self.cfg.GetDefBridge()
4204 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4206 # disk checks/pre-build
4208 for disk in self.op.disks:
4209 mode = disk.get("mode", constants.DISK_RDWR)
4210 if mode not in constants.DISK_ACCESS_SET:
4211 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4213 size = disk.get("size", None)
4215 raise errors.OpPrereqError("Missing disk size")
4219 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4220 self.disks.append({"size": size, "mode": mode})
4222 # used in CheckPrereq for ip ping check
4223 self.check_ip = hostname1.ip
4225 # file storage checks
4226 if (self.op.file_driver and
4227 not self.op.file_driver in constants.FILE_DRIVER):
4228 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4229 self.op.file_driver)
4231 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4232 raise errors.OpPrereqError("File storage directory path not absolute")
4234 ### Node/iallocator related checks
4235 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4236 raise errors.OpPrereqError("One and only one of iallocator and primary"
4237 " node must be given")
4239 if self.op.iallocator:
4240 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4242 self.op.pnode = self._ExpandNode(self.op.pnode)
4243 nodelist = [self.op.pnode]
4244 if self.op.snode is not None:
4245 self.op.snode = self._ExpandNode(self.op.snode)
4246 nodelist.append(self.op.snode)
4247 self.needed_locks[locking.LEVEL_NODE] = nodelist
4249 # in case of import lock the source node too
4250 if self.op.mode == constants.INSTANCE_IMPORT:
4251 src_node = getattr(self.op, "src_node", None)
4252 src_path = getattr(self.op, "src_path", None)
4254 if src_path is None:
4255 self.op.src_path = src_path = self.op.instance_name
4257 if src_node is None:
4258 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4259 self.op.src_node = None
4260 if os.path.isabs(src_path):
4261 raise errors.OpPrereqError("Importing an instance from an absolute"
4262 " path requires a source node option.")
4264 self.op.src_node = src_node = self._ExpandNode(src_node)
4265 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4266 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4267 if not os.path.isabs(src_path):
4268 self.op.src_path = src_path = \
4269 os.path.join(constants.EXPORT_DIR, src_path)
4271 else: # INSTANCE_CREATE
4272 if getattr(self.op, "os_type", None) is None:
4273 raise errors.OpPrereqError("No guest OS specified")
4275 def _RunAllocator(self):
4276 """Run the allocator based on input opcode.
4279 nics = [n.ToDict() for n in self.nics]
4280 ial = IAllocator(self,
4281 mode=constants.IALLOCATOR_MODE_ALLOC,
4282 name=self.op.instance_name,
4283 disk_template=self.op.disk_template,
4286 vcpus=self.be_full[constants.BE_VCPUS],
4287 mem_size=self.be_full[constants.BE_MEMORY],
4290 hypervisor=self.op.hypervisor,
4293 ial.Run(self.op.iallocator)
4296 raise errors.OpPrereqError("Can't compute nodes using"
4297 " iallocator '%s': %s" % (self.op.iallocator,
4299 if len(ial.nodes) != ial.required_nodes:
4300 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4301 " of nodes (%s), required %s" %
4302 (self.op.iallocator, len(ial.nodes),
4303 ial.required_nodes))
4304 self.op.pnode = ial.nodes[0]
4305 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4306 self.op.instance_name, self.op.iallocator,
4307 ", ".join(ial.nodes))
4308 if ial.required_nodes == 2:
4309 self.op.snode = ial.nodes[1]
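# Note: for net-mirrored disk templates the allocator is expected to return
# two nodes (required_nodes == 2); the first becomes the primary and the
# second the secondary, hence snode is only filled in that case.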
4311 def BuildHooksEnv(self):
4314 This runs on master, primary and secondary nodes of the instance.
4318 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
4319 "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
4320 "INSTANCE_ADD_MODE": self.op.mode,
4322 if self.op.mode == constants.INSTANCE_IMPORT:
4323 env["INSTANCE_SRC_NODE"] = self.op.src_node
4324 env["INSTANCE_SRC_PATH"] = self.op.src_path
4325 env["INSTANCE_SRC_IMAGES"] = self.src_images
4327 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
4328 primary_node=self.op.pnode,
4329 secondary_nodes=self.secondaries,
4330 status=self.instance_status,
4331 os_type=self.op.os_type,
4332 memory=self.be_full[constants.BE_MEMORY],
4333 vcpus=self.be_full[constants.BE_VCPUS],
4334 nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4337 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4342 def CheckPrereq(self):
4343 """Check prerequisites.
4346 if (not self.cfg.GetVGName() and
4347 self.op.disk_template not in constants.DTS_NOT_LVM):
4348 raise errors.OpPrereqError("Cluster does not support lvm-based"
4352 if self.op.mode == constants.INSTANCE_IMPORT:
4353 src_node = self.op.src_node
4354 src_path = self.op.src_path
4356 if src_node is None:
4357 exp_list = self.rpc.call_export_list(
4358 self.acquired_locks[locking.LEVEL_NODE])
4360 for node in exp_list:
4361 if not exp_list[node].failed and src_path in exp_list[node].data:
4363 self.op.src_node = src_node = node
4364 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4368 raise errors.OpPrereqError("No export found for relative path %s" %
4371 _CheckNodeOnline(self, src_node)
4372 result = self.rpc.call_export_info(src_node, src_path)
4375 raise errors.OpPrereqError("No export found in dir %s" % src_path)
4377 export_info = result.data
4378 if not export_info.has_section(constants.INISECT_EXP):
4379 raise errors.ProgrammerError("Corrupted export config")
4381 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4382 if (int(ei_version) != constants.EXPORT_VERSION):
4383 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4384 (ei_version, constants.EXPORT_VERSION))
4386 # Check that the new instance doesn't have less disks than the export
4387 instance_disks = len(self.disks)
4388 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4389 if instance_disks < export_disks:
4390 raise errors.OpPrereqError("Not enough disks to import."
4391 " (instance: %d, export: %d)" %
4392 (instance_disks, export_disks))
4394 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4396 for idx in range(export_disks):
4397 option = 'disk%d_dump' % idx
4398 if export_info.has_option(constants.INISECT_INS, option):
4399 # FIXME: are the old os-es, disk sizes, etc. useful?
4400 export_name = export_info.get(constants.INISECT_INS, option)
4401 image = os.path.join(src_path, export_name)
4402 disk_images.append(image)
4404 disk_images.append(False)
4406 self.src_images = disk_images
4408 old_name = export_info.get(constants.INISECT_INS, 'name')
4409 # FIXME: int() here could throw a ValueError on broken exports
4410 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4411 if self.op.instance_name == old_name:
4412 for idx, nic in enumerate(self.nics):
4413 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4414 nic_mac_ini = 'nic%d_mac' % idx
4415 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4417 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4418 if self.op.start and not self.op.ip_check:
4419 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4420 " adding an instance in start mode")
4422 if self.op.ip_check:
4423 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4424 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4425 (self.check_ip, self.op.instance_name))
4429 if self.op.iallocator is not None:
4430 self._RunAllocator()
4432 #### node related checks
4434 # check primary node
4435 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4436 assert self.pnode is not None, \
4437 "Cannot retrieve locked node %s" % self.op.pnode
4439 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4442 self.secondaries = []
4444 # mirror node verification
4445 if self.op.disk_template in constants.DTS_NET_MIRROR:
4446 if self.op.snode is None:
4447 raise errors.OpPrereqError("The networked disk templates need"
4449 if self.op.snode == pnode.name:
4450 raise errors.OpPrereqError("The secondary node cannot be"
4451 " the primary node.")
4452 self.secondaries.append(self.op.snode)
4453 _CheckNodeOnline(self, self.op.snode)
4455 nodenames = [pnode.name] + self.secondaries
4457 req_size = _ComputeDiskSize(self.op.disk_template,
4460 # Check lv size requirements
4461 if req_size is not None:
4462 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4464 for node in nodenames:
4465 info = nodeinfo[node]
4469 raise errors.OpPrereqError("Cannot get current information"
4470 " from node '%s'" % node)
4471 vg_free = info.get('vg_free', None)
4472 if not isinstance(vg_free, int):
4473 raise errors.OpPrereqError("Can't compute free disk space on"
4475 if req_size > info['vg_free']:
4476 raise errors.OpPrereqError("Not enough disk space on target node %s."
4477 " %d MB available, %d MB required" %
4478 (node, info['vg_free'], req_size))
4480 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4483 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4485 if not isinstance(result.data, objects.OS):
4486 raise errors.OpPrereqError("OS '%s' not in supported os list for"
4487 " primary node" % self.op.os_type)
4489 # bridge check on primary node
4490 bridges = [n.bridge for n in self.nics]
4491 result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4494 raise errors.OpPrereqError("One of the target bridges '%s' does not"
4495 " exist on destination node '%s'" %
4496 (",".join(bridges), pnode.name))
4498 # memory check on primary node
4500 _CheckNodeFreeMemory(self, self.pnode.name,
4501 "creating instance %s" % self.op.instance_name,
4502 self.be_full[constants.BE_MEMORY],
4505 self.instance_status = self.op.start
4507 def Exec(self, feedback_fn):
4508 """Create and add the instance to the cluster.
4511 instance = self.op.instance_name
4512 pnode_name = self.pnode.name
4514 for nic in self.nics:
4515 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4516 nic.mac = self.cfg.GenerateMAC()
4518 ht_kind = self.op.hypervisor
4519 if ht_kind in constants.HTS_REQ_PORT:
4520 network_port = self.cfg.AllocatePort()
4524 ##if self.op.vnc_bind_address is None:
4525 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4527 # this is needed because os.path.join does not accept None arguments
4528 if self.op.file_storage_dir is None:
4529 string_file_storage_dir = ""
4531 string_file_storage_dir = self.op.file_storage_dir
4533 # build the full file storage dir path
4534 file_storage_dir = os.path.normpath(os.path.join(
4535 self.cfg.GetFileStorageDir(),
4536 string_file_storage_dir, instance))
4539 disks = _GenerateDiskTemplate(self,
4540 self.op.disk_template,
4541 instance, pnode_name,
4545 self.op.file_driver,
4548 iobj = objects.Instance(name=instance, os=self.op.os_type,
4549 primary_node=pnode_name,
4550 nics=self.nics, disks=disks,
4551 disk_template=self.op.disk_template,
4552 admin_up=self.instance_status,
4553 network_port=network_port,
4554 beparams=self.op.beparams,
4555 hvparams=self.op.hvparams,
4556 hypervisor=self.op.hypervisor,
4559 feedback_fn("* creating instance disks...")
4561 _CreateDisks(self, iobj)
4562 except errors.OpExecError:
4563 self.LogWarning("Device creation failed, reverting...")
4565 _RemoveDisks(self, iobj)
4567 self.cfg.ReleaseDRBDMinors(instance)
4570 feedback_fn("adding instance %s to cluster config" % instance)
4572 self.cfg.AddInstance(iobj)
4573 # Declare that we don't want to remove the instance lock anymore, as we've
4574 # added the instance to the config
4575 del self.remove_locks[locking.LEVEL_INSTANCE]
4576 # Unlock all the nodes
4577 if self.op.mode == constants.INSTANCE_IMPORT:
4578 nodes_keep = [self.op.src_node]
4579 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4580 if node != self.op.src_node]
4581 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4582 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4584 self.context.glm.release(locking.LEVEL_NODE)
4585 del self.acquired_locks[locking.LEVEL_NODE]
4587 if self.op.wait_for_sync:
4588 disk_abort = not _WaitForSync(self, iobj)
4589 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4590 # make sure the disks are not degraded (still sync-ing is ok)
4592 feedback_fn("* checking mirrors status")
4593 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4598 _RemoveDisks(self, iobj)
4599 self.cfg.RemoveInstance(iobj.name)
4600 # Make sure the instance lock gets removed
4601 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4602 raise errors.OpExecError("There are some degraded disks for"
4605 feedback_fn("creating os for instance %s on node %s" %
4606 (instance, pnode_name))
4608 if iobj.disk_template != constants.DT_DISKLESS:
4609 if self.op.mode == constants.INSTANCE_CREATE:
4610 feedback_fn("* running the instance OS create scripts...")
4611 result = self.rpc.call_instance_os_add(pnode_name, iobj)
4612 msg = result.RemoteFailMsg()
4614 raise errors.OpExecError("Could not add os for instance %s"
4616 (instance, pnode_name, msg))
4618 elif self.op.mode == constants.INSTANCE_IMPORT:
4619 feedback_fn("* running the instance OS import scripts...")
4620 src_node = self.op.src_node
4621 src_images = self.src_images
4622 cluster_name = self.cfg.GetClusterName()
4623 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4624 src_node, src_images,
4626 import_result.Raise()
4627 for idx, result in enumerate(import_result.data):
4629 self.LogWarning("Could not import the image %s for instance"
4630 " %s, disk %d, on node %s" %
4631 (src_images[idx], instance, idx, pnode_name))
4633 # also checked in the prereq part
4634 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4638 logging.info("Starting instance %s on node %s", instance, pnode_name)
4639 feedback_fn("* starting instance...")
4640 result = self.rpc.call_instance_start(pnode_name, iobj, None)
4641 msg = result.RemoteFailMsg()
4643 raise errors.OpExecError("Could not start instance: %s" % msg)
4646 class LUConnectConsole(NoHooksLU):
4647 """Connect to an instance's console.
4649 This is somewhat special in that it returns the command line that
4650 you need to run on the master node in order to connect to the console.
4654 _OP_REQP = ["instance_name"]
4657 def ExpandNames(self):
4658 self._ExpandAndLockInstance()
4660 def CheckPrereq(self):
4661 """Check prerequisites.
4663 This checks that the instance is in the cluster.
4666 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4667 assert self.instance is not None, \
4668 "Cannot retrieve locked instance %s" % self.op.instance_name
4669 _CheckNodeOnline(self, self.instance.primary_node)
4671 def Exec(self, feedback_fn):
4672 """Connect to the console of an instance
4675 instance = self.instance
4676 node = instance.primary_node
4678 node_insts = self.rpc.call_instance_list([node],
4679 [instance.hypervisor])[node]
4682 if instance.name not in node_insts.data:
4683 raise errors.OpExecError("Instance %s is not running." % instance.name)
4685 logging.debug("Connecting to console of %s on %s", instance.name, node)
4687 hyper = hypervisor.GetHypervisor(instance.hypervisor)
4688 console_cmd = hyper.GetShellCommandForConsole(instance)
4691 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
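# Note: the returned value is the complete command line the caller should run
# on the master, roughly an "ssh -t root@<primary node> '<console command>'"
# invocation as assembled by SshRunner.BuildCmd; the exact flags depend on the
# cluster SSH settings.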
4694 class LUReplaceDisks(LogicalUnit):
4695 """Replace the disks of an instance.
4698 HPATH = "mirrors-replace"
4699 HTYPE = constants.HTYPE_INSTANCE
4700 _OP_REQP = ["instance_name", "mode", "disks"]
4703 def CheckArguments(self):
4704 if not hasattr(self.op, "remote_node"):
4705 self.op.remote_node = None
4706 if not hasattr(self.op, "iallocator"):
4707 self.op.iallocator = None
4709 # check for valid parameter combination
4710 cnt = [self.op.remote_node, self.op.iallocator].count(None)
4711 if self.op.mode == constants.REPLACE_DISK_CHG:
4713 raise errors.OpPrereqError("When changing the secondary either an"
4714 " iallocator script must be used or the"
4717 raise errors.OpPrereqError("Give either the iallocator or the new"
4718 " secondary, not both")
4719 else: # not replacing the secondary
4721 raise errors.OpPrereqError("The iallocator and new node options can"
4722 " be used only when changing the"
4725 def ExpandNames(self):
4726 self._ExpandAndLockInstance()
4728 if self.op.iallocator is not None:
4729 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4730 elif self.op.remote_node is not None:
4731 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4732 if remote_node is None:
4733 raise errors.OpPrereqError("Node '%s' not known" %
4734 self.op.remote_node)
4735 self.op.remote_node = remote_node
4736 # Warning: do not remove the locking of the new secondary here
4737 # unless DRBD8.AddChildren is changed to work in parallel;
4738 # currently it doesn't since parallel invocations of
4739 # FindUnusedMinor will conflict
4740 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4741 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4743 self.needed_locks[locking.LEVEL_NODE] = []
4744 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4746 def DeclareLocks(self, level):
4747 # If we're not already locking all nodes in the set we have to declare the
4748 # instance's primary/secondary nodes.
4749 if (level == locking.LEVEL_NODE and
4750 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4751 self._LockInstancesNodes()
4753 def _RunAllocator(self):
4754 """Compute a new secondary node using an IAllocator.
4757 ial = IAllocator(self,
4758 mode=constants.IALLOCATOR_MODE_RELOC,
4759 name=self.op.instance_name,
4760 relocate_from=[self.sec_node])
4762 ial.Run(self.op.iallocator)
4765 raise errors.OpPrereqError("Can't compute nodes using"
4766 " iallocator '%s': %s" % (self.op.iallocator,
4768 if len(ial.nodes) != ial.required_nodes:
4769 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4770 " of nodes (%s), required %s" %
4771 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
4772 self.op.remote_node = ial.nodes[0]
4773 self.LogInfo("Selected new secondary for the instance: %s",
4774 self.op.remote_node)
4776 def BuildHooksEnv(self):
4779 This runs on the master, the primary and all the secondaries.
4783 "MODE": self.op.mode,
4784 "NEW_SECONDARY": self.op.remote_node,
4785 "OLD_SECONDARY": self.instance.secondary_nodes[0],
4787 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4789 self.cfg.GetMasterNode(),
4790 self.instance.primary_node,
4792 if self.op.remote_node is not None:
4793 nl.append(self.op.remote_node)
4796 def CheckPrereq(self):
4797 """Check prerequisites.
4799 This checks that the instance is in the cluster.
4802 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4803 assert instance is not None, \
4804 "Cannot retrieve locked instance %s" % self.op.instance_name
4805 self.instance = instance
4807 if instance.disk_template != constants.DT_DRBD8:
4808 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4811 if len(instance.secondary_nodes) != 1:
4812 raise errors.OpPrereqError("The instance has a strange layout,"
4813 " expected one secondary but found %d" %
4814 len(instance.secondary_nodes))
4816 self.sec_node = instance.secondary_nodes[0]
4818 if self.op.iallocator is not None:
4819 self._RunAllocator()
4821 remote_node = self.op.remote_node
4822 if remote_node is not None:
4823 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4824 assert self.remote_node_info is not None, \
4825 "Cannot retrieve locked node %s" % remote_node
4827 self.remote_node_info = None
4828 if remote_node == instance.primary_node:
4829 raise errors.OpPrereqError("The specified node is the primary node of"
4831 elif remote_node == self.sec_node:
4832 raise errors.OpPrereqError("The specified node is already the"
4833 " secondary node of the instance.")
4835 if self.op.mode == constants.REPLACE_DISK_PRI:
4836 n1 = self.tgt_node = instance.primary_node
4837 n2 = self.oth_node = self.sec_node
4838 elif self.op.mode == constants.REPLACE_DISK_SEC:
4839 n1 = self.tgt_node = self.sec_node
4840 n2 = self.oth_node = instance.primary_node
4841 elif self.op.mode == constants.REPLACE_DISK_CHG:
4842 n1 = self.new_node = remote_node
4843 n2 = self.oth_node = instance.primary_node
4844 self.tgt_node = self.sec_node
4846 raise errors.ProgrammerError("Unhandled disk replace mode")
4848 _CheckNodeOnline(self, n1)
4849 _CheckNodeOnline(self, n2)
4851 if not self.op.disks:
4852 self.op.disks = range(len(instance.disks))
4854 for disk_idx in self.op.disks:
4855 instance.FindDisk(disk_idx)
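# Summary of the node roles computed above:
#   REPLACE_DISK_PRI: tgt_node = primary,       oth_node = secondary
#   REPLACE_DISK_SEC: tgt_node = secondary,     oth_node = primary
#   REPLACE_DISK_CHG: tgt_node = old secondary, oth_node = primary,
#                     new_node = the node given via remote_node/iallocator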
4857 def _ExecD8DiskOnly(self, feedback_fn):
4858 """Replace a disk on the primary or secondary for dbrd8.
4860 The algorithm for replace is quite complicated:
4862 1. for each disk to be replaced:
4864 1. create new LVs on the target node with unique names
4865 1. detach old LVs from the drbd device
4866 1. rename old LVs to name_replaced.<time_t>
4867 1. rename new LVs to old LVs
4868 1. attach the new LVs (with the old names now) to the drbd device
4870 1. wait for sync across all devices
4872 1. for each modified disk:
4874 1. remove old LVs (which have the name name_replaced.<time_t>)
4876 Failures are not very well handled.
4880 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4881 instance = self.instance
4883 vgname = self.cfg.GetVGName()
4886 tgt_node = self.tgt_node
4887 oth_node = self.oth_node
4889 # Step: check device activation
4890 self.proc.LogStep(1, steps_total, "check device existence")
4891 info("checking volume groups")
4892 my_vg = cfg.GetVGName()
4893 results = self.rpc.call_vg_list([oth_node, tgt_node])
4895 raise errors.OpExecError("Can't list volume groups on the nodes")
4896 for node in oth_node, tgt_node:
4898 if res.failed or not res.data or my_vg not in res.data:
4899 raise errors.OpExecError("Volume group '%s' not found on %s" %
4901 for idx, dev in enumerate(instance.disks):
4902 if idx not in self.op.disks:
4904 for node in tgt_node, oth_node:
4905 info("checking disk/%d on %s" % (idx, node))
4906 cfg.SetDiskID(dev, node)
4907 if not self.rpc.call_blockdev_find(node, dev):
4908 raise errors.OpExecError("Can't find disk/%d on node %s" %
4911 # Step: check other node consistency
4912 self.proc.LogStep(2, steps_total, "check peer consistency")
4913 for idx, dev in enumerate(instance.disks):
4914 if idx not in self.op.disks:
4916 info("checking disk/%d consistency on %s" % (idx, oth_node))
4917 if not _CheckDiskConsistency(self, dev, oth_node,
4918 oth_node==instance.primary_node):
4919 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4920 " to replace disks on this node (%s)" %
4921 (oth_node, tgt_node))
4923 # Step: create new storage
4924 self.proc.LogStep(3, steps_total, "allocate new storage")
4925 for idx, dev in enumerate(instance.disks):
4926 if idx not in self.op.disks:
4929 cfg.SetDiskID(dev, tgt_node)
4930 lv_names = [".disk%d_%s" % (idx, suf)
4931 for suf in ["data", "meta"]]
4932 names = _GenerateUniqueNames(self, lv_names)
4933 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4934 logical_id=(vgname, names[0]))
4935 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4936 logical_id=(vgname, names[1]))
4937 new_lvs = [lv_data, lv_meta]
4938 old_lvs = dev.children
4939 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4940 info("creating new local storage on %s for %s" %
4941 (tgt_node, dev.iv_name))
4942 # we pass force_create=True to force the LVM creation
4943 for new_lv in new_lvs:
4944 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
4945 _GetInstanceInfoText(instance), False)
4947 # Step: for each lv, detach+rename*2+attach
4948 self.proc.LogStep(4, steps_total, "change drbd configuration")
4949 for dev, old_lvs, new_lvs in iv_names.itervalues():
4950 info("detaching %s drbd from local storage" % dev.iv_name)
4951 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4954 raise errors.OpExecError("Can't detach drbd from local storage on node"
4955 " %s for device %s" % (tgt_node, dev.iv_name))
4957 #cfg.Update(instance)
4959 # ok, we created the new LVs, so now we know we have the needed
4960 # storage; as such, we proceed on the target node to rename
4961 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4962 # using the assumption that logical_id == physical_id (which in
4963 # turn is the unique_id on that node)
4965 # FIXME(iustin): use a better name for the replaced LVs
4966 temp_suffix = int(time.time())
4967 ren_fn = lambda d, suff: (d.physical_id[0],
4968 d.physical_id[1] + "_replaced-%s" % suff)
4969 # build the rename list based on what LVs exist on the node
4971 for to_ren in old_lvs:
4972 find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4973 if not find_res.failed and find_res.data is not None: # device exists
4974 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4976 info("renaming the old LVs on the target node")
4977 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4980 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4981 # now we rename the new LVs to the old LVs
4982 info("renaming the new LVs on the target node")
4983 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4984 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4987 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4989 for old, new in zip(old_lvs, new_lvs):
4990 new.logical_id = old.logical_id
4991 cfg.SetDiskID(new, tgt_node)
4993 for disk in old_lvs:
4994 disk.logical_id = ren_fn(disk, temp_suffix)
4995 cfg.SetDiskID(disk, tgt_node)
4997 # now that the new lvs have the old name, we can add them to the device
4998 info("adding new mirror component on %s" % tgt_node)
4999 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5000 if result.failed or not result.data:
5001 for new_lv in new_lvs:
5002 result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
5003 if result.failed or not result.data:
5004 warning("Can't rollback device %s", hint="manually cleanup unused"
5006 raise errors.OpExecError("Can't add local storage to drbd")
5008 dev.children = new_lvs
5009 cfg.Update(instance)
5011 # Step: wait for sync
5013 # this can fail as the old devices are degraded and _WaitForSync
5014 # does a combined result over all disks, so we don't check its return value
5016 self.proc.LogStep(5, steps_total, "sync devices")
5017 _WaitForSync(self, instance, unlock=True)
5019 # so check manually all the devices
5020 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5021 cfg.SetDiskID(dev, instance.primary_node)
5022 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5023 if result.failed or result.data[5]:
5024 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5026 # Step: remove old storage
5027 self.proc.LogStep(6, steps_total, "removing old storage")
5028 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5029 info("remove logical volumes for %s" % name)
5031 cfg.SetDiskID(lv, tgt_node)
5032 result = self.rpc.call_blockdev_remove(tgt_node, lv)
5033 if result.failed or not result.data:
5034 warning("Can't remove old LV", hint="manually remove unused LVs")
5037 def _ExecD8Secondary(self, feedback_fn):
5038 """Replace the secondary node for drbd8.
5040 The algorithm for replace is quite complicated:
5041 - for all disks of the instance:
5042 - create new LVs on the new node with same names
5043 - shutdown the drbd device on the old secondary
5044 - disconnect the drbd network on the primary
5045 - create the drbd device on the new secondary
5046 - network attach the drbd on the primary, using an artifice:
5047 the drbd code for Attach() will connect to the network if it
5048 finds a device which is connected to the good local disks but not network enabled
5050 - wait for sync across all devices
5051 - remove all disks from the old secondary
5053 Failures are not very well handled.
5057 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5058 instance = self.instance
5062 old_node = self.tgt_node
5063 new_node = self.new_node
5064 pri_node = instance.primary_node
5066 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5067 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5068 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5071 # Step: check device activation
5072 self.proc.LogStep(1, steps_total, "check device existence")
5073 info("checking volume groups")
5074 my_vg = cfg.GetVGName()
5075 results = self.rpc.call_vg_list([pri_node, new_node])
5076 for node in pri_node, new_node:
5078 if res.failed or not res.data or my_vg not in res.data:
5079 raise errors.OpExecError("Volume group '%s' not found on %s" %
5081 for idx, dev in enumerate(instance.disks):
5082 if idx not in self.op.disks:
5084 info("checking disk/%d on %s" % (idx, pri_node))
5085 cfg.SetDiskID(dev, pri_node)
5086 result = self.rpc.call_blockdev_find(pri_node, dev)
5089 raise errors.OpExecError("Can't find disk/%d on node %s" %
5092 # Step: check other node consistency
5093 self.proc.LogStep(2, steps_total, "check peer consistency")
5094 for idx, dev in enumerate(instance.disks):
5095 if idx not in self.op.disks:
5097 info("checking disk/%d consistency on %s" % (idx, pri_node))
5098 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5099 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5100 " unsafe to replace the secondary" %
5103 # Step: create new storage
5104 self.proc.LogStep(3, steps_total, "allocate new storage")
5105 for idx, dev in enumerate(instance.disks):
5106 info("adding new local storage on %s for disk/%d" %
5108 # we pass force_create=True to force LVM creation
5109 for new_lv in dev.children:
5110 _CreateBlockDev(self, new_node, instance, new_lv, True,
5111 _GetInstanceInfoText(instance), False)
5113 # Step 4: drbd minors and drbd setup changes
5114 # after this, we must manually remove the drbd minors on both the
5115 # error and the success paths
5116 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5118 logging.debug("Allocated minors %s" % (minors,))
5119 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5120 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5122 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5123 # create new devices on new_node; note that we create two IDs:
5124 # one without port, so the drbd will be activated without
5125 # networking information on the new node at this stage, and one
5126 # with network, for the latter activation in step 4
5127 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5128 if pri_node == o_node1:
5133 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5134 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5136 iv_names[idx] = (dev, dev.children, new_net_id)
5137 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5139 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5140 logical_id=new_alone_id,
5141 children=dev.children)
5143 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5144 _GetInstanceInfoText(instance), False)
5145 except errors.BlockDeviceError:
5146 self.cfg.ReleaseDRBDMinors(instance.name)
5149 for idx, dev in enumerate(instance.disks):
5150 # we have new devices, shutdown the drbd on the old secondary
5151 info("shutting down drbd for disk/%d on old node" % idx)
5152 cfg.SetDiskID(dev, old_node)
5153 result = self.rpc.call_blockdev_shutdown(old_node, dev)
5154 if result.failed or not result.data:
5155 warning("Failed to shutdown drbd for disk/%d on old node" % idx,
5156 hint="Please cleanup this device manually as soon as possible")
5158 info("detaching primary drbds from the network (=> standalone)")
5159 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5160 instance.disks)[pri_node]
5162 msg = result.RemoteFailMsg()
5164 # detaches didn't succeed (unlikely)
5165 self.cfg.ReleaseDRBDMinors(instance.name)
5166 raise errors.OpExecError("Can't detach the disks from the network on"
5167 " old node: %s" % (msg,))
5169 # if we managed to detach at least one, we update all the disks of
5170 # the instance to point to the new secondary
5171 info("updating instance configuration")
5172 for dev, _, new_logical_id in iv_names.itervalues():
5173 dev.logical_id = new_logical_id
5174 cfg.SetDiskID(dev, pri_node)
5175 cfg.Update(instance)
5177 # and now perform the drbd attach
5178 info("attaching primary drbds to new secondary (standalone => connected)")
5179 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5180 instance.disks, instance.name,
5182 for to_node, to_result in result.items():
5183 msg = to_result.RemoteFailMsg()
5185 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5186 hint="please do a gnt-instance info to see the"
5189 # this can fail as the old devices are degraded and _WaitForSync
5190 # does a combined result over all disks, so we don't check its return value
5192 self.proc.LogStep(5, steps_total, "sync devices")
5193 _WaitForSync(self, instance, unlock=True)
5195 # so check manually all the devices
5196 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5197 cfg.SetDiskID(dev, pri_node)
5198 result = self.rpc.call_blockdev_find(pri_node, dev)
5201 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5203 self.proc.LogStep(6, steps_total, "removing old storage")
5204 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5205 info("remove logical volumes for disk/%d" % idx)
5207 cfg.SetDiskID(lv, old_node)
5208 result = self.rpc.call_blockdev_remove(old_node, lv)
5209 if result.failed or not result.data:
5210 warning("Can't remove LV on old secondary",
5211 hint="Cleanup stale volumes by hand")
5213 def Exec(self, feedback_fn):
5214 """Execute disk replacement.
5216 This dispatches the disk replacement to the appropriate handler.
5219 instance = self.instance
5221 # Activate the instance disks if we're replacing them on a down instance
5222 if not instance.admin_up:
5223 _StartInstanceDisks(self, instance, True)
5225 if self.op.mode == constants.REPLACE_DISK_CHG:
5226 fn = self._ExecD8Secondary
5228 fn = self._ExecD8DiskOnly
5230 ret = fn(feedback_fn)
5232 # Deactivate the instance disks if we're replacing them on a down instance
5233 if not instance.admin_up:
5234 _SafeShutdownInstanceDisks(self, instance)
5239 class LUGrowDisk(LogicalUnit):
5240 """Grow a disk of an instance.
5244 HTYPE = constants.HTYPE_INSTANCE
5245 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5248 def ExpandNames(self):
5249 self._ExpandAndLockInstance()
5250 self.needed_locks[locking.LEVEL_NODE] = []
5251 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5253 def DeclareLocks(self, level):
5254 if level == locking.LEVEL_NODE:
5255 self._LockInstancesNodes()
5257 def BuildHooksEnv(self):
5260 This runs on the master, the primary and all the secondaries.
5264 "DISK": self.op.disk,
5265 "AMOUNT": self.op.amount,
5267 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5269 self.cfg.GetMasterNode(),
5270 self.instance.primary_node,
5274 def CheckPrereq(self):
5275 """Check prerequisites.
5277 This checks that the instance is in the cluster.
5280 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5281 assert instance is not None, \
5282 "Cannot retrieve locked instance %s" % self.op.instance_name
5283 nodenames = list(instance.all_nodes)
5284 for node in nodenames:
5285 _CheckNodeOnline(self, node)
5288 self.instance = instance
5290 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5291 raise errors.OpPrereqError("Instance's disk layout does not support"
5294 self.disk = instance.FindDisk(self.op.disk)
5296 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5297 instance.hypervisor)
5298 for node in nodenames:
5299 info = nodeinfo[node]
5300 if info.failed or not info.data:
5301 raise errors.OpPrereqError("Cannot get current information"
5302 " from node '%s'" % node)
5303 vg_free = info.data.get('vg_free', None)
5304 if not isinstance(vg_free, int):
5305 raise errors.OpPrereqError("Can't compute free disk space on"
5307 if self.op.amount > vg_free:
5308 raise errors.OpPrereqError("Not enough disk space on target node %s:"
5309 " %d MiB available, %d MiB required" %
5310 (node, vg_free, self.op.amount))
5312 def Exec(self, feedback_fn):
5313 """Execute disk grow.
5316 instance = self.instance
5318 for node in instance.all_nodes:
5319 self.cfg.SetDiskID(disk, node)
5320 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5322 if (not result.data or not isinstance(result.data, (list, tuple)) or
5323 len(result.data) != 2):
5324 raise errors.OpExecError("Grow request failed to node %s" % node)
5325 elif not result.data[0]:
5326 raise errors.OpExecError("Grow request failed to node %s: %s" %
5327 (node, result.data[1]))
5328 disk.RecordGrow(self.op.amount)
5329 self.cfg.Update(instance)
5330 if self.op.wait_for_sync:
5331 disk_abort = not _WaitForSync(self, instance)
5333 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5334 " status.\nPlease check the instance.")
5337 class LUQueryInstanceData(NoHooksLU):
5338 """Query runtime instance data.
5341 _OP_REQP = ["instances", "static"]
5344 def ExpandNames(self):
5345 self.needed_locks = {}
5346 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5348 if not isinstance(self.op.instances, list):
5349 raise errors.OpPrereqError("Invalid argument type 'instances'")
5351 if self.op.instances:
5352 self.wanted_names = []
5353 for name in self.op.instances:
5354 full_name = self.cfg.ExpandInstanceName(name)
5355 if full_name is None:
5356 raise errors.OpPrereqError("Instance '%s' not known" % name)
5357 self.wanted_names.append(full_name)
5358 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5360 self.wanted_names = None
5361 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5363 self.needed_locks[locking.LEVEL_NODE] = []
5364 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5366 def DeclareLocks(self, level):
5367 if level == locking.LEVEL_NODE:
5368 self._LockInstancesNodes()
5370 def CheckPrereq(self):
5371 """Check prerequisites.
5373 This only checks the optional instance list against the existing names.
5376 if self.wanted_names is None:
5377 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5379 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5380 in self.wanted_names]
5383 def _ComputeDiskStatus(self, instance, snode, dev):
5384 """Compute block device status.
5387 static = self.op.static
5389 self.cfg.SetDiskID(dev, instance.primary_node)
5390 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5392 dev_pstatus = dev_pstatus.data
5396 if dev.dev_type in constants.LDS_DRBD:
5397 # we change the snode then (otherwise we use the one passed in)
5398 if dev.logical_id[0] == instance.primary_node:
5399 snode = dev.logical_id[1]
5401 snode = dev.logical_id[0]
5403 if snode and not static:
5404 self.cfg.SetDiskID(dev, snode)
5405 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5407 dev_sstatus = dev_sstatus.data
5412 dev_children = [self._ComputeDiskStatus(instance, snode, child)
5413 for child in dev.children]
5418 "iv_name": dev.iv_name,
5419 "dev_type": dev.dev_type,
5420 "logical_id": dev.logical_id,
5421 "physical_id": dev.physical_id,
5422 "pstatus": dev_pstatus,
5423 "sstatus": dev_sstatus,
5424 "children": dev_children,
5430 def Exec(self, feedback_fn):
5431 """Gather and return data"""
5434 cluster = self.cfg.GetClusterInfo()
5436 for instance in self.wanted_instances:
5437 if not self.op.static:
5438 remote_info = self.rpc.call_instance_info(instance.primary_node,
5440 instance.hypervisor)
5442 remote_info = remote_info.data
5443 if remote_info and "state" in remote_info:
5446 remote_state = "down"
5449 if instance.admin_up:
5452 config_state = "down"
5454 disks = [self._ComputeDiskStatus(instance, None, device)
5455 for device in instance.disks]
5458 "name": instance.name,
5459 "config_state": config_state,
5460 "run_state": remote_state,
5461 "pnode": instance.primary_node,
5462 "snodes": instance.secondary_nodes,
5464 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5466 "hypervisor": instance.hypervisor,
5467 "network_port": instance.network_port,
5468 "hv_instance": instance.hvparams,
5469 "hv_actual": cluster.FillHV(instance),
5470 "be_instance": instance.beparams,
5471 "be_actual": cluster.FillBE(instance),
5474 result[instance.name] = idict
5479 class LUSetInstanceParams(LogicalUnit):
5480 """Modifies an instances's parameters.
5483 HPATH = "instance-modify"
5484 HTYPE = constants.HTYPE_INSTANCE
5485 _OP_REQP = ["instance_name"]
5488 def CheckArguments(self):
5489 if not hasattr(self.op, 'nics'):
5491 if not hasattr(self.op, 'disks'):
5493 if not hasattr(self.op, 'beparams'):
5494 self.op.beparams = {}
5495 if not hasattr(self.op, 'hvparams'):
5496 self.op.hvparams = {}
5497 self.op.force = getattr(self.op, "force", False)
5498 if not (self.op.nics or self.op.disks or
5499 self.op.hvparams or self.op.beparams):
5500 raise errors.OpPrereqError("No changes submitted")
5502 utils.CheckBEParams(self.op.beparams)
5506 for disk_op, disk_dict in self.op.disks:
5507 if disk_op == constants.DDM_REMOVE:
5510 elif disk_op == constants.DDM_ADD:
5513 if not isinstance(disk_op, int):
5514 raise errors.OpPrereqError("Invalid disk index")
5515 if disk_op == constants.DDM_ADD:
5516 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5517 if mode not in constants.DISK_ACCESS_SET:
5518 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5519 size = disk_dict.get('size', None)
5521 raise errors.OpPrereqError("Required disk parameter size missing")
5524 except ValueError, err:
5525 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5527 disk_dict['size'] = size
5529 # modification of disk
5530 if 'size' in disk_dict:
5531 raise errors.OpPrereqError("Disk size change not possible, use"
5534 if disk_addremove > 1:
5535 raise errors.OpPrereqError("Only one disk add or remove operation"
5536 " supported at a time")
5540 for nic_op, nic_dict in self.op.nics:
5541 if nic_op == constants.DDM_REMOVE:
5544 elif nic_op == constants.DDM_ADD:
5547 if not isinstance(nic_op, int):
5548 raise errors.OpPrereqError("Invalid nic index")
5550 # nic_dict should be a dict
5551 nic_ip = nic_dict.get('ip', None)
5552 if nic_ip is not None:
5553 if nic_ip.lower() == "none":
5554 nic_dict['ip'] = None
5556 if not utils.IsValidIP(nic_ip):
5557 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5558 # we can only check None bridges and assign the default one
5559 nic_bridge = nic_dict.get('bridge', None)
5560 if nic_bridge is None:
5561 nic_dict['bridge'] = self.cfg.GetDefBridge()
5562 # but we can validate MACs
5563 nic_mac = nic_dict.get('mac', None)
5564 if nic_mac is not None:
5565 if self.cfg.IsMacInUse(nic_mac):
5566 raise errors.OpPrereqError("MAC address %s already in use"
5567 " in cluster" % nic_mac)
5568 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5569 if not utils.IsValidMac(nic_mac):
5570 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5571 if nic_addremove > 1:
5572 raise errors.OpPrereqError("Only one NIC add or remove operation"
5573 " supported at a time")
5575 def ExpandNames(self):
5576 self._ExpandAndLockInstance()
5577 self.needed_locks[locking.LEVEL_NODE] = []
5578 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5580 def DeclareLocks(self, level):
5581 if level == locking.LEVEL_NODE:
5582 self._LockInstancesNodes()
5584 def BuildHooksEnv(self):
5587 This runs on the master, primary and secondaries.
5591 if constants.BE_MEMORY in self.be_new:
5592 args['memory'] = self.be_new[constants.BE_MEMORY]
5593 if constants.BE_VCPUS in self.be_new:
5594 args['vcpus'] = self.be_new[constants.BE_VCPUS]
5595 # FIXME: readd disk/nic changes
5596 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5597 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5600 def CheckPrereq(self):
5601 """Check prerequisites.
5603 This only checks the instance list against the existing names.
5606 force = self.force = self.op.force
5608 # checking the new params on the primary/secondary nodes
5610 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5611 assert self.instance is not None, \
5612 "Cannot retrieve locked instance %s" % self.op.instance_name
5613 pnode = instance.primary_node
5614 nodelist = list(instance.all_nodes)
5616 # hvparams processing
5617 if self.op.hvparams:
5618 i_hvdict = copy.deepcopy(instance.hvparams)
5619 for key, val in self.op.hvparams.iteritems():
5620 if val == constants.VALUE_DEFAULT:
5625 elif val == constants.VALUE_NONE:
5626 i_hvdict[key] = None
5629 cluster = self.cfg.GetClusterInfo()
5630 hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5633 hypervisor.GetHypervisor(
5634 instance.hypervisor).CheckParameterSyntax(hv_new)
5635 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5636 self.hv_new = hv_new # the new actual values
5637 self.hv_inst = i_hvdict # the new dict (without defaults)
5639 self.hv_new = self.hv_inst = {}
5641 # beparams processing
5642 if self.op.beparams:
5643 i_bedict = copy.deepcopy(instance.beparams)
5644 for key, val in self.op.beparams.iteritems():
5645 if val == constants.VALUE_DEFAULT:
5652 cluster = self.cfg.GetClusterInfo()
5653 be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5655 self.be_new = be_new # the new actual values
5656 self.be_inst = i_bedict # the new dict (without defaults)
5658 self.be_new = self.be_inst = {}
5662 if constants.BE_MEMORY in self.op.beparams and not self.force:
5663 mem_check_list = [pnode]
5664 if be_new[constants.BE_AUTO_BALANCE]:
5665 # either we changed auto_balance to yes or it was from before
5666 mem_check_list.extend(instance.secondary_nodes)
5667 instance_info = self.rpc.call_instance_info(pnode, instance.name,
5668 instance.hypervisor)
5669 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5670 instance.hypervisor)
5671 if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5672 # Assume the primary node is unreachable and go ahead
5673 self.warn.append("Can't get info from primary node %s" % pnode)
5675 if not instance_info.failed and instance_info.data:
5676 current_mem = instance_info.data['memory']
5678 # Assume instance not running
5679 # (there is a slight race condition here, but it's not very probable,
5680 # and we have no other way to check)
5682 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5683 nodeinfo[pnode].data['memory_free'])
5685 raise errors.OpPrereqError("This change will prevent the instance"
5686 " from starting, due to %d MB of memory"
5687 " missing on its primary node" % miss_mem)
5689 if be_new[constants.BE_AUTO_BALANCE]:
5690 for node, nres in nodeinfo.iteritems():
5691 if node not in instance.secondary_nodes:
5693 if nres.failed or not isinstance(nres.data, dict):
5694 self.warn.append("Can't get info from secondary node %s" % node)
5695 elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5696 self.warn.append("Not enough memory to failover instance to"
5697 " secondary node %s" % node)
5700 for nic_op, nic_dict in self.op.nics:
5701 if nic_op == constants.DDM_REMOVE:
5702 if not instance.nics:
5703 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5705 if nic_op != constants.DDM_ADD:
5707 if nic_op < 0 or nic_op >= len(instance.nics):
5708 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5710 (nic_op, len(instance.nics)))
5711 nic_bridge = nic_dict.get('bridge', None)
5712 if nic_bridge is not None:
5713 if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5714 msg = ("Bridge '%s' doesn't exist on one of"
5715 " the instance nodes" % nic_bridge)
5717 self.warn.append(msg)
5719 raise errors.OpPrereqError(msg)
5722 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5723 raise errors.OpPrereqError("Disk operations not supported for"
5724 " diskless instances")
5725 for disk_op, disk_dict in self.op.disks:
5726 if disk_op == constants.DDM_REMOVE:
5727 if len(instance.disks) == 1:
5728 raise errors.OpPrereqError("Cannot remove the last disk of"
5730 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5731 ins_l = ins_l[pnode]
5732 if ins_l.failed or not isinstance(ins_l.data, list):
5733 raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5734 if instance.name in ins_l.data:
5735 raise errors.OpPrereqError("Instance is running, can't remove"
5736 " disks.")
5738 if (disk_op == constants.DDM_ADD and
5739 len(instance.disks) >= constants.MAX_DISKS):
5740 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5741 " add more" % constants.MAX_DISKS)
5742 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5744 if disk_op < 0 or disk_op >= len(instance.disks):
5745 raise errors.OpPrereqError("Invalid disk index %s, valid values"
5746 " are 0 to %d" %
5747 (disk_op, len(instance.disks)))
5751 def Exec(self, feedback_fn):
5752 """Modifies an instance.
5754 All parameters take effect only at the next restart of the instance.
5757 # Process here the warnings from CheckPrereq, as we don't have a
5758 # feedback_fn there.
5759 for warn in self.warn:
5760 feedback_fn("WARNING: %s" % warn)
5762 result = []
5763 instance = self.instance
5765 for disk_op, disk_dict in self.op.disks:
5766 if disk_op == constants.DDM_REMOVE:
5767 # remove the last disk
5768 device = instance.disks.pop()
5769 device_idx = len(instance.disks)
5770 for node, disk in device.ComputeNodeTree(instance.primary_node):
5771 self.cfg.SetDiskID(disk, node)
5772 rpc_result = self.rpc.call_blockdev_remove(node, disk)
5773 if rpc_result.failed or not rpc_result.data:
5774 self.proc.LogWarning("Could not remove disk/%d on node %s,"
5775 " continuing anyway", device_idx, node)
5776 result.append(("disk/%d" % device_idx, "remove"))
5777 elif disk_op == constants.DDM_ADD:
5779 if instance.disk_template == constants.DT_FILE:
5780 file_driver, file_path = instance.disks[0].logical_id
5781 file_path = os.path.dirname(file_path)
5782 else:
5783 file_driver = file_path = None
5784 disk_idx_base = len(instance.disks)
5785 new_disk = _GenerateDiskTemplate(self,
5786 instance.disk_template,
5787 instance.name, instance.primary_node,
5788 instance.secondary_nodes,
5789 [disk_dict],
5790 file_path,
5791 file_driver,
5792 disk_idx_base)[0]
5793 instance.disks.append(new_disk)
5794 info = _GetInstanceInfoText(instance)
5796 logging.info("Creating volume %s for instance %s",
5797 new_disk.iv_name, instance.name)
5798 # Note: this needs to be kept in sync with _CreateDisks
5800 for node in instance.all_nodes:
5801 f_create = node == instance.primary_node
5802 try:
5803 _CreateBlockDev(self, node, instance, new_disk,
5804 f_create, info, f_create)
5805 except errors.OpExecError, err:
5806 self.LogWarning("Failed to create volume %s (%s) on"
5807 " node %s: %s",
5808 new_disk.iv_name, new_disk, node, err)
5809 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5810 (new_disk.size, new_disk.mode)))
5811 else:
5812 # change a given disk
5813 instance.disks[disk_op].mode = disk_dict['mode']
5814 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5816 for nic_op, nic_dict in self.op.nics:
5817 if nic_op == constants.DDM_REMOVE:
5818 # remove the last nic
5819 del instance.nics[-1]
5820 result.append(("nic.%d" % len(instance.nics), "remove"))
5821 elif nic_op == constants.DDM_ADD:
5823 if 'mac' not in nic_dict:
5824 mac = constants.VALUE_GENERATE
5825 else:
5826 mac = nic_dict['mac']
5827 if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5828 mac = self.cfg.GenerateMAC()
5829 new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5830 bridge=nic_dict.get('bridge', None))
5831 instance.nics.append(new_nic)
5832 result.append(("nic.%d" % (len(instance.nics) - 1),
5833 "add:mac=%s,ip=%s,bridge=%s" %
5834 (new_nic.mac, new_nic.ip, new_nic.bridge)))
5835 else:
5836 # change a given nic
5837 for key in 'mac', 'ip', 'bridge':
5838 if key in nic_dict:
5839 setattr(instance.nics[nic_op], key, nic_dict[key])
5840 result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5843 if self.op.hvparams:
5844 instance.hvparams = self.hv_new
5845 for key, val in self.op.hvparams.iteritems():
5846 result.append(("hv/%s" % key, val))
5849 if self.op.beparams:
5850 instance.beparams = self.be_inst
5851 for key, val in self.op.beparams.iteritems():
5852 result.append(("be/%s" % key, val))
5854 self.cfg.Update(instance)
5856 return result
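# Illustrative sketch (not part of the original module): the change list that
# LUSetInstanceParams.Exec returns is made of (parameter, new value) pairs such
# as the ones below; all values are hypothetical.
_EXAMPLE_MODIFY_RESULT = [
  ("disk/1", "add:size=1024,mode=w"),
  ("nic.mac/0", "aa:00:00:11:22:33"),
  ("be/memory", 512),
  ("hv/kernel_path", "/boot/vmlinuz"),
  ]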
5859 class LUQueryExports(NoHooksLU):
5860 """Query the exports list
5863 _OP_REQP = ['nodes']
5866 def ExpandNames(self):
5867 self.needed_locks = {}
5868 self.share_locks[locking.LEVEL_NODE] = 1
5869 if not self.op.nodes:
5870 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5872 self.needed_locks[locking.LEVEL_NODE] = \
5873 _GetWantedNodes(self, self.op.nodes)
5875 def CheckPrereq(self):
5876 """Check prerequisites.
5879 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5881 def Exec(self, feedback_fn):
5882 """Compute the list of all the exported system images.
5885 @return: a dictionary with the structure node->(export-list)
5886 where export-list is a list of the instances exported on
5887 that node.
5890 rpcresult = self.rpc.call_export_list(self.nodes)
5891 result = {}
5892 for node in rpcresult:
5893 if rpcresult[node].failed:
5894 result[node] = False
5896 result[node] = rpcresult[node].data
5898 return result
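# Illustrative sketch (not part of the original module): the structure returned
# by LUQueryExports.Exec, mapping each queried node to its export list, with
# False marking a node that could not be contacted.  Names are hypothetical.
_EXAMPLE_EXPORT_QUERY = {
  "node1.example.com": ["instance1.example.com", "instance2.example.com"],
  "node2.example.com": False,
  }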
5901 class LUExportInstance(LogicalUnit):
5902 """Export an instance to an image in the cluster.
5905 HPATH = "instance-export"
5906 HTYPE = constants.HTYPE_INSTANCE
5907 _OP_REQP = ["instance_name", "target_node", "shutdown"]
5910 def ExpandNames(self):
5911 self._ExpandAndLockInstance()
5912 # FIXME: lock only instance primary and destination node
5914 # Sad but true, for now we have to lock all nodes, as we don't know where
5915 # the previous export might be, and in this LU we search for it and
5916 # remove it from its current node. In the future we could fix this by:
5917 # - making a tasklet to search (share-lock all), then create the new one,
5918 # then one to remove, after
5919 # - removing the removal operation altogether
5920 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5922 def DeclareLocks(self, level):
5923 """Last minute lock declaration."""
5924 # All nodes are locked anyway, so nothing to do here.
5926 def BuildHooksEnv(self):
5929 This will run on the master, primary node and target node.
5933 "EXPORT_NODE": self.op.target_node,
5934 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5935 }
5936 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5937 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5938 self.op.target_node]
5939 return env, nl, nl
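# Illustrative note (not part of the original source): besides the generic
# instance keys added by _BuildInstanceHookEnvByObject, the export hooks see
# EXPORT_NODE (the target node name) and EXPORT_DO_SHUTDOWN (whether the
# instance is shut down before the snapshot); the hooks run on the master,
# the primary node and the target node, as listed in nl above.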
5941 def CheckPrereq(self):
5942 """Check prerequisites.
5944 This checks that the instance and node names are valid.
5947 instance_name = self.op.instance_name
5948 self.instance = self.cfg.GetInstanceInfo(instance_name)
5949 assert self.instance is not None, \
5950 "Cannot retrieve locked instance %s" % self.op.instance_name
5951 _CheckNodeOnline(self, self.instance.primary_node)
5953 self.dst_node = self.cfg.GetNodeInfo(
5954 self.cfg.ExpandNodeName(self.op.target_node))
5956 if self.dst_node is None:
5957 # This is wrong node name, not a non-locked node
5958 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5959 _CheckNodeOnline(self, self.dst_node.name)
5961 # instance disk type verification
5962 for disk in self.instance.disks:
5963 if disk.dev_type == constants.LD_FILE:
5964 raise errors.OpPrereqError("Export not supported for instances with"
5965 " file-based disks")
5967 def Exec(self, feedback_fn):
5968 """Export an instance to an image in the cluster.
5971 instance = self.instance
5972 dst_node = self.dst_node
5973 src_node = instance.primary_node
5974 if self.op.shutdown:
5975 # shutdown the instance, but not the disks
5976 result = self.rpc.call_instance_shutdown(src_node, instance)
5978 if result.failed or not result.data:
5979 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5980 (instance.name, src_node))
5982 vgname = self.cfg.GetVGName()
5984 snap_disks = []
5986 # set the disks ID correctly since call_instance_start needs the
5987 # correct drbd minor to create the symlinks
5988 for disk in instance.disks:
5989 self.cfg.SetDiskID(disk, src_node)
5991 try:
5992 for disk in instance.disks:
5993 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5994 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5995 if new_dev_name.failed or not new_dev_name.data:
5996 self.LogWarning("Could not snapshot block device %s on node %s",
5997 disk.logical_id[1], src_node)
5998 snap_disks.append(False)
5999 else:
6000 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6001 logical_id=(vgname, new_dev_name.data),
6002 physical_id=(vgname, new_dev_name.data),
6003 iv_name=disk.iv_name)
6004 snap_disks.append(new_dev)
6006 finally:
6007 if self.op.shutdown and instance.admin_up:
6008 result = self.rpc.call_instance_start(src_node, instance, None)
6009 msg = result.RemoteFailMsg()
6010 if msg:
6011 _ShutdownInstanceDisks(self, instance)
6012 raise errors.OpExecError("Could not start instance: %s" % msg)
6014 # TODO: check for size
6016 cluster_name = self.cfg.GetClusterName()
6017 for idx, dev in enumerate(snap_disks):
6018 if dev:
6019 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6020 instance, cluster_name, idx)
6021 if result.failed or not result.data:
6022 self.LogWarning("Could not export block device %s from node %s to"
6023 " node %s", dev.logical_id[1], src_node,
6024 dst_node.name)
6025 result = self.rpc.call_blockdev_remove(src_node, dev)
6026 if result.failed or not result.data:
6027 self.LogWarning("Could not remove snapshot block device %s from node"
6028 " %s", dev.logical_id[1], src_node)
6030 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6031 if result.failed or not result.data:
6032 self.LogWarning("Could not finalize export for instance %s on node %s",
6033 instance.name, dst_node.name)
6035 nodelist = self.cfg.GetNodeList()
6036 nodelist.remove(dst_node.name)
6038 # on one-node clusters nodelist will be empty after the removal
6039 # if we proceed the backup would be removed because OpQueryExports
6040 # substitutes an empty list with the full cluster node list.
6041 if nodelist:
6042 exportlist = self.rpc.call_export_list(nodelist)
6043 for node in exportlist:
6044 if exportlist[node].failed:
6045 continue
6046 if instance.name in exportlist[node].data:
6047 if not self.rpc.call_export_remove(node, instance.name):
6048 self.LogWarning("Could not remove older export for instance %s"
6049 " on node %s", instance.name, node)
6052 class LURemoveExport(NoHooksLU):
6053 """Remove exports related to the named instance.
6056 _OP_REQP = ["instance_name"]
6059 def ExpandNames(self):
6060 self.needed_locks = {}
6061 # We need all nodes to be locked in order for RemoveExport to work, but we
6062 # don't need to lock the instance itself, as nothing will happen to it (and
6063 # we can remove exports also for a removed instance)
6064 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6066 def CheckPrereq(self):
6067 """Check prerequisites.
6071 def Exec(self, feedback_fn):
6072 """Remove any export.
6075 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6076 # If the instance was not found we'll try with the name that was passed in.
6077 # This will only work if it was an FQDN, though.
6078 fqdn_warn = False
6079 if not instance_name:
6080 fqdn_warn = True
6081 instance_name = self.op.instance_name
6083 exportlist = self.rpc.call_export_list(self.acquired_locks[
6084 locking.LEVEL_NODE])
6085 found = False
6086 for node in exportlist:
6087 if exportlist[node].failed:
6088 self.LogWarning("Failed to query node %s, continuing" % node)
6089 continue
6090 if instance_name in exportlist[node].data:
6091 found = True
6092 result = self.rpc.call_export_remove(node, instance_name)
6093 if result.failed or not result.data:
6094 logging.error("Could not remove export for instance %s"
6095 " on node %s", instance_name, node)
6097 if fqdn_warn and not found:
6098 feedback_fn("Export not found. If trying to remove an export belonging"
6099 " to a deleted instance please use its Fully Qualified"
6103 class TagsLU(NoHooksLU):
6106 This is an abstract class which is the parent of all the other tags LUs.
6110 def ExpandNames(self):
6111 self.needed_locks = {}
6112 if self.op.kind == constants.TAG_NODE:
6113 name = self.cfg.ExpandNodeName(self.op.name)
6114 if name is None:
6115 raise errors.OpPrereqError("Invalid node name (%s)" %
6116 (self.op.name,))
6117 self.op.name = name
6118 self.needed_locks[locking.LEVEL_NODE] = name
6119 elif self.op.kind == constants.TAG_INSTANCE:
6120 name = self.cfg.ExpandInstanceName(self.op.name)
6121 if name is None:
6122 raise errors.OpPrereqError("Invalid instance name (%s)" %
6123 (self.op.name,))
6124 self.op.name = name
6125 self.needed_locks[locking.LEVEL_INSTANCE] = name
6127 def CheckPrereq(self):
6128 """Check prerequisites.
6131 if self.op.kind == constants.TAG_CLUSTER:
6132 self.target = self.cfg.GetClusterInfo()
6133 elif self.op.kind == constants.TAG_NODE:
6134 self.target = self.cfg.GetNodeInfo(self.op.name)
6135 elif self.op.kind == constants.TAG_INSTANCE:
6136 self.target = self.cfg.GetInstanceInfo(self.op.name)
6137 else:
6138 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6139 str(self.op.kind))
6142 class LUGetTags(TagsLU):
6143 """Returns the tags of a given object.
6146 _OP_REQP = ["kind", "name"]
6149 def Exec(self, feedback_fn):
6150 """Returns the tag list.
6153 return list(self.target.GetTags())
6156 class LUSearchTags(NoHooksLU):
6157 """Searches the tags for a given pattern.
6160 _OP_REQP = ["pattern"]
6163 def ExpandNames(self):
6164 self.needed_locks = {}
6166 def CheckPrereq(self):
6167 """Check prerequisites.
6169 This checks the pattern passed for validity by compiling it.
6172 try:
6173 self.re = re.compile(self.op.pattern)
6174 except re.error, err:
6175 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6176 (self.op.pattern, err))
6178 def Exec(self, feedback_fn):
6179 """Returns the tag list.
6182 cfg = self.cfg
6183 tgts = [("/cluster", cfg.GetClusterInfo())]
6184 ilist = cfg.GetAllInstancesInfo().values()
6185 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6186 nlist = cfg.GetAllNodesInfo().values()
6187 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6188 results = []
6189 for path, target in tgts:
6190 for tag in target.GetTags():
6191 if self.re.search(tag):
6192 results.append((path, tag))
6193 return results
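# Illustrative sketch (not part of the original module): LUSearchTags.Exec
# returns (path, tag) pairs like the ones below; the names are hypothetical.
_EXAMPLE_TAG_SEARCH_RESULT = [
  ("/cluster", "production"),
  ("/nodes/node1.example.com", "rack:r1"),
  ("/instances/instance1.example.com", "web"),
  ]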
6196 class LUAddTags(TagsLU):
6197 """Sets a tag on a given object.
6200 _OP_REQP = ["kind", "name", "tags"]
6203 def CheckPrereq(self):
6204 """Check prerequisites.
6206 This checks the type and length of the tag name and value.
6209 TagsLU.CheckPrereq(self)
6210 for tag in self.op.tags:
6211 objects.TaggableObject.ValidateTag(tag)
6213 def Exec(self, feedback_fn):
6217 try:
6218 for tag in self.op.tags:
6219 self.target.AddTag(tag)
6220 except errors.TagError, err:
6221 raise errors.OpExecError("Error while setting tag: %s" % str(err))
6222 try:
6223 self.cfg.Update(self.target)
6224 except errors.ConfigurationError:
6225 raise errors.OpRetryError("There has been a modification to the"
6226 " config file and the operation has been"
6227 " aborted. Please retry.")
6230 class LUDelTags(TagsLU):
6231 """Delete a list of tags from a given object.
6234 _OP_REQP = ["kind", "name", "tags"]
6237 def CheckPrereq(self):
6238 """Check prerequisites.
6240 This checks that we have the given tag.
6243 TagsLU.CheckPrereq(self)
6244 for tag in self.op.tags:
6245 objects.TaggableObject.ValidateTag(tag)
6246 del_tags = frozenset(self.op.tags)
6247 cur_tags = self.target.GetTags()
6248 if not del_tags <= cur_tags:
6249 diff_tags = del_tags - cur_tags
6250 diff_names = ["'%s'" % tag for tag in diff_tags]
6252 raise errors.OpPrereqError("Tag(s) %s not found" %
6253 (",".join(diff_names)))
6255 def Exec(self, feedback_fn):
6256 """Remove the tag from the object.
6259 for tag in self.op.tags:
6260 self.target.RemoveTag(tag)
6261 try:
6262 self.cfg.Update(self.target)
6263 except errors.ConfigurationError:
6264 raise errors.OpRetryError("There has been a modification to the"
6265 " config file and the operation has been"
6266 " aborted. Please retry.")
6269 class LUTestDelay(NoHooksLU):
6270 """Sleep for a specified amount of time.
6272 This LU sleeps on the master and/or nodes for a specified amount of
6273 time.
6276 _OP_REQP = ["duration", "on_master", "on_nodes"]
6279 def ExpandNames(self):
6280 """Expand names and set required locks.
6282 This expands the node list, if any.
6285 self.needed_locks = {}
6286 if self.op.on_nodes:
6287 # _GetWantedNodes can be used here, but is not always appropriate to use
6288 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6289 # more information.
6290 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6291 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6293 def CheckPrereq(self):
6294 """Check prerequisites.
6298 def Exec(self, feedback_fn):
6299 """Do the actual sleep.
6302 if self.op.on_master:
6303 if not utils.TestDelay(self.op.duration):
6304 raise errors.OpExecError("Error during master delay test")
6305 if self.op.on_nodes:
6306 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6307 if not result:
6308 raise errors.OpExecError("Complete failure from rpc call")
6309 for node, node_result in result.items():
6311 if not node_result.data:
6312 raise errors.OpExecError("Failure during rpc call to node %s,"
6313 " result: %s" % (node, node_result.data))
6316 class IAllocator(object):
6317 """IAllocator framework.
6319 An IAllocator instance has several sets of attributes:
6320 - cfg that is needed to query the cluster
6321 - input data (all members of the _KEYS class attribute are required)
6322 - four buffer attributes (in|out_data|text), that represent the
6323 input (to the external script) in text and data structure format,
6324 and the output from it, again in two formats
6325 - the result variables from the script (success, info, nodes) for
6326 easy usage
6328 """
6329 _ALLO_KEYS = [
6330 "mem_size", "disks", "disk_template",
6331 "os", "tags", "nics", "vcpus", "hypervisor",
6337 def __init__(self, lu, mode, name, **kwargs):
6338 self.lu = lu
6339 # init buffer variables
6340 self.in_text = self.out_text = self.in_data = self.out_data = None
6341 # init all input fields so that pylint is happy
6342 self.mode = mode
6343 self.name = name
6344 self.mem_size = self.disks = self.disk_template = None
6345 self.os = self.tags = self.nics = self.vcpus = None
6346 self.hypervisor = None
6347 self.relocate_from = None
6349 self.required_nodes = None
6350 # init result fields
6351 self.success = self.info = self.nodes = None
6352 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6353 keyset = self._ALLO_KEYS
6354 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6355 keyset = self._RELO_KEYS
6356 else:
6357 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6358 " IAllocator" % self.mode)
6359 for key in kwargs:
6360 if key not in keyset:
6361 raise errors.ProgrammerError("Invalid input parameter '%s' to"
6362 " IAllocator" % key)
6363 setattr(self, key, kwargs[key])
6364 for key in keyset:
6365 if key not in kwargs:
6366 raise errors.ProgrammerError("Missing input parameter '%s' to"
6367 " IAllocator" % key)
6368 self._BuildInputData()
6370 def _ComputeClusterData(self):
6371 """Compute the generic allocator input data.
6373 This is the data that is independent of the actual operation.
6376 cfg = self.lu.cfg
6377 cluster_info = cfg.GetClusterInfo()
6381 "cluster_name": cfg.GetClusterName(),
6382 "cluster_tags": list(cluster_info.GetTags()),
6383 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6384 # we don't have job IDs
6385 }
6386 iinfo = cfg.GetAllInstancesInfo().values()
6387 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6390 node_results = {}
6391 node_list = cfg.GetNodeList()
6393 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6394 hypervisor_name = self.hypervisor
6395 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6396 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6398 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6399 hypervisor_name)
6400 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6401 cluster_info.enabled_hypervisors)
6402 for nname, nresult in node_data.items():
6403 # first fill in static (config-based) values
6404 ninfo = cfg.GetNodeInfo(nname)
6406 "tags": list(ninfo.GetTags()),
6407 "primary_ip": ninfo.primary_ip,
6408 "secondary_ip": ninfo.secondary_ip,
6409 "offline": ninfo.offline,
6410 "master_candidate": ninfo.master_candidate,
6411 }
6413 if not ninfo.offline:
6415 if not isinstance(nresult.data, dict):
6416 raise errors.OpExecError("Can't get data for node %s" % nname)
6417 remote_info = nresult.data
6418 for attr in ['memory_total', 'memory_free', 'memory_dom0',
6419 'vg_size', 'vg_free', 'cpu_total']:
6420 if attr not in remote_info:
6421 raise errors.OpExecError("Node '%s' didn't return attribute"
6422 " '%s'" % (nname, attr))
6423 try:
6424 remote_info[attr] = int(remote_info[attr])
6425 except ValueError, err:
6426 raise errors.OpExecError("Node '%s' returned invalid value"
6427 " for '%s': %s" % (nname, attr, err))
6428 # compute memory used by primary instances
6429 i_p_mem = i_p_up_mem = 0
6430 for iinfo, beinfo in i_list:
6431 if iinfo.primary_node == nname:
6432 i_p_mem += beinfo[constants.BE_MEMORY]
6433 if iinfo.name not in node_iinfo[nname].data:
6434 i_used_mem = 0
6435 else:
6436 i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6437 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6438 remote_info['memory_free'] -= max(0, i_mem_diff)
6440 if iinfo.admin_up:
6441 i_p_up_mem += beinfo[constants.BE_MEMORY]
6443 # compute memory used by instances
6445 "total_memory": remote_info['memory_total'],
6446 "reserved_memory": remote_info['memory_dom0'],
6447 "free_memory": remote_info['memory_free'],
6448 "total_disk": remote_info['vg_size'],
6449 "free_disk": remote_info['vg_free'],
6450 "total_cpus": remote_info['cpu_total'],
6451 "i_pri_memory": i_p_mem,
6452 "i_pri_up_memory": i_p_up_mem,
6456 node_results[nname] = pnr
6457 data["nodes"] = node_results
6459 instance_data = {}
6461 for iinfo, beinfo in i_list:
6462 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6463 for n in iinfo.nics]
6465 "tags": list(iinfo.GetTags()),
6466 "admin_up": iinfo.admin_up,
6467 "vcpus": beinfo[constants.BE_VCPUS],
6468 "memory": beinfo[constants.BE_MEMORY],
6470 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6472 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6473 "disk_template": iinfo.disk_template,
6474 "hypervisor": iinfo.hypervisor,
6475 }
6476 instance_data[iinfo.name] = pir
6478 data["instances"] = instance_data
6482 def _AddNewInstance(self):
6483 """Add new instance data to allocator structure.
6485 This in combination with _AllocatorGetClusterData will create the
6486 correct structure needed as input for the allocator.
6488 The checks for the completeness of the opcode must have already been
6489 done.
6491 """
6493 data = self.in_data
6494 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6496 if self.disk_template in constants.DTS_NET_MIRROR:
6497 self.required_nodes = 2
6498 else:
6499 self.required_nodes = 1
6503 "disk_template": self.disk_template,
6506 "vcpus": self.vcpus,
6507 "memory": self.mem_size,
6508 "disks": self.disks,
6509 "disk_space_total": disk_space,
6511 "required_nodes": self.required_nodes,
6513 data["request"] = request
6515 def _AddRelocateInstance(self):
6516 """Add relocate instance data to allocator structure.
6518 This in combination with _IAllocatorGetClusterData will create the
6519 correct structure needed as input for the allocator.
6521 The checks for the completeness of the opcode must have already been
6522 done.
6524 """
6525 instance = self.lu.cfg.GetInstanceInfo(self.name)
6526 if instance is None:
6527 raise errors.ProgrammerError("Unknown instance '%s' passed to"
6528 " IAllocator" % self.name)
6530 if instance.disk_template not in constants.DTS_NET_MIRROR:
6531 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6533 if len(instance.secondary_nodes) != 1:
6534 raise errors.OpPrereqError("Instance has not exactly one secondary node")
6536 self.required_nodes = 1
6537 disk_sizes = [{'size': disk.size} for disk in instance.disks]
6538 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6543 "disk_space_total": disk_space,
6544 "required_nodes": self.required_nodes,
6545 "relocate_from": self.relocate_from,
6546 }
6547 self.in_data["request"] = request
6549 def _BuildInputData(self):
6550 """Build input data structures.
6553 self._ComputeClusterData()
6555 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6556 self._AddNewInstance()
6557 else:
6558 self._AddRelocateInstance()
6560 self.in_text = serializer.Dump(self.in_data)
6562 def Run(self, name, validate=True, call_fn=None):
6563 """Run an instance allocator and return the results.
6566 if call_fn is None:
6567 call_fn = self.lu.rpc.call_iallocator_runner
6570 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6573 if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6574 raise errors.OpExecError("Invalid result from master iallocator runner")
6576 rcode, stdout, stderr, fail = result.data
6578 if rcode == constants.IARUN_NOTFOUND:
6579 raise errors.OpExecError("Can't find allocator '%s'" % name)
6580 elif rcode == constants.IARUN_FAILURE:
6581 raise errors.OpExecError("Instance allocator call failed: %s,"
6582 " output: %s" % (fail, stdout+stderr))
6583 self.out_text = stdout
6584 if validate:
6585 self._ValidateResult()
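# Illustrative sketch (not part of the original source): how a LU typically
# uses this framework (allocator script name and node names are hypothetical).
#   ial = IAllocator(self, mode=constants.IALLOCATOR_MODE_RELOC,
#                    name="instance1.example.com",
#                    relocate_from=["node2.example.com"])
#   ial.Run("my-allocator")
#   if not ial.success:
#     raise errors.OpExecError("iallocator failure: %s" % ial.info)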
6587 def _ValidateResult(self):
6588 """Process the allocator results.
6590 This will process and if successful save the result in
6591 self.out_data and the other parameters.
6594 try:
6595 rdict = serializer.Load(self.out_text)
6596 except Exception, err:
6597 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6599 if not isinstance(rdict, dict):
6600 raise errors.OpExecError("Can't parse iallocator results: not a dict")
6602 for key in "success", "info", "nodes":
6603 if key not in rdict:
6604 raise errors.OpExecError("Can't parse iallocator results:"
6605 " missing key '%s'" % key)
6606 setattr(self, key, rdict[key])
6608 if not isinstance(rdict["nodes"], list):
6609 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6610 " not a list")
6611 self.out_data = rdict
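# Illustrative sketch (not part of the original module): a well-formed answer
# from an allocator script, as accepted by _ValidateResult above (node names
# are hypothetical).
_EXAMPLE_IALLOCATOR_RESULT = {
  "success": True,
  "info": "allocation successful",
  "nodes": ["node2.example.com", "node3.example.com"],
  }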
6614 class LUTestAllocator(NoHooksLU):
6615 """Run allocator tests.
6617 This LU runs the allocator tests
6620 _OP_REQP = ["direction", "mode", "name"]
6622 def CheckPrereq(self):
6623 """Check prerequisites.
6625 This checks the opcode parameters depending on the direction and mode of the test.
6628 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6629 for attr in ["name", "mem_size", "disks", "disk_template",
6630 "os", "tags", "nics", "vcpus"]:
6631 if not hasattr(self.op, attr):
6632 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6633 attr)
6634 iname = self.cfg.ExpandInstanceName(self.op.name)
6635 if iname is not None:
6636 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6637 iname)
6638 if not isinstance(self.op.nics, list):
6639 raise errors.OpPrereqError("Invalid parameter 'nics'")
6640 for row in self.op.nics:
6641 if (not isinstance(row, dict) or
6644 "bridge" not in row):
6645 raise errors.OpPrereqError("Invalid contents of the"
6646 " 'nics' parameter")
6647 if not isinstance(self.op.disks, list):
6648 raise errors.OpPrereqError("Invalid parameter 'disks'")
6649 for row in self.op.disks:
6650 if (not isinstance(row, dict) or
6651 "size" not in row or
6652 not isinstance(row["size"], int) or
6653 "mode" not in row or
6654 row["mode"] not in ['r', 'w']):
6655 raise errors.OpPrereqError("Invalid contents of the"
6656 " 'disks' parameter")
6657 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6658 self.op.hypervisor = self.cfg.GetHypervisorType()
6659 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6660 if not hasattr(self.op, "name"):
6661 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6662 fname = self.cfg.ExpandInstanceName(self.op.name)
6663 if fname is None:
6664 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6665 self.op.name)
6666 self.op.name = fname
6667 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6668 else:
6669 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6670 self.op.mode)
6672 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6673 if not hasattr(self.op, "allocator") or self.op.allocator is None:
6674 raise errors.OpPrereqError("Missing allocator name")
6675 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6676 raise errors.OpPrereqError("Wrong allocator test '%s'" %
6677 self.op.direction)
6679 def Exec(self, feedback_fn):
6680 """Run the allocator test.
6683 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6684 ial = IAllocator(self,
6685 mode=self.op.mode,
6686 name=self.op.name,
6687 mem_size=self.op.mem_size,
6688 disks=self.op.disks,
6689 disk_template=self.op.disk_template,
6690 os=self.op.os,
6691 tags=self.op.tags,
6692 nics=self.op.nics,
6693 vcpus=self.op.vcpus,
6694 hypervisor=self.op.hypervisor,
6695 )
6696 else:
6697 ial = IAllocator(self,
6698 mode=self.op.mode,
6699 name=self.op.name,
6700 relocate_from=list(self.relocate_from),
6701 )
6703 if self.op.direction == constants.IALLOCATOR_DIR_IN:
6704 result = ial.in_text
6705 else:
6706 ial.Run(self.op.allocator, validate=False)
6707 result = ial.out_text
6708 return result