4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
49 class LogicalUnit(object):
50 """Logical Unit base class.
52 Subclasses must follow these rules:
53 - implement ExpandNames
54 - implement CheckPrereq
56 - implement BuildHooksEnv
57 - redefine HPATH and HTYPE
58 - optionally redefine their run requirements:
59 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
61 Note that all commands require root permissions.
69 def __init__(self, processor, op, context, rpc):
70 """Constructor for LogicalUnit.
72 This needs to be overridden in derived classes in order to check op
78 self.cfg = context.cfg
79 self.context = context
81 # Dicts used to declare locking needs to mcpu
82 self.needed_locks = None
83 self.acquired_locks = {}
84 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86 self.remove_locks = {}
87 # Used to force good behavior when calling helper functions
88 self.recalculate_locks = {}
91 self.LogWarning = processor.LogWarning
92 self.LogInfo = processor.LogInfo
94 for attr_name in self._OP_REQP:
95 attr_val = getattr(op, attr_name, None)
97 raise errors.OpPrereqError("Required parameter '%s' missing" %
102 """Returns the SshRunner object
106 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
109 ssh = property(fget=__GetSSH)
111 def CheckArguments(self):
112 """Check syntactic validity for the opcode arguments.
114 This method is for doing a simple syntactic check and ensuring the
115 validity of opcode parameters, without any cluster-related
116 checks. While the same can be accomplished in ExpandNames and/or
117 CheckPrereq, doing these separately is better because:
119 - ExpandNames is left as purely a lock-related function
120 - CheckPrereq is run after we have acquired locks (and possible
123 The function is allowed to change the self.op attribute so that
124 later methods need not worry about missing parameters.
129 def ExpandNames(self):
130 """Expand names for this LU.
132 This method is called before starting to execute the opcode, and it should
133 update all the parameters of the opcode to their canonical form (e.g. a
134 short node name must be fully expanded after this method has successfully
135 completed). This way locking, hooks, logging, etc. can work correctly.
137 LUs which implement this method must also populate the self.needed_locks
138 member, as a dict with lock levels as keys, and a list of needed lock names
141 - use an empty dict if you don't need any lock
142 - if you don't need any lock at a particular level omit that level
143 - don't put anything for the BGL level
144 - if you want all locks at a level use locking.ALL_SET as a value
146 If you need to share locks (rather than acquire them exclusively) at one
147 level you can modify self.share_locks, setting a true value (usually 1) for
148 that level. By default locks are not shared.
152 # Acquire all nodes and one instance
153 self.needed_locks = {
154 locking.LEVEL_NODE: locking.ALL_SET,
155 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
157 # Acquire just two nodes
158 self.needed_locks = {
159 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
162 self.needed_locks = {} # No, you can't leave it to the default value None
165 # The implementation of this method is mandatory only if the new LU is
166 # concurrent, so that old LUs don't need to be changed all at the same
169 self.needed_locks = {} # Exclusive LUs don't need locks.
171 raise NotImplementedError
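# Illustrative sketch, not part of the upstream code: a concurrent LU (one
# that sets REQ_BGL = False) might implement ExpandNames along these lines,
# mirroring the query LUs further down in this module.  The class name and
# the "nodes" opcode attribute are made up for the example.
#
#   class LUExampleNodeQuery(NoHooksLU):
#     _OP_REQP = ["nodes"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self.needed_locks = {}
#       self.share_locks[locking.LEVEL_NODE] = 1
#       if self.op.nodes:
#         self.needed_locks[locking.LEVEL_NODE] = \
#           _GetWantedNodes(self, self.op.nodes)
#       else:
#         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET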
173 def DeclareLocks(self, level):
174 """Declare LU locking needs for a level
176 While most LUs can just declare their locking needs at ExpandNames time,
177 sometimes there's the need to calculate some locks after having acquired
178 the ones before. This function is called just before acquiring locks at a
179 particular level, but after acquiring the ones at lower levels, and permits
180 such calculations. It can be used to modify self.needed_locks, and by
181 default it does nothing.
183 This function is only called if you have something already set in
184 self.needed_locks for the level.
186 @param level: Locking level which is going to be locked
187 @type level: member of ganeti.locking.LEVELS
191 def CheckPrereq(self):
192 """Check prerequisites for this LU.
194 This method should check that the prerequisites for the execution
195 of this LU are fulfilled. It can do internode communication, but
196 it should be idempotent - no cluster or system changes are
199 The method should raise errors.OpPrereqError in case something is
200 not fulfilled. Its return value is ignored.
202 This method should also update all the parameters of the opcode to
203 their canonical form if it hasn't been done by ExpandNames before.
206 raise NotImplementedError
208 def Exec(self, feedback_fn):
211 This method should implement the actual work. It should raise
212 errors.OpExecError for failures that are somewhat dealt with in
216 raise NotImplementedError
218 def BuildHooksEnv(self):
219 """Build hooks environment for this LU.
221 This method should return a three-element tuple consisting of: a dict
222 containing the environment that will be used for running the
223 specific hook for this LU, a list of node names on which the hook
224 should run before the execution, and a list of node names on which
225 the hook should run after the execution.
227 The keys of the dict must not have the 'GANETI_' prefix, as this will
228 be handled by the hooks runner. Also note that additional keys will be
229 added by the hooks runner. If the LU doesn't define any
230 environment, an empty dict (and not None) should be returned.
232 If there are no nodes to run the hooks on, an empty list (and not None) should be returned.
234 Note that if the HPATH for a LU class is None, this function will
238 raise NotImplementedError
240 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241 """Notify the LU about the results of its hooks.
243 This method is called every time a hooks phase is executed, and notifies
244 the Logical Unit about the hooks' result. The LU can then use it to alter
245 its result based on the hooks. By default the method does nothing and the
246 previous result is passed back unchanged but any LU can define it if it
247 wants to use the local cluster hook-scripts somehow.
249 @param phase: one of L{constants.HOOKS_PHASE_POST} or
250 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251 @param hook_results: the results of the multi-node hooks rpc call
252 @param feedback_fn: function used to send feedback back to the caller
253 @param lu_result: the previous Exec result this LU had, or None
255 @return: the new Exec result, based on the previous result
261 def _ExpandAndLockInstance(self):
262 """Helper function to expand and lock an instance.
264 Many LUs that work on an instance take its name in self.op.instance_name
265 and need to expand it and then declare the expanded name for locking. This
266 function does it, and then updates self.op.instance_name to the expanded
267 name. It also initializes needed_locks as a dict, if this hasn't been done
271 if self.needed_locks is None:
272 self.needed_locks = {}
274 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275 "_ExpandAndLockInstance called with instance-level locks set"
276 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277 if expanded_name is None:
278 raise errors.OpPrereqError("Instance '%s' not known" %
279 self.op.instance_name)
280 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281 self.op.instance_name = expanded_name
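# Illustrative sketch, not part of the upstream code: a per-instance LU that
# also needs its instance's nodes would typically combine this helper with a
# node-lock recalculation request in its ExpandNames, e.g.:
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     # placeholder so DeclareLocks gets called at the node level
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE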
283 def _LockInstancesNodes(self, primary_only=False):
284 """Helper function to declare instances' nodes for locking.
286 This function should be called after locking one or more instances to lock
287 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288 with all primary or secondary nodes for instances already locked and
289 present in self.needed_locks[locking.LEVEL_INSTANCE].
291 It should be called from DeclareLocks, and for safety only works if
292 self.recalculate_locks[locking.LEVEL_NODE] is set.
294 In the future it may grow parameters to lock only some instances' nodes, or
295 to lock only primary or secondary nodes, if needed.
297 It should be called in DeclareLocks in a way similar to::
299 if level == locking.LEVEL_NODE:
300 self._LockInstancesNodes()
302 @type primary_only: boolean
303 @param primary_only: only lock primary nodes of locked instances
306 assert locking.LEVEL_NODE in self.recalculate_locks, \
307 "_LockInstancesNodes helper function called with no nodes to recalculate"
309 # TODO: check if we've really been called with the instance locks held
311 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312 # future we might want to have different behaviors depending on the value
313 # of self.recalculate_locks[locking.LEVEL_NODE]
315 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316 instance = self.context.cfg.GetInstanceInfo(instance_name)
317 wanted_nodes.append(instance.primary_node)
319 wanted_nodes.extend(instance.secondary_nodes)
321 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
326 del self.recalculate_locks[locking.LEVEL_NODE]
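# Illustrative sketch, not part of the upstream code: the DeclareLocks
# counterpart to the ExpandNames sketch above, following the usage shown in
# the _LockInstancesNodes docstring:
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()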
329 class NoHooksLU(LogicalUnit):
330 """Simple LU which runs no hooks.
332 This LU is intended as a parent for other LogicalUnits which will
333 run no hooks, in order to reduce duplicate code.
340 def _GetWantedNodes(lu, nodes):
341 """Returns list of checked and expanded node names.
343 @type lu: L{LogicalUnit}
344 @param lu: the logical unit on whose behalf we execute
346 @param nodes: list of node names or None for all nodes
348 @return: the list of nodes, sorted
349 @raise errors.OpPrereqError: if the nodes parameter is wrong type
352 if not isinstance(nodes, list):
353 raise errors.OpPrereqError("Invalid argument type 'nodes'")
356 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357 " non-empty list of nodes whose names are to be expanded.")
361 node = lu.cfg.ExpandNodeName(name)
363 raise errors.OpPrereqError("No such node name '%s'" % name)
366 return utils.NiceSort(wanted)
369 def _GetWantedInstances(lu, instances):
370 """Returns list of checked and expanded instance names.
372 @type lu: L{LogicalUnit}
373 @param lu: the logical unit on whose behalf we execute
374 @type instances: list
375 @param instances: list of instance names or None for all instances
377 @return: the list of instances, sorted
378 @raise errors.OpPrereqError: if the instances parameter is wrong type
379 @raise errors.OpPrereqError: if any of the passed instances is not found
382 if not isinstance(instances, list):
383 raise errors.OpPrereqError("Invalid argument type 'instances'")
388 for name in instances:
389 instance = lu.cfg.ExpandInstanceName(name)
391 raise errors.OpPrereqError("No such instance name '%s'" % name)
392 wanted.append(instance)
395 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
399 def _CheckOutputFields(static, dynamic, selected):
400 """Checks whether all selected fields are valid.
402 @type static: L{utils.FieldSet}
403 @param static: static fields set
404 @type dynamic: L{utils.FieldSet}
405 @param dynamic: dynamic fields set
412 delta = f.NonMatching(selected)
414 raise errors.OpPrereqError("Unknown output fields selected: %s"
418 def _CheckBooleanOpField(op, name):
419 """Validates boolean opcode parameters.
421 This will ensure that an opcode parameter is either a boolean value,
422 or None (but that it always exists).
425 val = getattr(op, name, None)
426 if not (val is None or isinstance(val, bool)):
427 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
429 setattr(op, name, val)
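# Illustrative sketch, not part of the upstream code: a LU whose opcode may
# carry an optional boolean flag (the "force" attribute name below is
# hypothetical) would call this helper before using the flag:
#
#   _CheckBooleanOpField(self.op, "force")
#   # self.op.force is now guaranteed to exist and to be a bool or None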
432 def _CheckNodeOnline(lu, node):
433 """Ensure that a given node is online.
435 @param lu: the LU on behalf of which we make the check
436 @param node: the node to check
437 @raise errors.OpPrereqError: if the node is offline
440 if lu.cfg.GetNodeInfo(node).offline:
441 raise errors.OpPrereqError("Can't use offline node %s" % node)
444 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
445 memory, vcpus, nics):
446 """Builds instance related env variables for hooks
448 This builds the hook environment from individual variables.
451 @param name: the name of the instance
452 @type primary_node: string
453 @param primary_node: the name of the instance's primary node
454 @type secondary_nodes: list
455 @param secondary_nodes: list of secondary nodes as strings
456 @type os_type: string
457 @param os_type: the name of the instance's OS
458 @type status: boolean
459 @param status: the should_run status of the instance
461 @param memory: the memory size of the instance
463 @param vcpus: the count of VCPUs the instance has
465 @param nics: list of tuples (ip, bridge, mac) representing
466 the NICs the instance has
468 @return: the hook environment for this instance
477 "INSTANCE_NAME": name,
478 "INSTANCE_PRIMARY": primary_node,
479 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
480 "INSTANCE_OS_TYPE": os_type,
481 "INSTANCE_STATUS": str_status,
482 "INSTANCE_MEMORY": memory,
483 "INSTANCE_VCPUS": vcpus,
487 nic_count = len(nics)
488 for idx, (ip, bridge, mac) in enumerate(nics):
491 env["INSTANCE_NIC%d_IP" % idx] = ip
492 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
493 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
497 env["INSTANCE_NIC_COUNT"] = nic_count
502 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
503 """Builds instance related env variables for hooks from an object.
505 @type lu: L{LogicalUnit}
506 @param lu: the logical unit on whose behalf we execute
507 @type instance: L{objects.Instance}
508 @param instance: the instance for which we should build the
511 @param override: dictionary with key/values that will override
514 @return: the hook environment dictionary
517 bep = lu.cfg.GetClusterInfo().FillBE(instance)
519 'name': instance.name,
520 'primary_node': instance.primary_node,
521 'secondary_nodes': instance.secondary_nodes,
522 'os_type': instance.os,
523 'status': instance.admin_up,
524 'memory': bep[constants.BE_MEMORY],
525 'vcpus': bep[constants.BE_VCPUS],
526 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
529 args.update(override)
530 return _BuildInstanceHookEnv(**args)
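# Illustrative sketch, not part of the upstream code: for a hypothetical
# instance with one NIC, the two helpers above would produce a hook
# environment roughly like the following (all values are made up; the
# GANETI_ prefix is added later by the hooks runner):
#
#   env = _BuildInstanceHookEnvByObject(self, instance,
#                                       override={"status": True})
#   # env now contains, among other keys:
#   #   "INSTANCE_NAME": "inst1.example.com"
#   #   "INSTANCE_PRIMARY": "node1.example.com"
#   #   "INSTANCE_SECONDARIES": "node2.example.com"
#   #   "INSTANCE_STATUS": <string form of the should_run status>
#   #   "INSTANCE_NIC_COUNT": 1
#   #   "INSTANCE_NIC0_IP", "INSTANCE_NIC0_BRIDGE", "INSTANCE_NIC0_HWADDR"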
533 def _AdjustCandidatePool(lu):
534 """Adjust the candidate pool after node operations.
537 mod_list = lu.cfg.MaintainCandidatePool()
539 lu.LogInfo("Promoted nodes to master candidate role: %s",
540 ", ".join(node.name for node in mod_list))
541 for name in mod_list:
542 lu.context.ReaddNode(name)
543 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
545 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
549 def _CheckInstanceBridgesExist(lu, instance):
550 """Check that the bridges needed by an instance exist.
553 # check bridges existence
554 brlist = [nic.bridge for nic in instance.nics]
555 result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
558 raise errors.OpPrereqError("One or more target bridges %s does not"
559 " exist on destination node '%s'" %
560 (brlist, instance.primary_node))
563 class LUDestroyCluster(NoHooksLU):
564 """Logical unit for destroying the cluster.
569 def CheckPrereq(self):
570 """Check prerequisites.
572 This checks whether the cluster is empty.
574 Any errors are signalled by raising errors.OpPrereqError.
577 master = self.cfg.GetMasterNode()
579 nodelist = self.cfg.GetNodeList()
580 if len(nodelist) != 1 or nodelist[0] != master:
581 raise errors.OpPrereqError("There are still %d node(s) in"
582 " this cluster." % (len(nodelist) - 1))
583 instancelist = self.cfg.GetInstanceList()
585 raise errors.OpPrereqError("There are still %d instance(s) in"
586 " this cluster." % len(instancelist))
588 def Exec(self, feedback_fn):
589 """Destroys the cluster.
592 master = self.cfg.GetMasterNode()
593 result = self.rpc.call_node_stop_master(master, False)
596 raise errors.OpExecError("Could not disable the master role")
597 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
598 utils.CreateBackup(priv_key)
599 utils.CreateBackup(pub_key)
603 class LUVerifyCluster(LogicalUnit):
604 """Verifies the cluster status.
607 HPATH = "cluster-verify"
608 HTYPE = constants.HTYPE_CLUSTER
609 _OP_REQP = ["skip_checks"]
612 def ExpandNames(self):
613 self.needed_locks = {
614 locking.LEVEL_NODE: locking.ALL_SET,
615 locking.LEVEL_INSTANCE: locking.ALL_SET,
617 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
619 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
620 node_result, feedback_fn, master_files,
622 """Run multiple tests against a node.
626 - compares ganeti version
627 - checks vg existence and size > 20G
628 - checks config file checksum
629 - checks ssh to other nodes
631 @type nodeinfo: L{objects.Node}
632 @param nodeinfo: the node to check
633 @param file_list: required list of files
634 @param local_cksum: dictionary of local files and their checksums
635 @param node_result: the results from the node
636 @param feedback_fn: function used to accumulate results
637 @param master_files: list of files that only masters should have
638 @param drbd_map: the used DRBD minors for this node, in
639 the form of minor: (instance, must_exist), which correspond to instances
640 and their running status
645 # main result, node_result should be a non-empty dict
646 if not node_result or not isinstance(node_result, dict):
647 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
650 # compares ganeti version
651 local_version = constants.PROTOCOL_VERSION
652 remote_version = node_result.get('version', None)
653 if not (remote_version and isinstance(remote_version, (list, tuple)) and
654 len(remote_version) == 2):
655 feedback_fn(" - ERROR: connection to %s failed" % (node))
658 if local_version != remote_version[0]:
659 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
660 " node %s %s" % (local_version, node, remote_version[0]))
663 # node seems compatible, we can actually try to look into its results
667 # full package version
668 if constants.RELEASE_VERSION != remote_version[1]:
669 feedback_fn(" - WARNING: software version mismatch: master %s,"
671 (constants.RELEASE_VERSION, node, remote_version[1]))
673 # checks vg existence and size > 20G
675 vglist = node_result.get(constants.NV_VGLIST, None)
677 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
681 vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
682 constants.MIN_VG_SIZE)
684 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
687 # checks config file checksum
689 remote_cksum = node_result.get(constants.NV_FILELIST, None)
690 if not isinstance(remote_cksum, dict):
692 feedback_fn(" - ERROR: node hasn't returned file checksum data")
694 for file_name in file_list:
695 node_is_mc = nodeinfo.master_candidate
696 must_have_file = file_name not in master_files
697 if file_name not in remote_cksum:
698 if node_is_mc or must_have_file:
700 feedback_fn(" - ERROR: file '%s' missing" % file_name)
701 elif remote_cksum[file_name] != local_cksum[file_name]:
702 if node_is_mc or must_have_file:
704 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
706 # not candidate and this is not a must-have file
708 feedback_fn(" - ERROR: non master-candidate has old/wrong file"
711 # all good, except non-master/non-must have combination
712 if not node_is_mc and not must_have_file:
713 feedback_fn(" - ERROR: file '%s' should not exist on non master"
714 " candidates" % file_name)
718 if constants.NV_NODELIST not in node_result:
720 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
722 if node_result[constants.NV_NODELIST]:
724 for node in node_result[constants.NV_NODELIST]:
725 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
726 (node, node_result[constants.NV_NODELIST][node]))
728 if constants.NV_NODENETTEST not in node_result:
730 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
732 if node_result[constants.NV_NODENETTEST]:
734 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
736 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
737 (node, node_result[constants.NV_NODENETTEST][node]))
739 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
740 if isinstance(hyp_result, dict):
741 for hv_name, hv_result in hyp_result.iteritems():
742 if hv_result is not None:
743 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
744 (hv_name, hv_result))
746 # check used drbd list
747 used_minors = node_result.get(constants.NV_DRBDLIST, [])
748 for minor, (iname, must_exist) in drbd_map.items():
749 if minor not in used_minors and must_exist:
750 feedback_fn(" - ERROR: drbd minor %d of instance %s is not active" %
753 for minor in used_minors:
754 if minor not in drbd_map:
755 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" % minor)
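# Illustrative sketch, not part of the upstream code: the shapes of the two
# structures compared above, with made-up names.  drbd_map describes the
# minors the configuration expects on this node, used_minors is what the
# node actually reported:
#
#   drbd_map = {0: ("instance1.example.com", True),
#               1: ("instance2.example.com", False)}
#   used_minors = [0, 2]
#   # minor 0 is consistent, minor 1 may be missing (must_exist is False),
#   # and minor 2 is flagged as an unallocated minor in use.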
760 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
761 node_instance, feedback_fn, n_offline):
762 """Verify an instance.
764 This function checks to see if the required block devices are
765 available on the instance's node.
770 node_current = instanceconfig.primary_node
773 instanceconfig.MapLVsByNode(node_vol_should)
775 for node in node_vol_should:
776 if node in n_offline:
777 # ignore missing volumes on offline nodes
779 for volume in node_vol_should[node]:
780 if node not in node_vol_is or volume not in node_vol_is[node]:
781 feedback_fn(" - ERROR: volume %s missing on node %s" %
785 if instanceconfig.admin_up:
786 if ((node_current not in node_instance or
787 not instance in node_instance[node_current]) and
788 node_current not in n_offline):
789 feedback_fn(" - ERROR: instance %s not running on node %s" %
790 (instance, node_current))
793 for node in node_instance:
794 if (not node == node_current):
795 if instance in node_instance[node]:
796 feedback_fn(" - ERROR: instance %s should not run on node %s" %
802 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
803 """Verify if there are any unknown volumes in the cluster.
805 The .os, .swap and backup volumes are ignored. All other volumes are
811 for node in node_vol_is:
812 for volume in node_vol_is[node]:
813 if node not in node_vol_should or volume not in node_vol_should[node]:
814 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
819 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
820 """Verify the list of running instances.
822 This checks what instances are running but unknown to the cluster.
826 for node in node_instance:
827 for runninginstance in node_instance[node]:
828 if runninginstance not in instancelist:
829 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
830 (runninginstance, node))
834 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
835 """Verify N+1 Memory Resilience.
837 Check that if one single node dies we can still start all the instances it
843 for node, nodeinfo in node_info.iteritems():
844 # This code checks that every node which is now listed as secondary has
845 # enough memory to host all instances it is supposed to should a single
846 # other node in the cluster fail.
847 # FIXME: not ready for failover to an arbitrary node
848 # FIXME: does not support file-backed instances
849 # WARNING: we currently take into account down instances as well as up
850 # ones, considering that even if they're down someone might want to start
851 # them even in the event of a node failure.
852 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
854 for instance in instances:
855 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
856 if bep[constants.BE_AUTO_BALANCE]:
857 needed_mem += bep[constants.BE_MEMORY]
858 if nodeinfo['mfree'] < needed_mem:
859 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
860 " failovers should node %s fail" % (node, prinode))
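# Illustrative worked example, not part of the upstream code, with made-up
# numbers: if this node is secondary for two auto-balanced instances whose
# primary is node B, with BE_MEMORY of 512 and 1024 MB respectively, then
#
#   nodeinfo['sinst-by-pnode'] == {"nodeB.example.com": [inst1, inst2]}
#   needed_mem = 512 + 1024 = 1536
#
# and the node must have at least 1536 MB free ('mfree'), otherwise the
# error above is reported and the cluster is not N+1 compliant for that
# primary/secondary pair.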
864 def CheckPrereq(self):
865 """Check prerequisites.
867 Transform the list of checks we're going to skip into a set and check that
868 all its members are valid.
871 self.skip_set = frozenset(self.op.skip_checks)
872 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
873 raise errors.OpPrereqError("Invalid checks to be skipped specified")
875 def BuildHooksEnv(self):
878 Cluster-Verify hooks are run just in the post phase, and their failure makes
879 their output be logged in the verify output and the verification fail.
882 all_nodes = self.cfg.GetNodeList()
883 # TODO: populate the environment with useful information for verify hooks
885 return env, [], all_nodes
887 def Exec(self, feedback_fn):
888 """Verify integrity of cluster, performing various tests on nodes.
892 feedback_fn("* Verifying global settings")
893 for msg in self.cfg.VerifyConfig():
894 feedback_fn(" - ERROR: %s" % msg)
896 vg_name = self.cfg.GetVGName()
897 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
898 nodelist = utils.NiceSort(self.cfg.GetNodeList())
899 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
900 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
901 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
902 for iname in instancelist)
903 i_non_redundant = [] # Non redundant instances
904 i_non_a_balanced = [] # Non auto-balanced instances
905 n_offline = [] # List of offline nodes
911 # FIXME: verify OS list
913 master_files = [constants.CLUSTER_CONF_FILE]
915 file_names = ssconf.SimpleStore().GetFileList()
916 file_names.append(constants.SSL_CERT_FILE)
917 file_names.append(constants.RAPI_CERT_FILE)
918 file_names.extend(master_files)
920 local_checksums = utils.FingerprintFiles(file_names)
922 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
923 node_verify_param = {
924 constants.NV_FILELIST: file_names,
925 constants.NV_NODELIST: [node.name for node in nodeinfo
926 if not node.offline],
927 constants.NV_HYPERVISOR: hypervisors,
928 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
929 node.secondary_ip) for node in nodeinfo
930 if not node.offline],
931 constants.NV_LVLIST: vg_name,
932 constants.NV_INSTANCELIST: hypervisors,
933 constants.NV_VGLIST: None,
934 constants.NV_VERSION: None,
935 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
936 constants.NV_DRBDLIST: None,
938 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
939 self.cfg.GetClusterName())
941 cluster = self.cfg.GetClusterInfo()
942 master_node = self.cfg.GetMasterNode()
943 all_drbd_map = self.cfg.ComputeDRBDMap()
945 for node_i in nodeinfo:
947 nresult = all_nvinfo[node].data
950 feedback_fn("* Skipping offline node %s" % (node,))
951 n_offline.append(node)
954 if node == master_node:
956 elif node_i.master_candidate:
957 ntype = "master candidate"
960 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
962 if all_nvinfo[node].failed or not isinstance(nresult, dict):
963 feedback_fn(" - ERROR: connection to %s failed" % (node,))
968 for minor, instance in all_drbd_map[node].items():
969 instance = instanceinfo[instance]
970 node_drbd[minor] = (instance.name, instance.admin_up)
971 result = self._VerifyNode(node_i, file_names, local_checksums,
972 nresult, feedback_fn, master_files,
976 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
977 if isinstance(lvdata, basestring):
978 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
979 (node, lvdata.encode('string_escape')))
981 node_volume[node] = {}
982 elif not isinstance(lvdata, dict):
983 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
987 node_volume[node] = lvdata
990 idata = nresult.get(constants.NV_INSTANCELIST, None)
991 if not isinstance(idata, list):
992 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
997 node_instance[node] = idata
1000 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1001 if not isinstance(nodeinfo, dict):
1002 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1008 "mfree": int(nodeinfo['memory_free']),
1009 "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
1012 # dictionary holding all instances this node is secondary for,
1013 # grouped by their primary node. Each key is a cluster node, and each
1014 # value is a list of instances which have the key as primary and the
1015 # current node as secondary. this is handy to calculate N+1 memory
1016 # availability if you can only failover from a primary to its
1018 "sinst-by-pnode": {},
1021 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
1025 node_vol_should = {}
1027 for instance in instancelist:
1028 feedback_fn("* Verifying instance %s" % instance)
1029 inst_config = instanceinfo[instance]
1030 result = self._VerifyInstance(instance, inst_config, node_volume,
1031 node_instance, feedback_fn, n_offline)
1033 inst_nodes_offline = []
1035 inst_config.MapLVsByNode(node_vol_should)
1037 instance_cfg[instance] = inst_config
1039 pnode = inst_config.primary_node
1040 if pnode in node_info:
1041 node_info[pnode]['pinst'].append(instance)
1042 elif pnode not in n_offline:
1043 feedback_fn(" - ERROR: instance %s, connection to primary node"
1044 " %s failed" % (instance, pnode))
1047 if pnode in n_offline:
1048 inst_nodes_offline.append(pnode)
1050 # If the instance is non-redundant we cannot survive losing its primary
1051 # node, so we are not N+1 compliant. On the other hand we have no disk
1052 # templates with more than one secondary so that situation is not well
1054 # FIXME: does not support file-backed instances
1055 if len(inst_config.secondary_nodes) == 0:
1056 i_non_redundant.append(instance)
1057 elif len(inst_config.secondary_nodes) > 1:
1058 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1061 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1062 i_non_a_balanced.append(instance)
1064 for snode in inst_config.secondary_nodes:
1065 if snode in node_info:
1066 node_info[snode]['sinst'].append(instance)
1067 if pnode not in node_info[snode]['sinst-by-pnode']:
1068 node_info[snode]['sinst-by-pnode'][pnode] = []
1069 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1070 elif snode not in n_offline:
1071 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1072 " %s failed" % (instance, snode))
1074 if snode in n_offline:
1075 inst_nodes_offline.append(snode)
1077 if inst_nodes_offline:
1078 # warn that the instance lives on offline nodes, and set bad=True
1079 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1080 ", ".join(inst_nodes_offline))
1083 feedback_fn("* Verifying orphan volumes")
1084 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1088 feedback_fn("* Verifying remaining instances")
1089 result = self._VerifyOrphanInstances(instancelist, node_instance,
1093 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1094 feedback_fn("* Verifying N+1 Memory redundancy")
1095 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1098 feedback_fn("* Other Notes")
1100 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1101 % len(i_non_redundant))
1103 if i_non_a_balanced:
1104 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1105 % len(i_non_a_balanced))
1108 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1112 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1113 """Analyze the post-hooks' result
1115 This method analyses the hook result, handles it, and sends some
1116 nicely-formatted feedback back to the user.
1118 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1119 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1120 @param hooks_results: the results of the multi-node hooks rpc call
1121 @param feedback_fn: function used to send feedback back to the caller
1122 @param lu_result: previous Exec result
1123 @return: the new Exec result, based on the previous result
1127 # We only really run POST phase hooks, and are only interested in
1129 if phase == constants.HOOKS_PHASE_POST:
1130 # Used to change hooks' output to proper indentation
1131 indent_re = re.compile('^', re.M)
1132 feedback_fn("* Hooks Results")
1133 if not hooks_results:
1134 feedback_fn(" - ERROR: general communication failure")
1137 for node_name in hooks_results:
1138 show_node_header = True
1139 res = hooks_results[node_name]
1140 if res.failed or res.data is False or not isinstance(res.data, list):
1142 # no need to warn or set fail return value
1144 feedback_fn(" Communication failure in hooks execution")
1147 for script, hkr, output in res.data:
1148 if hkr == constants.HKR_FAIL:
1149 # The node header is only shown once, if there are
1150 # failing hooks on that node
1151 if show_node_header:
1152 feedback_fn(" Node %s:" % node_name)
1153 show_node_header = False
1154 feedback_fn(" ERROR: Script %s failed, output:" % script)
1155 output = indent_re.sub(' ', output)
1156 feedback_fn("%s" % output)
1162 class LUVerifyDisks(NoHooksLU):
1163 """Verifies the cluster disks status.
1169 def ExpandNames(self):
1170 self.needed_locks = {
1171 locking.LEVEL_NODE: locking.ALL_SET,
1172 locking.LEVEL_INSTANCE: locking.ALL_SET,
1174 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1176 def CheckPrereq(self):
1177 """Check prerequisites.
1179 This has no prerequisites.
1184 def Exec(self, feedback_fn):
1185 """Verify integrity of cluster disks.
1188 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1190 vg_name = self.cfg.GetVGName()
1191 nodes = utils.NiceSort(self.cfg.GetNodeList())
1192 instances = [self.cfg.GetInstanceInfo(name)
1193 for name in self.cfg.GetInstanceList()]
1196 for inst in instances:
1198 if (not inst.admin_up or
1199 inst.disk_template not in constants.DTS_NET_MIRROR):
1201 inst.MapLVsByNode(inst_lvs)
1202 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1203 for node, vol_list in inst_lvs.iteritems():
1204 for vol in vol_list:
1205 nv_dict[(node, vol)] = inst
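# Illustrative sketch, not part of the upstream code, of the transformation
# mentioned above (names and volumes are made up): a per-instance map like
#
#   {"node1.example.com": ["lv_data_0", "lv_meta_0"]}
#
# ends up as one nv_dict entry per (node, volume) pair, with the instance
# object as the value:
#
#   nv_dict[("node1.example.com", "lv_data_0")] = <Instance inst1>
#   nv_dict[("node1.example.com", "lv_meta_0")] = <Instance inst1>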
1210 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1215 lvs = node_lvs[node]
1218 self.LogWarning("Connection to node %s failed: %s" %
1222 if isinstance(lvs, basestring):
1223 logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1224 res_nlvm[node] = lvs
1225 elif not isinstance(lvs, dict):
1226 logging.warning("Connection to node %s failed or invalid data"
1228 res_nodes.append(node)
1231 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1232 inst = nv_dict.pop((node, lv_name), None)
1233 if (not lv_online and inst is not None
1234 and inst.name not in res_instances):
1235 res_instances.append(inst.name)
1237 # any leftover items in nv_dict are missing LVs, let's arrange the
1239 for key, inst in nv_dict.iteritems():
1240 if inst.name not in res_missing:
1241 res_missing[inst.name] = []
1242 res_missing[inst.name].append(key)
1247 class LURenameCluster(LogicalUnit):
1248 """Rename the cluster.
1251 HPATH = "cluster-rename"
1252 HTYPE = constants.HTYPE_CLUSTER
1255 def BuildHooksEnv(self):
1260 "OP_TARGET": self.cfg.GetClusterName(),
1261 "NEW_NAME": self.op.name,
1263 mn = self.cfg.GetMasterNode()
1264 return env, [mn], [mn]
1266 def CheckPrereq(self):
1267 """Verify that the passed name is a valid one.
1270 hostname = utils.HostInfo(self.op.name)
1272 new_name = hostname.name
1273 self.ip = new_ip = hostname.ip
1274 old_name = self.cfg.GetClusterName()
1275 old_ip = self.cfg.GetMasterIP()
1276 if new_name == old_name and new_ip == old_ip:
1277 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1278 " cluster has changed")
1279 if new_ip != old_ip:
1280 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1281 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1282 " reachable on the network. Aborting." %
1285 self.op.name = new_name
1287 def Exec(self, feedback_fn):
1288 """Rename the cluster.
1291 clustername = self.op.name
1294 # shutdown the master IP
1295 master = self.cfg.GetMasterNode()
1296 result = self.rpc.call_node_stop_master(master, False)
1297 if result.failed or not result.data:
1298 raise errors.OpExecError("Could not disable the master role")
1301 cluster = self.cfg.GetClusterInfo()
1302 cluster.cluster_name = clustername
1303 cluster.master_ip = ip
1304 self.cfg.Update(cluster)
1306 # update the known hosts file
1307 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1308 node_list = self.cfg.GetNodeList()
1310 node_list.remove(master)
1313 result = self.rpc.call_upload_file(node_list,
1314 constants.SSH_KNOWN_HOSTS_FILE)
1315 for to_node, to_result in result.iteritems():
1316 if to_result.failed or not to_result.data:
1317 logging.error("Copy of file %s to node %s failed",
1318 constants.SSH_KNOWN_HOSTS_FILE, to_node)
1321 result = self.rpc.call_node_start_master(master, False)
1322 if result.failed or not result.data:
1323 self.LogWarning("Could not re-enable the master role on"
1324 " the master, please restart manually.")
1327 def _RecursiveCheckIfLVMBased(disk):
1328 """Check if the given disk or its children are lvm-based.
1330 @type disk: L{objects.Disk}
1331 @param disk: the disk to check
1333 @return: boolean indicating whether a LD_LV dev_type was found or not
1337 for chdisk in disk.children:
1338 if _RecursiveCheckIfLVMBased(chdisk):
1340 return disk.dev_type == constants.LD_LV
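# Illustrative sketch, not part of the upstream code (the Disk constructor
# calls are schematic): for the usual DRBD-over-LVM layout the recursion
# walks down to the LV children and returns True, e.g.:
#
#   data_lv = objects.Disk(dev_type=constants.LD_LV, children=[])
#   meta_lv = objects.Disk(dev_type=constants.LD_LV, children=[])
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8,
#                       children=[data_lv, meta_lv])
#   _RecursiveCheckIfLVMBased(drbd)  # -> True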
1343 class LUSetClusterParams(LogicalUnit):
1344 """Change the parameters of the cluster.
1347 HPATH = "cluster-modify"
1348 HTYPE = constants.HTYPE_CLUSTER
1352 def CheckParameters(self):
1356 if not hasattr(self.op, "candidate_pool_size"):
1357 self.op.candidate_pool_size = None
1358 if self.op.candidate_pool_size is not None:
1360 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1361 except ValueError, err:
1362 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1364 if self.op.candidate_pool_size < 1:
1365 raise errors.OpPrereqError("At least one master candidate needed")
1367 def ExpandNames(self):
1368 # FIXME: in the future maybe other cluster params won't require checking on
1369 # all nodes to be modified.
1370 self.needed_locks = {
1371 locking.LEVEL_NODE: locking.ALL_SET,
1373 self.share_locks[locking.LEVEL_NODE] = 1
1375 def BuildHooksEnv(self):
1380 "OP_TARGET": self.cfg.GetClusterName(),
1381 "NEW_VG_NAME": self.op.vg_name,
1383 mn = self.cfg.GetMasterNode()
1384 return env, [mn], [mn]
1386 def CheckPrereq(self):
1387 """Check prerequisites.
1389 This checks whether the given params don't conflict and
1390 if the given volume group is valid.
1393 # FIXME: This only works because there is only one parameter that can be
1394 # changed or removed.
1395 if self.op.vg_name is not None and not self.op.vg_name:
1396 instances = self.cfg.GetAllInstancesInfo().values()
1397 for inst in instances:
1398 for disk in inst.disks:
1399 if _RecursiveCheckIfLVMBased(disk):
1400 raise errors.OpPrereqError("Cannot disable lvm storage while"
1401 " lvm-based instances exist")
1403 node_list = self.acquired_locks[locking.LEVEL_NODE]
1405 # if vg_name not None, checks given volume group on all nodes
1407 vglist = self.rpc.call_vg_list(node_list)
1408 for node in node_list:
1409 if vglist[node].failed:
1410 # ignoring down node
1411 self.LogWarning("Node %s unreachable/error, ignoring" % node)
1413 vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1415 constants.MIN_VG_SIZE)
1417 raise errors.OpPrereqError("Error on node '%s': %s" %
1420 self.cluster = cluster = self.cfg.GetClusterInfo()
1421 # validate beparams changes
1422 if self.op.beparams:
1423 utils.CheckBEParams(self.op.beparams)
1424 self.new_beparams = cluster.FillDict(
1425 cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1427 # hypervisor list/parameters
1428 self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1429 if self.op.hvparams:
1430 if not isinstance(self.op.hvparams, dict):
1431 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1432 for hv_name, hv_dict in self.op.hvparams.items():
1433 if hv_name not in self.new_hvparams:
1434 self.new_hvparams[hv_name] = hv_dict
1436 self.new_hvparams[hv_name].update(hv_dict)
1438 if self.op.enabled_hypervisors is not None:
1439 self.hv_list = self.op.enabled_hypervisors
1441 self.hv_list = cluster.enabled_hypervisors
1443 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1444 # either the enabled list has changed, or the parameters have, validate
1445 for hv_name, hv_params in self.new_hvparams.items():
1446 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1447 (self.op.enabled_hypervisors and
1448 hv_name in self.op.enabled_hypervisors)):
1449 # either this is a new hypervisor, or its parameters have changed
1450 hv_class = hypervisor.GetHypervisor(hv_name)
1451 hv_class.CheckParameterSyntax(hv_params)
1452 _CheckHVParams(self, node_list, hv_name, hv_params)
1454 def Exec(self, feedback_fn):
1455 """Change the parameters of the cluster.
1458 if self.op.vg_name is not None:
1459 if self.op.vg_name != self.cfg.GetVGName():
1460 self.cfg.SetVGName(self.op.vg_name)
1462 feedback_fn("Cluster LVM configuration already in desired"
1463 " state, not changing")
1464 if self.op.hvparams:
1465 self.cluster.hvparams = self.new_hvparams
1466 if self.op.enabled_hypervisors is not None:
1467 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1468 if self.op.beparams:
1469 self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1470 if self.op.candidate_pool_size is not None:
1471 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1473 self.cfg.Update(self.cluster)
1475 # we want to update nodes after the cluster so that if any errors
1476 # happen, we have recorded and saved the cluster info
1477 if self.op.candidate_pool_size is not None:
1478 _AdjustCandidatePool(self)
1481 class LURedistributeConfig(NoHooksLU):
1482 """Force the redistribution of cluster configuration.
1484 This is a very simple LU.
1490 def ExpandNames(self):
1491 self.needed_locks = {
1492 locking.LEVEL_NODE: locking.ALL_SET,
1494 self.share_locks[locking.LEVEL_NODE] = 1
1496 def CheckPrereq(self):
1497 """Check prerequisites.
1501 def Exec(self, feedback_fn):
1502 """Redistribute the configuration.
1505 self.cfg.Update(self.cfg.GetClusterInfo())
1508 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1509 """Sleep and poll for an instance's disk to sync.
1512 if not instance.disks:
1516 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1518 node = instance.primary_node
1520 for dev in instance.disks:
1521 lu.cfg.SetDiskID(dev, node)
1527 cumul_degraded = False
1528 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1529 if rstats.failed or not rstats.data:
1530 lu.LogWarning("Can't get any data from node %s", node)
1533 raise errors.RemoteError("Can't contact node %s for mirror data,"
1534 " aborting." % node)
1537 rstats = rstats.data
1539 for i, mstat in enumerate(rstats):
1541 lu.LogWarning("Can't compute data for node %s/%s",
1542 node, instance.disks[i].iv_name)
1544 # we ignore the ldisk parameter
1545 perc_done, est_time, is_degraded, _ = mstat
1546 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1547 if perc_done is not None:
1549 if est_time is not None:
1550 rem_time = "%d estimated seconds remaining" % est_time
1553 rem_time = "no time estimate"
1554 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1555 (instance.disks[i].iv_name, perc_done, rem_time))
1559 time.sleep(min(60, max_time))
1562 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1563 return not cumul_degraded
1566 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1567 """Check that mirrors are not degraded.
1569 The ldisk parameter, if True, will change the test from the
1570 is_degraded attribute (which represents overall non-ok status for
1571 the device(s)) to the ldisk (representing the local storage status).
1574 lu.cfg.SetDiskID(dev, node)
1581 if on_primary or dev.AssembleOnSecondary():
1582 rstats = lu.rpc.call_blockdev_find(node, dev)
1583 if rstats.failed or not rstats.data:
1584 logging.warning("Node %s: disk degraded, not found or node down", node)
1587 result = result and (not rstats.data[idx])
1589 for child in dev.children:
1590 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1595 class LUDiagnoseOS(NoHooksLU):
1596 """Logical unit for OS diagnose/query.
1599 _OP_REQP = ["output_fields", "names"]
1601 _FIELDS_STATIC = utils.FieldSet()
1602 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1604 def ExpandNames(self):
1606 raise errors.OpPrereqError("Selective OS query not supported")
1608 _CheckOutputFields(static=self._FIELDS_STATIC,
1609 dynamic=self._FIELDS_DYNAMIC,
1610 selected=self.op.output_fields)
1612 # Lock all nodes, in shared mode
1613 self.needed_locks = {}
1614 self.share_locks[locking.LEVEL_NODE] = 1
1615 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1617 def CheckPrereq(self):
1618 """Check prerequisites.
1623 def _DiagnoseByOS(node_list, rlist):
1624 """Remaps a per-node return list into a per-os per-node dictionary
1626 @param node_list: a list with the names of all nodes
1627 @param rlist: a map with node names as keys and OS objects as values
1630 @return: a dictionary with osnames as keys and as value another map, with
1631 nodes as keys and list of OS objects as values, eg::
1633 {"debian-etch": {"node1": [<object>,...],
1634 "node2": [<object>,]}
1639 for node_name, nr in rlist.iteritems():
1640 if nr.failed or not nr.data:
1642 for os_obj in nr.data:
1643 if os_obj.name not in all_os:
1644 # build a list of nodes for this os containing empty lists
1645 # for each node in node_list
1646 all_os[os_obj.name] = {}
1647 for nname in node_list:
1648 all_os[os_obj.name][nname] = []
1649 all_os[os_obj.name][node_name].append(os_obj)
1652 def Exec(self, feedback_fn):
1653 """Compute the list of OSes.
1656 node_list = self.acquired_locks[locking.LEVEL_NODE]
1657 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1658 if node in node_list]
1659 node_data = self.rpc.call_os_diagnose(valid_nodes)
1660 if node_data == False:
1661 raise errors.OpExecError("Can't gather the list of OSes")
1662 pol = self._DiagnoseByOS(valid_nodes, node_data)
1664 for os_name, os_data in pol.iteritems():
1666 for field in self.op.output_fields:
1669 elif field == "valid":
1670 val = utils.all([osl and osl[0] for osl in os_data.values()])
1671 elif field == "node_status":
1673 for node_name, nos_list in os_data.iteritems():
1674 val[node_name] = [(v.status, v.path) for v in nos_list]
1676 raise errors.ParameterError(field)
1683 class LURemoveNode(LogicalUnit):
1684 """Logical unit for removing a node.
1687 HPATH = "node-remove"
1688 HTYPE = constants.HTYPE_NODE
1689 _OP_REQP = ["node_name"]
1691 def BuildHooksEnv(self):
1694 This doesn't run on the target node in the pre phase as a failed
1695 node would then be impossible to remove.
1699 "OP_TARGET": self.op.node_name,
1700 "NODE_NAME": self.op.node_name,
1702 all_nodes = self.cfg.GetNodeList()
1703 all_nodes.remove(self.op.node_name)
1704 return env, all_nodes, all_nodes
1706 def CheckPrereq(self):
1707 """Check prerequisites.
1710 - the node exists in the configuration
1711 - it does not have primary or secondary instances
1712 - it's not the master
1714 Any errors are signalled by raising errors.OpPrereqError.
1717 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1719 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1721 instance_list = self.cfg.GetInstanceList()
1723 masternode = self.cfg.GetMasterNode()
1724 if node.name == masternode:
1725 raise errors.OpPrereqError("Node is the master node,"
1726 " you need to failover first.")
1728 for instance_name in instance_list:
1729 instance = self.cfg.GetInstanceInfo(instance_name)
1730 if node.name in instance.all_nodes:
1731 raise errors.OpPrereqError("Instance %s is still running on the node,"
1732 " please remove first." % instance_name)
1733 self.op.node_name = node.name
1736 def Exec(self, feedback_fn):
1737 """Removes the node from the cluster.
1741 logging.info("Stopping the node daemon and removing configs from node %s",
1744 self.context.RemoveNode(node.name)
1746 self.rpc.call_node_leave_cluster(node.name)
1748 # Promote nodes to master candidate as needed
1749 _AdjustCandidatePool(self)
1752 class LUQueryNodes(NoHooksLU):
1753 """Logical unit for querying nodes.
1756 _OP_REQP = ["output_fields", "names"]
1758 _FIELDS_DYNAMIC = utils.FieldSet(
1760 "mtotal", "mnode", "mfree",
1765 _FIELDS_STATIC = utils.FieldSet(
1766 "name", "pinst_cnt", "sinst_cnt",
1767 "pinst_list", "sinst_list",
1768 "pip", "sip", "tags",
1775 def ExpandNames(self):
1776 _CheckOutputFields(static=self._FIELDS_STATIC,
1777 dynamic=self._FIELDS_DYNAMIC,
1778 selected=self.op.output_fields)
1780 self.needed_locks = {}
1781 self.share_locks[locking.LEVEL_NODE] = 1
1784 self.wanted = _GetWantedNodes(self, self.op.names)
1786 self.wanted = locking.ALL_SET
1788 self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1790 # if we don't request only static fields, we need to lock the nodes
1791 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1794 def CheckPrereq(self):
1795 """Check prerequisites.
1798 # The validation of the node list is done in _GetWantedNodes, if the list
1799 # is non-empty; if it is empty, there's no validation to do
1802 def Exec(self, feedback_fn):
1803 """Computes the list of nodes and their attributes.
1806 all_info = self.cfg.GetAllNodesInfo()
1808 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1809 elif self.wanted != locking.ALL_SET:
1810 nodenames = self.wanted
1811 missing = set(nodenames).difference(all_info.keys())
1813 raise errors.OpExecError(
1814 "Some nodes were removed before retrieving their data: %s" % missing)
1816 nodenames = all_info.keys()
1818 nodenames = utils.NiceSort(nodenames)
1819 nodelist = [all_info[name] for name in nodenames]
1821 # begin data gathering
1825 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1826 self.cfg.GetHypervisorType())
1827 for name in nodenames:
1828 nodeinfo = node_data[name]
1829 if not nodeinfo.failed and nodeinfo.data:
1830 nodeinfo = nodeinfo.data
1831 fn = utils.TryConvert
1833 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1834 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1835 "mfree": fn(int, nodeinfo.get('memory_free', None)),
1836 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1837 "dfree": fn(int, nodeinfo.get('vg_free', None)),
1838 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1839 "bootid": nodeinfo.get('bootid', None),
1842 live_data[name] = {}
1844 live_data = dict.fromkeys(nodenames, {})
1846 node_to_primary = dict([(name, set()) for name in nodenames])
1847 node_to_secondary = dict([(name, set()) for name in nodenames])
1849 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1850 "sinst_cnt", "sinst_list"))
1851 if inst_fields & frozenset(self.op.output_fields):
1852 instancelist = self.cfg.GetInstanceList()
1854 for instance_name in instancelist:
1855 inst = self.cfg.GetInstanceInfo(instance_name)
1856 if inst.primary_node in node_to_primary:
1857 node_to_primary[inst.primary_node].add(inst.name)
1858 for secnode in inst.secondary_nodes:
1859 if secnode in node_to_secondary:
1860 node_to_secondary[secnode].add(inst.name)
1862 master_node = self.cfg.GetMasterNode()
1864 # end data gathering
1867 for node in nodelist:
1869 for field in self.op.output_fields:
1872 elif field == "pinst_list":
1873 val = list(node_to_primary[node.name])
1874 elif field == "sinst_list":
1875 val = list(node_to_secondary[node.name])
1876 elif field == "pinst_cnt":
1877 val = len(node_to_primary[node.name])
1878 elif field == "sinst_cnt":
1879 val = len(node_to_secondary[node.name])
1880 elif field == "pip":
1881 val = node.primary_ip
1882 elif field == "sip":
1883 val = node.secondary_ip
1884 elif field == "tags":
1885 val = list(node.GetTags())
1886 elif field == "serial_no":
1887 val = node.serial_no
1888 elif field == "master_candidate":
1889 val = node.master_candidate
1890 elif field == "master":
1891 val = node.name == master_node
1892 elif field == "offline":
1894 elif self._FIELDS_DYNAMIC.Matches(field):
1895 val = live_data[node.name].get(field, None)
1897 raise errors.ParameterError(field)
1898 node_output.append(val)
1899 output.append(node_output)
1904 class LUQueryNodeVolumes(NoHooksLU):
1905 """Logical unit for getting volumes on node(s).
1908 _OP_REQP = ["nodes", "output_fields"]
1910 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1911 _FIELDS_STATIC = utils.FieldSet("node")
1913 def ExpandNames(self):
1914 _CheckOutputFields(static=self._FIELDS_STATIC,
1915 dynamic=self._FIELDS_DYNAMIC,
1916 selected=self.op.output_fields)
1918 self.needed_locks = {}
1919 self.share_locks[locking.LEVEL_NODE] = 1
1920 if not self.op.nodes:
1921 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1923 self.needed_locks[locking.LEVEL_NODE] = \
1924 _GetWantedNodes(self, self.op.nodes)
1926 def CheckPrereq(self):
1927 """Check prerequisites.
1929 This checks that the fields required are valid output fields.
1932 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1934 def Exec(self, feedback_fn):
1935 """Computes the list of nodes and their attributes.
1938 nodenames = self.nodes
1939 volumes = self.rpc.call_node_volumes(nodenames)
1941 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1942 in self.cfg.GetInstanceList()]
1944 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1947 for node in nodenames:
1948 if node not in volumes or volumes[node].failed or not volumes[node].data:
1951 node_vols = volumes[node].data[:]
1952 node_vols.sort(key=lambda vol: vol['dev'])
1954 for vol in node_vols:
1956 for field in self.op.output_fields:
1959 elif field == "phys":
1963 elif field == "name":
1965 elif field == "size":
1966 val = int(float(vol['size']))
1967 elif field == "instance":
1969 if node not in lv_by_node[inst]:
1971 if vol['name'] in lv_by_node[inst][node]:
1977 raise errors.ParameterError(field)
1978 node_output.append(str(val))
1980 output.append(node_output)
1985 class LUAddNode(LogicalUnit):
1986 """Logical unit for adding node to the cluster.
1990 HTYPE = constants.HTYPE_NODE
1991 _OP_REQP = ["node_name"]
1993 def BuildHooksEnv(self):
1996 This will run on all nodes before, and on all nodes + the new node after.
2000 "OP_TARGET": self.op.node_name,
2001 "NODE_NAME": self.op.node_name,
2002 "NODE_PIP": self.op.primary_ip,
2003 "NODE_SIP": self.op.secondary_ip,
2005 nodes_0 = self.cfg.GetNodeList()
2006 nodes_1 = nodes_0 + [self.op.node_name, ]
2007 return env, nodes_0, nodes_1
2009 def CheckPrereq(self):
2010 """Check prerequisites.
2013 - the new node is not already in the config
2015 - its parameters (single/dual homed) match the cluster
2017 Any errors are signalled by raising errors.OpPrereqError.
2020 node_name = self.op.node_name
2023 dns_data = utils.HostInfo(node_name)
2025 node = dns_data.name
2026 primary_ip = self.op.primary_ip = dns_data.ip
2027 secondary_ip = getattr(self.op, "secondary_ip", None)
2028 if secondary_ip is None:
2029 secondary_ip = primary_ip
2030 if not utils.IsValidIP(secondary_ip):
2031 raise errors.OpPrereqError("Invalid secondary IP given")
2032 self.op.secondary_ip = secondary_ip
2034 node_list = cfg.GetNodeList()
2035 if not self.op.readd and node in node_list:
2036 raise errors.OpPrereqError("Node %s is already in the configuration" %
2038 elif self.op.readd and node not in node_list:
2039 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2041 for existing_node_name in node_list:
2042 existing_node = cfg.GetNodeInfo(existing_node_name)
2044 if self.op.readd and node == existing_node_name:
2045 if (existing_node.primary_ip != primary_ip or
2046 existing_node.secondary_ip != secondary_ip):
2047 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2048 " address configuration as before")
2049 continue
2051 if (existing_node.primary_ip == primary_ip or
2052 existing_node.secondary_ip == primary_ip or
2053 existing_node.primary_ip == secondary_ip or
2054 existing_node.secondary_ip == secondary_ip):
2055 raise errors.OpPrereqError("New node ip address(es) conflict with"
2056 " existing node %s" % existing_node.name)
2058 # check that the type of the node (single versus dual homed) is the
2059 # same as for the master
2060 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2061 master_singlehomed = myself.secondary_ip == myself.primary_ip
2062 newbie_singlehomed = secondary_ip == primary_ip
2063 if master_singlehomed != newbie_singlehomed:
2064 if master_singlehomed:
2065 raise errors.OpPrereqError("The master has no private ip but the"
2066 " new node has one")
2068 raise errors.OpPrereqError("The master has a private ip but the"
2069 " new node doesn't have one")
2071 # checks reachability
2072 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2073 raise errors.OpPrereqError("Node not reachable by ping")
2075 if not newbie_singlehomed:
2076 # check reachability from my secondary ip to newbie's secondary ip
2077 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2078 source=myself.secondary_ip):
2079 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2080 " based ping to noded port")
2082 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2083 mc_now, _ = self.cfg.GetMasterCandidateStats()
2084 master_candidate = mc_now < cp_size
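# The new node is auto-promoted to master candidate only while the current
# number of candidates is still below the cluster's candidate_pool_size.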
2086 self.new_node = objects.Node(name=node,
2087 primary_ip=primary_ip,
2088 secondary_ip=secondary_ip,
2089 master_candidate=master_candidate,
2092 def Exec(self, feedback_fn):
2093 """Adds the new node to the cluster.
2096 new_node = self.new_node
2097 node = new_node.name
2099 # check connectivity
2100 result = self.rpc.call_version([node])[node]
2102 if result.data:
2103 if constants.PROTOCOL_VERSION == result.data:
2104 logging.info("Communication to node %s fine, sw version %s match",
2105 node, result.data)
2106 else:
2107 raise errors.OpExecError("Version mismatch master version %s,"
2108 " node version %s" %
2109 (constants.PROTOCOL_VERSION, result.data))
2110 else:
2111 raise errors.OpExecError("Cannot get version from the new node")
2114 logging.info("Copy ssh key to node %s", node)
2115 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2117 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2118 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2124 keyarray.append(f.read())
2128 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2130 keyarray[3], keyarray[4], keyarray[5])
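# The six entries pushed above are, presumably, the host DSA and RSA key
# pairs plus the cluster root user's SSH key pair, read in the keyfiles
# order built earlier; the ordering has to match what the node-side
# call_node_add handler expects.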
2132 msg = result.RemoteFailMsg()
2133 if msg:
2134 raise errors.OpExecError("Cannot transfer ssh keys to the"
2135 " new node: %s" % msg)
2137 # Add node to our /etc/hosts, and add key to known_hosts
2138 utils.AddHostToEtcHosts(new_node.name)
2140 if new_node.secondary_ip != new_node.primary_ip:
2141 result = self.rpc.call_node_has_ip_address(new_node.name,
2142 new_node.secondary_ip)
2143 if result.failed or not result.data:
2144 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2145 " you gave (%s). Please fix and re-run this"
2146 " command." % new_node.secondary_ip)
2148 node_verify_list = [self.cfg.GetMasterNode()]
2149 node_verify_param = {
2150 'nodelist': [node],
2151 # TODO: do a node-net-test as well?
2152 }
2154 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2155 self.cfg.GetClusterName())
2156 for verifier in node_verify_list:
2157 if result[verifier].failed or not result[verifier].data:
2158 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2159 " for remote verification" % verifier)
2160 if result[verifier].data['nodelist']:
2161 for failed in result[verifier].data['nodelist']:
2162 feedback_fn("ssh/hostname verification failed %s -> %s" %
2163 (verifier, result[verifier].data['nodelist'][failed]))
2164 raise errors.OpExecError("ssh/hostname verification failed.")
2166 # Distribute updated /etc/hosts and known_hosts to all nodes,
2167 # including the node just added
2168 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2169 dist_nodes = self.cfg.GetNodeList()
2170 if not self.op.readd:
2171 dist_nodes.append(node)
2172 if myself.name in dist_nodes:
2173 dist_nodes.remove(myself.name)
2175 logging.debug("Copying hosts and known_hosts to all nodes")
2176 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2177 result = self.rpc.call_upload_file(dist_nodes, fname)
2178 for to_node, to_result in result.iteritems():
2179 if to_result.failed or not to_result.data:
2180 logging.error("Copy of file %s to node %s failed", fname, to_node)
2182 to_copy = []
2183 enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2184 if constants.HTS_USE_VNC.intersection(enabled_hypervisors):
2185 to_copy.append(constants.VNC_PASSWORD_FILE)
2187 for fname in to_copy:
2188 result = self.rpc.call_upload_file([node], fname)
2189 if result[node].failed or not result[node].data:
2190 logging.error("Could not copy file %s to node %s", fname, node)
2192 if self.op.readd:
2193 self.context.ReaddNode(new_node)
2194 else:
2195 self.context.AddNode(new_node)
2198 class LUSetNodeParams(LogicalUnit):
2199 """Modifies the parameters of a node.
2202 HPATH = "node-modify"
2203 HTYPE = constants.HTYPE_NODE
2204 _OP_REQP = ["node_name"]
2207 def CheckArguments(self):
2208 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2209 if node_name is None:
2210 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2211 self.op.node_name = node_name
2212 _CheckBooleanOpField(self.op, 'master_candidate')
2213 _CheckBooleanOpField(self.op, 'offline')
2214 if self.op.master_candidate is None and self.op.offline is None:
2215 raise errors.OpPrereqError("Please pass at least one modification")
2216 if self.op.offline == True and self.op.master_candidate == True:
2217 raise errors.OpPrereqError("Can't set the node into offline and"
2218 " master_candidate at the same time")
2220 def ExpandNames(self):
2221 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2223 def BuildHooksEnv(self):
2226 This runs on the master node.
2230 "OP_TARGET": self.op.node_name,
2231 "MASTER_CANDIDATE": str(self.op.master_candidate),
2232 "OFFLINE": str(self.op.offline),
2234 nl = [self.cfg.GetMasterNode(),
2238 def CheckPrereq(self):
2239 """Check prerequisites.
2241 This checks the requested changes against the node's current state and the cluster configuration.
2244 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2246 if ((self.op.master_candidate == False or self.op.offline == True)
2247 and node.master_candidate):
2248 # we will demote the node from master_candidate
2249 if self.op.node_name == self.cfg.GetMasterNode():
2250 raise errors.OpPrereqError("The master node has to be a"
2251 " master candidate and online")
2252 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2253 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2254 if num_candidates <= cp_size:
2255 msg = ("Not enough master candidates (desired"
2256 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2257 if self.op.force:
2258 self.LogWarning(msg)
2259 else:
2260 raise errors.OpPrereqError(msg)
2262 if (self.op.master_candidate == True and node.offline and
2263 not self.op.offline == False):
2264 raise errors.OpPrereqError("Can't set an offline node to"
2265 " master_candidate")
2269 def Exec(self, feedback_fn):
2277 if self.op.offline is not None:
2278 node.offline = self.op.offline
2279 result.append(("offline", str(self.op.offline)))
2280 if self.op.offline == True and node.master_candidate:
2281 node.master_candidate = False
2282 result.append(("master_candidate", "auto-demotion due to offline"))
2284 if self.op.master_candidate is not None:
2285 node.master_candidate = self.op.master_candidate
2286 result.append(("master_candidate", str(self.op.master_candidate)))
2287 if self.op.master_candidate == False:
2288 rrc = self.rpc.call_node_demote_from_mc(node.name)
2289 if (rrc.failed or not isinstance(rrc.data, (tuple, list))
2290 or len(rrc.data) != 2):
2291 self.LogWarning("Node rpc error: %s" % rrc.error)
2292 elif not rrc.data[0]:
2293 self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
2295 # this will trigger configuration file update, if needed
2296 self.cfg.Update(node)
2297 # this will trigger job queue propagation or cleanup
2298 if self.op.node_name != self.cfg.GetMasterNode():
2299 self.context.ReaddNode(node)
2304 class LUQueryClusterInfo(NoHooksLU):
2305 """Query cluster configuration.
2311 def ExpandNames(self):
2312 self.needed_locks = {}
2314 def CheckPrereq(self):
2315 """No prerequsites needed for this LU.
2320 def Exec(self, feedback_fn):
2321 """Return cluster config.
2324 cluster = self.cfg.GetClusterInfo()
2326 "software_version": constants.RELEASE_VERSION,
2327 "protocol_version": constants.PROTOCOL_VERSION,
2328 "config_version": constants.CONFIG_VERSION,
2329 "os_api_version": constants.OS_API_VERSION,
2330 "export_version": constants.EXPORT_VERSION,
2331 "architecture": (platform.architecture()[0], platform.machine()),
2332 "name": cluster.cluster_name,
2333 "master": cluster.master_node,
2334 "default_hypervisor": cluster.default_hypervisor,
2335 "enabled_hypervisors": cluster.enabled_hypervisors,
2336 "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2337 for hypervisor in cluster.enabled_hypervisors]),
2338 "beparams": cluster.beparams,
2339 "candidate_pool_size": cluster.candidate_pool_size,
2345 class LUQueryConfigValues(NoHooksLU):
2346 """Return configuration values.
2351 _FIELDS_DYNAMIC = utils.FieldSet()
2352 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2354 def ExpandNames(self):
2355 self.needed_locks = {}
2357 _CheckOutputFields(static=self._FIELDS_STATIC,
2358 dynamic=self._FIELDS_DYNAMIC,
2359 selected=self.op.output_fields)
2361 def CheckPrereq(self):
2362 """No prerequisites.
2367 def Exec(self, feedback_fn):
2368 """Dump a representation of the cluster config to the standard output.
2372 for field in self.op.output_fields:
2373 if field == "cluster_name":
2374 entry = self.cfg.GetClusterName()
2375 elif field == "master_node":
2376 entry = self.cfg.GetMasterNode()
2377 elif field == "drain_flag":
2378 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2379 else:
2380 raise errors.ParameterError(field)
2381 values.append(entry)
2385 class LUActivateInstanceDisks(NoHooksLU):
2386 """Bring up an instance's disks.
2389 _OP_REQP = ["instance_name"]
2392 def ExpandNames(self):
2393 self._ExpandAndLockInstance()
2394 self.needed_locks[locking.LEVEL_NODE] = []
2395 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2397 def DeclareLocks(self, level):
2398 if level == locking.LEVEL_NODE:
2399 self._LockInstancesNodes()
2401 def CheckPrereq(self):
2402 """Check prerequisites.
2404 This checks that the instance is in the cluster.
2407 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2408 assert self.instance is not None, \
2409 "Cannot retrieve locked instance %s" % self.op.instance_name
2410 _CheckNodeOnline(self, self.instance.primary_node)
2412 def Exec(self, feedback_fn):
2413 """Activate the disks.
2416 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2417 if not disks_ok:
2418 raise errors.OpExecError("Cannot activate block devices")
2420 return disks_info
2423 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2424 """Prepare the block devices for an instance.
2426 This sets up the block devices on all nodes.
2428 @type lu: L{LogicalUnit}
2429 @param lu: the logical unit on whose behalf we execute
2430 @type instance: L{objects.Instance}
2431 @param instance: the instance for whose disks we assemble
2432 @type ignore_secondaries: boolean
2433 @param ignore_secondaries: if true, errors on secondary nodes
2434 won't result in an error return from the function
2435 @return: False if the operation failed, otherwise a list of
2436 (host, instance_visible_name, node_visible_name)
2437 with the mapping from node devices to instance devices
2442 iname = instance.name
2443 # With the two passes mechanism we try to reduce the window of
2444 # opportunity for the race condition of switching DRBD to primary
2445 # before handshaking occurred, but we do not eliminate it
2447 # The proper fix would be to wait (with some limits) until the
2448 # connection has been made and drbd transitions from WFConnection
2449 # into any other network-connected state (Connected, SyncTarget,
2450 # SyncSource, etc.)
2452 # 1st pass, assemble on all nodes in secondary mode
2453 for inst_disk in instance.disks:
2454 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2455 lu.cfg.SetDiskID(node_disk, node)
2456 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2457 if result.failed or not result:
2458 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2459 " (is_primary=False, pass=1)",
2460 inst_disk.iv_name, node)
2461 if not ignore_secondaries:
2462 disks_ok = False
2464 # FIXME: race condition on drbd migration to primary
2466 # 2nd pass, do only the primary node
2467 for inst_disk in instance.disks:
2468 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2469 if node != instance.primary_node:
2471 lu.cfg.SetDiskID(node_disk, node)
2472 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2473 if result.failed or not result:
2474 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2475 " (is_primary=True, pass=2)",
2476 inst_disk.iv_name, node)
2477 disks_ok = False
2478 device_info.append((instance.primary_node, inst_disk.iv_name, result.data))
2480 # leave the disks configured for the primary node
2481 # this is a workaround that would be fixed better by
2482 # improving the logical/physical id handling
2483 for disk in instance.disks:
2484 lu.cfg.SetDiskID(disk, instance.primary_node)
2486 return disks_ok, device_info
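# A minimal usage sketch (mirroring LUActivateInstanceDisks above): callers
# are expected to check disks_ok and abort the operation themselves, e.g.
#   disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")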
2489 def _StartInstanceDisks(lu, instance, force):
2490 """Start the disks of an instance.
2493 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2494 ignore_secondaries=force)
2495 if not disks_ok:
2496 _ShutdownInstanceDisks(lu, instance)
2497 if force is not None and not force:
2498 lu.proc.LogWarning("", hint="If the message above refers to a"
2500 " you can retry the operation using '--force'.")
2501 raise errors.OpExecError("Disk consistency error")
2504 class LUDeactivateInstanceDisks(NoHooksLU):
2505 """Shutdown an instance's disks.
2508 _OP_REQP = ["instance_name"]
2511 def ExpandNames(self):
2512 self._ExpandAndLockInstance()
2513 self.needed_locks[locking.LEVEL_NODE] = []
2514 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2516 def DeclareLocks(self, level):
2517 if level == locking.LEVEL_NODE:
2518 self._LockInstancesNodes()
2520 def CheckPrereq(self):
2521 """Check prerequisites.
2523 This checks that the instance is in the cluster.
2526 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2527 assert self.instance is not None, \
2528 "Cannot retrieve locked instance %s" % self.op.instance_name
2530 def Exec(self, feedback_fn):
2531 """Deactivate the disks
2534 instance = self.instance
2535 _SafeShutdownInstanceDisks(self, instance)
2538 def _SafeShutdownInstanceDisks(lu, instance):
2539 """Shutdown block devices of an instance.
2541 This function checks if an instance is running, before calling
2542 _ShutdownInstanceDisks.
2545 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2546 [instance.hypervisor])
2547 ins_l = ins_l[instance.primary_node]
2548 if ins_l.failed or not isinstance(ins_l.data, list):
2549 raise errors.OpExecError("Can't contact node '%s'" %
2550 instance.primary_node)
2552 if instance.name in ins_l.data:
2553 raise errors.OpExecError("Instance is running, can't shutdown"
2556 _ShutdownInstanceDisks(lu, instance)
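# The instance_list check above exists because tearing down the block devices
# under a still-running instance would corrupt its disks; the shutdown only
# proceeds once the primary node reports the instance as not running.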
2559 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2560 """Shutdown block devices of an instance.
2562 This does the shutdown on all nodes of the instance.
2564 If the ignore_primary is false, errors on the primary node are
2565 not ignored (they make the shutdown be reported as failed).
2569 for disk in instance.disks:
2570 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2571 lu.cfg.SetDiskID(top_disk, node)
2572 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2573 if result.failed or not result.data:
2574 logging.error("Could not shutdown block device %s on node %s",
2576 if not ignore_primary or node != instance.primary_node:
2581 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2582 """Checks if a node has enough free memory.
2584 This function checks if a given node has the needed amount of free
2585 memory. In case the node has less memory or we cannot get the
2586 information from the node, this function raises an OpPrereqError
2587 exception.
2589 @type lu: C{LogicalUnit}
2590 @param lu: a logical unit from which we get configuration data
2592 @param node: the node to check
2593 @type reason: C{str}
2594 @param reason: string to use in the error message
2595 @type requested: C{int}
2596 @param requested: the amount of memory in MiB to check for
2597 @type hypervisor_name: C{str}
2598 @param hypervisor_name: the hypervisor to ask for memory stats
2599 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2600 we cannot check the node
2603 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2604 nodeinfo[node].Raise()
2605 free_mem = nodeinfo[node].data.get('memory_free')
2606 if not isinstance(free_mem, int):
2607 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2608 " was '%s'" % (node, free_mem))
2609 if requested > free_mem:
2610 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2611 " needed %s MiB, available %s MiB" %
2612 (node, reason, requested, free_mem))
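# A typical call, as used by LUStartupInstance below (a sketch; the memory
# amount comes from the instance's filled beparams):
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)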
2615 class LUStartupInstance(LogicalUnit):
2616 """Starts an instance.
2619 HPATH = "instance-start"
2620 HTYPE = constants.HTYPE_INSTANCE
2621 _OP_REQP = ["instance_name", "force"]
2624 def ExpandNames(self):
2625 self._ExpandAndLockInstance()
2627 def BuildHooksEnv(self):
2630 This runs on master, primary and secondary nodes of the instance.
2634 "FORCE": self.op.force,
2636 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2637 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2640 def CheckPrereq(self):
2641 """Check prerequisites.
2643 This checks that the instance is in the cluster.
2646 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2647 assert self.instance is not None, \
2648 "Cannot retrieve locked instance %s" % self.op.instance_name
2650 _CheckNodeOnline(self, instance.primary_node)
2652 bep = self.cfg.GetClusterInfo().FillBE(instance)
2653 # check bridges existance
2654 _CheckInstanceBridgesExist(self, instance)
2656 _CheckNodeFreeMemory(self, instance.primary_node,
2657 "starting instance %s" % instance.name,
2658 bep[constants.BE_MEMORY], instance.hypervisor)
2660 def Exec(self, feedback_fn):
2661 """Start the instance.
2664 instance = self.instance
2665 force = self.op.force
2666 extra_args = getattr(self.op, "extra_args", "")
2668 self.cfg.MarkInstanceUp(instance.name)
2670 node_current = instance.primary_node
2672 _StartInstanceDisks(self, instance, force)
2674 result = self.rpc.call_instance_start(node_current, instance, extra_args)
2675 msg = result.RemoteFailMsg()
2676 if msg:
2677 _ShutdownInstanceDisks(self, instance)
2678 raise errors.OpExecError("Could not start instance: %s" % msg)
2681 class LURebootInstance(LogicalUnit):
2682 """Reboot an instance.
2685 HPATH = "instance-reboot"
2686 HTYPE = constants.HTYPE_INSTANCE
2687 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2690 def ExpandNames(self):
2691 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2692 constants.INSTANCE_REBOOT_HARD,
2693 constants.INSTANCE_REBOOT_FULL]:
2694 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2695 (constants.INSTANCE_REBOOT_SOFT,
2696 constants.INSTANCE_REBOOT_HARD,
2697 constants.INSTANCE_REBOOT_FULL))
2698 self._ExpandAndLockInstance()
2700 def BuildHooksEnv(self):
2703 This runs on master, primary and secondary nodes of the instance.
2707 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2709 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2710 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2713 def CheckPrereq(self):
2714 """Check prerequisites.
2716 This checks that the instance is in the cluster.
2719 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2720 assert self.instance is not None, \
2721 "Cannot retrieve locked instance %s" % self.op.instance_name
2723 _CheckNodeOnline(self, instance.primary_node)
2725 # check bridges existance
2726 _CheckInstanceBridgesExist(self, instance)
2728 def Exec(self, feedback_fn):
2729 """Reboot the instance.
2732 instance = self.instance
2733 ignore_secondaries = self.op.ignore_secondaries
2734 reboot_type = self.op.reboot_type
2735 extra_args = getattr(self.op, "extra_args", "")
2737 node_current = instance.primary_node
2739 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2740 constants.INSTANCE_REBOOT_HARD]:
2741 result = self.rpc.call_instance_reboot(node_current, instance,
2742 reboot_type, extra_args)
2743 if result.failed or not result.data:
2744 raise errors.OpExecError("Could not reboot instance")
2745 else:
2746 if not self.rpc.call_instance_shutdown(node_current, instance):
2747 raise errors.OpExecError("could not shutdown instance for full reboot")
2748 _ShutdownInstanceDisks(self, instance)
2749 _StartInstanceDisks(self, instance, ignore_secondaries)
2750 result = self.rpc.call_instance_start(node_current, instance, extra_args)
2751 msg = result.RemoteFailMsg()
2752 if msg:
2753 _ShutdownInstanceDisks(self, instance)
2754 raise errors.OpExecError("Could not start instance for"
2755 " full reboot: %s" % msg)
2757 self.cfg.MarkInstanceUp(instance.name)
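# Note: soft and hard reboots are delegated to the hypervisor through
# call_instance_reboot, while a "full" reboot is emulated here by shutting
# the instance down, cycling its disks and starting it again.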
2760 class LUShutdownInstance(LogicalUnit):
2761 """Shutdown an instance.
2764 HPATH = "instance-stop"
2765 HTYPE = constants.HTYPE_INSTANCE
2766 _OP_REQP = ["instance_name"]
2769 def ExpandNames(self):
2770 self._ExpandAndLockInstance()
2772 def BuildHooksEnv(self):
2775 This runs on master, primary and secondary nodes of the instance.
2778 env = _BuildInstanceHookEnvByObject(self, self.instance)
2779 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2782 def CheckPrereq(self):
2783 """Check prerequisites.
2785 This checks that the instance is in the cluster.
2788 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2789 assert self.instance is not None, \
2790 "Cannot retrieve locked instance %s" % self.op.instance_name
2791 _CheckNodeOnline(self, self.instance.primary_node)
2793 def Exec(self, feedback_fn):
2794 """Shutdown the instance.
2797 instance = self.instance
2798 node_current = instance.primary_node
2799 self.cfg.MarkInstanceDown(instance.name)
2800 result = self.rpc.call_instance_shutdown(node_current, instance)
2801 if result.failed or not result.data:
2802 self.proc.LogWarning("Could not shutdown instance")
2804 _ShutdownInstanceDisks(self, instance)
2807 class LUReinstallInstance(LogicalUnit):
2808 """Reinstall an instance.
2811 HPATH = "instance-reinstall"
2812 HTYPE = constants.HTYPE_INSTANCE
2813 _OP_REQP = ["instance_name"]
2816 def ExpandNames(self):
2817 self._ExpandAndLockInstance()
2819 def BuildHooksEnv(self):
2822 This runs on master, primary and secondary nodes of the instance.
2825 env = _BuildInstanceHookEnvByObject(self, self.instance)
2826 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2829 def CheckPrereq(self):
2830 """Check prerequisites.
2832 This checks that the instance is in the cluster and is not running.
2835 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2836 assert instance is not None, \
2837 "Cannot retrieve locked instance %s" % self.op.instance_name
2838 _CheckNodeOnline(self, instance.primary_node)
2840 if instance.disk_template == constants.DT_DISKLESS:
2841 raise errors.OpPrereqError("Instance '%s' has no disks" %
2842 self.op.instance_name)
2843 if instance.admin_up:
2844 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2845 self.op.instance_name)
2846 remote_info = self.rpc.call_instance_info(instance.primary_node,
2848 instance.hypervisor)
2849 if remote_info.failed or remote_info.data:
2850 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2851 (self.op.instance_name,
2852 instance.primary_node))
2854 self.op.os_type = getattr(self.op, "os_type", None)
2855 if self.op.os_type is not None:
2857 pnode = self.cfg.GetNodeInfo(
2858 self.cfg.ExpandNodeName(instance.primary_node))
2859 if pnode is None:
2860 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2862 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2864 if not isinstance(result.data, objects.OS):
2865 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2866 " primary node" % self.op.os_type)
2868 self.instance = instance
2870 def Exec(self, feedback_fn):
2871 """Reinstall the instance.
2874 inst = self.instance
2876 if self.op.os_type is not None:
2877 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2878 inst.os = self.op.os_type
2879 self.cfg.Update(inst)
2881 _StartInstanceDisks(self, inst, None)
2883 feedback_fn("Running the instance OS create scripts...")
2884 result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2885 msg = result.RemoteFailMsg()
2886 if msg:
2887 raise errors.OpExecError("Could not install OS for instance %s"
2888 " on node %s: %s" %
2889 (inst.name, inst.primary_node, msg))
2891 _ShutdownInstanceDisks(self, inst)
2894 class LURenameInstance(LogicalUnit):
2895 """Rename an instance.
2898 HPATH = "instance-rename"
2899 HTYPE = constants.HTYPE_INSTANCE
2900 _OP_REQP = ["instance_name", "new_name"]
2902 def BuildHooksEnv(self):
2905 This runs on master, primary and secondary nodes of the instance.
2908 env = _BuildInstanceHookEnvByObject(self, self.instance)
2909 env["INSTANCE_NEW_NAME"] = self.op.new_name
2910 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2913 def CheckPrereq(self):
2914 """Check prerequisites.
2916 This checks that the instance is in the cluster and is not running.
2919 instance = self.cfg.GetInstanceInfo(
2920 self.cfg.ExpandInstanceName(self.op.instance_name))
2921 if instance is None:
2922 raise errors.OpPrereqError("Instance '%s' not known" %
2923 self.op.instance_name)
2924 _CheckNodeOnline(self, instance.primary_node)
2926 if instance.admin_up:
2927 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2928 self.op.instance_name)
2929 remote_info = self.rpc.call_instance_info(instance.primary_node,
2931 instance.hypervisor)
2933 if remote_info.data:
2934 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2935 (self.op.instance_name,
2936 instance.primary_node))
2937 self.instance = instance
2939 # new name verification
2940 name_info = utils.HostInfo(self.op.new_name)
2942 self.op.new_name = new_name = name_info.name
2943 instance_list = self.cfg.GetInstanceList()
2944 if new_name in instance_list:
2945 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2948 if not getattr(self.op, "ignore_ip", False):
2949 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2950 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2951 (name_info.ip, new_name))
2954 def Exec(self, feedback_fn):
2955 """Reinstall the instance.
2958 inst = self.instance
2959 old_name = inst.name
2961 if inst.disk_template == constants.DT_FILE:
2962 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2964 self.cfg.RenameInstance(inst.name, self.op.new_name)
2965 # Change the instance lock. This is definitely safe while we hold the BGL
2966 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2967 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2969 # re-read the instance from the configuration after rename
2970 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2972 if inst.disk_template == constants.DT_FILE:
2973 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2974 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2975 old_file_storage_dir,
2976 new_file_storage_dir)
2979 raise errors.OpExecError("Could not connect to node '%s' to rename"
2980 " directory '%s' to '%s' (but the instance"
2981 " has been renamed in Ganeti)" % (
2982 inst.primary_node, old_file_storage_dir,
2983 new_file_storage_dir))
2985 if not result.data[0]:
2986 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2987 " (but the instance has been renamed in"
2988 " Ganeti)" % (old_file_storage_dir,
2989 new_file_storage_dir))
2991 _StartInstanceDisks(self, inst, None)
2993 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2995 msg = result.RemoteFailMsg()
2997 msg = ("Could not run OS rename script for instance %s on node %s"
2998 " (but the instance has been renamed in Ganeti): %s" %
2999 (inst.name, inst.primary_node, msg))
3000 self.proc.LogWarning(msg)
3002 _ShutdownInstanceDisks(self, inst)
3005 class LURemoveInstance(LogicalUnit):
3006 """Remove an instance.
3009 HPATH = "instance-remove"
3010 HTYPE = constants.HTYPE_INSTANCE
3011 _OP_REQP = ["instance_name", "ignore_failures"]
3014 def ExpandNames(self):
3015 self._ExpandAndLockInstance()
3016 self.needed_locks[locking.LEVEL_NODE] = []
3017 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3019 def DeclareLocks(self, level):
3020 if level == locking.LEVEL_NODE:
3021 self._LockInstancesNodes()
3023 def BuildHooksEnv(self):
3026 This runs on master, primary and secondary nodes of the instance.
3029 env = _BuildInstanceHookEnvByObject(self, self.instance)
3030 nl = [self.cfg.GetMasterNode()]
3033 def CheckPrereq(self):
3034 """Check prerequisites.
3036 This checks that the instance is in the cluster.
3039 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3040 assert self.instance is not None, \
3041 "Cannot retrieve locked instance %s" % self.op.instance_name
3043 def Exec(self, feedback_fn):
3044 """Remove the instance.
3047 instance = self.instance
3048 logging.info("Shutting down instance %s on node %s",
3049 instance.name, instance.primary_node)
3051 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3052 if result.failed or not result.data:
3053 if self.op.ignore_failures:
3054 feedback_fn("Warning: can't shutdown instance")
3055 else:
3056 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3057 (instance.name, instance.primary_node))
3059 logging.info("Removing block devices for instance %s", instance.name)
3061 if not _RemoveDisks(self, instance):
3062 if self.op.ignore_failures:
3063 feedback_fn("Warning: can't remove instance's disks")
3064 else:
3065 raise errors.OpExecError("Can't remove instance's disks")
3067 logging.info("Removing instance %s out of cluster config", instance.name)
3069 self.cfg.RemoveInstance(instance.name)
3070 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3073 class LUQueryInstances(NoHooksLU):
3074 """Logical unit for querying instances.
3077 _OP_REQP = ["output_fields", "names"]
3079 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3080 "admin_state", "admin_ram",
3081 "disk_template", "ip", "mac", "bridge",
3082 "sda_size", "sdb_size", "vcpus", "tags",
3083 "network_port", "beparams",
3084 "(disk).(size)/([0-9]+)",
3086 "(nic).(mac|ip|bridge)/([0-9]+)",
3087 "(nic).(macs|ips|bridges)",
3088 "(disk|nic).(count)",
3089 "serial_no", "hypervisor", "hvparams",] +
3091 for name in constants.HVS_PARAMETERS] +
3093 for name in constants.BES_PARAMETERS])
3094 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3097 def ExpandNames(self):
3098 _CheckOutputFields(static=self._FIELDS_STATIC,
3099 dynamic=self._FIELDS_DYNAMIC,
3100 selected=self.op.output_fields)
3102 self.needed_locks = {}
3103 self.share_locks[locking.LEVEL_INSTANCE] = 1
3104 self.share_locks[locking.LEVEL_NODE] = 1
3106 if self.op.names:
3107 self.wanted = _GetWantedInstances(self, self.op.names)
3108 else:
3109 self.wanted = locking.ALL_SET
3111 self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3113 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3114 self.needed_locks[locking.LEVEL_NODE] = []
3115 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3117 def DeclareLocks(self, level):
3118 if level == locking.LEVEL_NODE and self.do_locking:
3119 self._LockInstancesNodes()
3121 def CheckPrereq(self):
3122 """Check prerequisites.
3127 def Exec(self, feedback_fn):
3128 """Computes the list of nodes and their attributes.
3131 all_info = self.cfg.GetAllInstancesInfo()
3132 if self.wanted == locking.ALL_SET:
3133 # caller didn't specify instance names, so ordering is not important
3134 if self.do_locking:
3135 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3136 else:
3137 instance_names = all_info.keys()
3138 instance_names = utils.NiceSort(instance_names)
3140 # caller did specify names, so we must keep the ordering
3141 if self.do_locking:
3142 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3143 else:
3144 tgt_set = all_info.keys()
3145 missing = set(self.wanted).difference(tgt_set)
3146 if missing:
3147 raise errors.OpExecError("Some instances were removed before"
3148 " retrieving their data: %s" % missing)
3149 instance_names = self.wanted
3151 instance_list = [all_info[iname] for iname in instance_names]
3153 # begin data gathering
3155 nodes = frozenset([inst.primary_node for inst in instance_list])
3156 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3162 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3164 result = node_data[name]
3166 # offline nodes will be in both lists
3167 off_nodes.append(name)
3169 bad_nodes.append(name)
3172 live_data.update(result.data)
3173 # else no instance is alive
3175 live_data = dict([(name, {}) for name in instance_names])
3177 # end data gathering
3182 for instance in instance_list:
3184 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3185 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3186 for field in self.op.output_fields:
3187 st_match = self._FIELDS_STATIC.Matches(field)
3192 elif field == "pnode":
3193 val = instance.primary_node
3194 elif field == "snodes":
3195 val = list(instance.secondary_nodes)
3196 elif field == "admin_state":
3197 val = instance.admin_up
3198 elif field == "oper_state":
3199 if instance.primary_node in bad_nodes:
3202 val = bool(live_data.get(instance.name))
3203 elif field == "status":
3204 if instance.primary_node in off_nodes:
3205 val = "ERROR_nodeoffline"
3206 elif instance.primary_node in bad_nodes:
3207 val = "ERROR_nodedown"
3209 running = bool(live_data.get(instance.name))
3211 if instance.admin_up:
3216 if instance.admin_up:
3220 elif field == "oper_ram":
3221 if instance.primary_node in bad_nodes:
3223 elif instance.name in live_data:
3224 val = live_data[instance.name].get("memory", "?")
3227 elif field == "disk_template":
3228 val = instance.disk_template
3229 elif field == "ip":
3230 val = instance.nics[0].ip
3231 elif field == "bridge":
3232 val = instance.nics[0].bridge
3233 elif field == "mac":
3234 val = instance.nics[0].mac
3235 elif field == "sda_size" or field == "sdb_size":
3236 idx = ord(field[2]) - ord('a')
3237 try:
3238 val = instance.FindDisk(idx).size
3239 except errors.OpPrereqError:
3241 elif field == "tags":
3242 val = list(instance.GetTags())
3243 elif field == "serial_no":
3244 val = instance.serial_no
3245 elif field == "network_port":
3246 val = instance.network_port
3247 elif field == "hypervisor":
3248 val = instance.hypervisor
3249 elif field == "hvparams":
3251 elif (field.startswith(HVPREFIX) and
3252 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3253 val = i_hv.get(field[len(HVPREFIX):], None)
3254 elif field == "beparams":
3256 elif (field.startswith(BEPREFIX) and
3257 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3258 val = i_be.get(field[len(BEPREFIX):], None)
3259 elif st_match and st_match.groups():
3260 # matches a variable list
3261 st_groups = st_match.groups()
3262 if st_groups and st_groups[0] == "disk":
3263 if st_groups[1] == "count":
3264 val = len(instance.disks)
3265 elif st_groups[1] == "sizes":
3266 val = [disk.size for disk in instance.disks]
3267 elif st_groups[1] == "size":
3268 try:
3269 val = instance.FindDisk(st_groups[2]).size
3270 except errors.OpPrereqError:
3273 assert False, "Unhandled disk parameter"
3274 elif st_groups[0] == "nic":
3275 if st_groups[1] == "count":
3276 val = len(instance.nics)
3277 elif st_groups[1] == "macs":
3278 val = [nic.mac for nic in instance.nics]
3279 elif st_groups[1] == "ips":
3280 val = [nic.ip for nic in instance.nics]
3281 elif st_groups[1] == "bridges":
3282 val = [nic.bridge for nic in instance.nics]
3285 nic_idx = int(st_groups[2])
3286 if nic_idx >= len(instance.nics):
3289 if st_groups[1] == "mac":
3290 val = instance.nics[nic_idx].mac
3291 elif st_groups[1] == "ip":
3292 val = instance.nics[nic_idx].ip
3293 elif st_groups[1] == "bridge":
3294 val = instance.nics[nic_idx].bridge
3296 assert False, "Unhandled NIC parameter"
3298 assert False, "Unhandled variable parameter"
3300 raise errors.ParameterError(field)
3307 class LUFailoverInstance(LogicalUnit):
3308 """Failover an instance.
3311 HPATH = "instance-failover"
3312 HTYPE = constants.HTYPE_INSTANCE
3313 _OP_REQP = ["instance_name", "ignore_consistency"]
3316 def ExpandNames(self):
3317 self._ExpandAndLockInstance()
3318 self.needed_locks[locking.LEVEL_NODE] = []
3319 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3321 def DeclareLocks(self, level):
3322 if level == locking.LEVEL_NODE:
3323 self._LockInstancesNodes()
3325 def BuildHooksEnv(self):
3328 This runs on master, primary and secondary nodes of the instance.
3332 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3334 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3335 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3338 def CheckPrereq(self):
3339 """Check prerequisites.
3341 This checks that the instance is in the cluster.
3344 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3345 assert self.instance is not None, \
3346 "Cannot retrieve locked instance %s" % self.op.instance_name
3348 bep = self.cfg.GetClusterInfo().FillBE(instance)
3349 if instance.disk_template not in constants.DTS_NET_MIRROR:
3350 raise errors.OpPrereqError("Instance's disk layout is not"
3351 " network mirrored, cannot failover.")
3353 secondary_nodes = instance.secondary_nodes
3354 if not secondary_nodes:
3355 raise errors.ProgrammerError("no secondary node but using "
3356 "a mirrored disk template")
3358 target_node = secondary_nodes[0]
3359 _CheckNodeOnline(self, target_node)
3360 # check memory requirements on the secondary node
3361 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3362 instance.name, bep[constants.BE_MEMORY],
3363 instance.hypervisor)
3365 # check bridge existance
3366 brlist = [nic.bridge for nic in instance.nics]
3367 result = self.rpc.call_bridges_exist(target_node, brlist)
3369 if result.failed or not result.data:
3370 raise errors.OpPrereqError("One or more target bridges %s do not"
3371 " exist on destination node '%s'" %
3372 (brlist, target_node))
3374 def Exec(self, feedback_fn):
3375 """Failover an instance.
3377 The failover is done by shutting it down on its present node and
3378 starting it on the secondary.
3381 instance = self.instance
3383 source_node = instance.primary_node
3384 target_node = instance.secondary_nodes[0]
3386 feedback_fn("* checking disk consistency between source and target")
3387 for dev in instance.disks:
3388 # for drbd, these are drbd over lvm
3389 if not _CheckDiskConsistency(self, dev, target_node, False):
3390 if instance.admin_up and not self.op.ignore_consistency:
3391 raise errors.OpExecError("Disk %s is degraded on target node,"
3392 " aborting failover." % dev.iv_name)
3394 feedback_fn("* shutting down instance on source node")
3395 logging.info("Shutting down instance %s on node %s",
3396 instance.name, source_node)
3398 result = self.rpc.call_instance_shutdown(source_node, instance)
3399 if result.failed or not result.data:
3400 if self.op.ignore_consistency:
3401 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3403 " anyway. Please make sure node %s is down",
3404 instance.name, source_node, source_node)
3405 else:
3406 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3407 (instance.name, source_node))
3409 feedback_fn("* deactivating the instance's disks on source node")
3410 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3411 raise errors.OpExecError("Can't shut down the instance's disks.")
3413 instance.primary_node = target_node
3414 # distribute new instance config to the other nodes
3415 self.cfg.Update(instance)
3417 # Only start the instance if it's marked as up
3418 if instance.admin_up:
3419 feedback_fn("* activating the instance's disks on target node")
3420 logging.info("Starting instance %s on node %s",
3421 instance.name, target_node)
3423 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3424 ignore_secondaries=True)
3425 if not disks_ok:
3426 _ShutdownInstanceDisks(self, instance)
3427 raise errors.OpExecError("Can't activate the instance's disks")
3429 feedback_fn("* starting the instance on the target node")
3430 result = self.rpc.call_instance_start(target_node, instance, None)
3431 msg = result.RemoteFailMsg()
3432 if msg:
3433 _ShutdownInstanceDisks(self, instance)
3434 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3435 (instance.name, target_node, msg))
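# Failover thus boils down to: shut the instance down on its current primary
# (tolerating a failure there only with ignore_consistency), flip
# primary_node to the old secondary in the configuration, and, if the
# instance was marked up, reassemble its disks and start it on the new node.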
3438 class LUMigrateInstance(LogicalUnit):
3439 """Migrate an instance.
3441 This is migration without shutting down, compared to the failover,
3442 which is done with shutdown.
3445 HPATH = "instance-migrate"
3446 HTYPE = constants.HTYPE_INSTANCE
3447 _OP_REQP = ["instance_name", "live", "cleanup"]
3451 def ExpandNames(self):
3452 self._ExpandAndLockInstance()
3453 self.needed_locks[locking.LEVEL_NODE] = []
3454 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3456 def DeclareLocks(self, level):
3457 if level == locking.LEVEL_NODE:
3458 self._LockInstancesNodes()
3460 def BuildHooksEnv(self):
3463 This runs on master, primary and secondary nodes of the instance.
3466 env = _BuildInstanceHookEnvByObject(self, self.instance)
3467 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3470 def CheckPrereq(self):
3471 """Check prerequisites.
3473 This checks that the instance is in the cluster.
3476 instance = self.cfg.GetInstanceInfo(
3477 self.cfg.ExpandInstanceName(self.op.instance_name))
3478 if instance is None:
3479 raise errors.OpPrereqError("Instance '%s' not known" %
3480 self.op.instance_name)
3482 if instance.disk_template != constants.DT_DRBD8:
3483 raise errors.OpPrereqError("Instance's disk layout is not"
3484 " drbd8, cannot migrate.")
3486 secondary_nodes = instance.secondary_nodes
3487 if not secondary_nodes:
3488 raise errors.ProgrammerError("no secondary node but using "
3489 "drbd8 disk template")
3491 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3493 target_node = secondary_nodes[0]
3494 # check memory requirements on the secondary node
3495 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3496 instance.name, i_be[constants.BE_MEMORY],
3497 instance.hypervisor)
3499 # check bridge existance
3500 brlist = [nic.bridge for nic in instance.nics]
3501 result = self.rpc.call_bridges_exist(target_node, brlist)
3502 if result.failed or not result.data:
3503 raise errors.OpPrereqError("One or more target bridges %s do not"
3504 " exist on destination node '%s'" %
3505 (brlist, target_node))
3507 if not self.op.cleanup:
3508 result = self.rpc.call_instance_migratable(instance.primary_node,
3510 msg = result.RemoteFailMsg()
3511 if msg:
3512 raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3515 self.instance = instance
3517 def _WaitUntilSync(self):
3518 """Poll with custom rpc for disk sync.
3520 This uses our own step-based rpc call.
3523 self.feedback_fn("* wait until resync is done")
3527 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3529 self.instance.disks)
3531 for node, nres in result.items():
3532 msg = nres.RemoteFailMsg()
3533 if msg:
3534 raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3536 node_done, node_percent = nres.data[1]
3537 all_done = all_done and node_done
3538 if node_percent is not None:
3539 min_percent = min(min_percent, node_percent)
3541 if min_percent < 100:
3542 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3545 def _EnsureSecondary(self, node):
3546 """Demote a node to secondary.
3549 self.feedback_fn("* switching node %s to secondary mode" % node)
3551 for dev in self.instance.disks:
3552 self.cfg.SetDiskID(dev, node)
3554 result = self.rpc.call_blockdev_close(node, self.instance.name,
3555 self.instance.disks)
3556 msg = result.RemoteFailMsg()
3558 raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3559 " error %s" % (node, msg))
3561 def _GoStandalone(self):
3562 """Disconnect from the network.
3565 self.feedback_fn("* changing into standalone mode")
3566 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3567 self.instance.disks)
3568 for node, nres in result.items():
3569 msg = nres.RemoteFailMsg()
3571 raise errors.OpExecError("Cannot disconnect disks node %s,"
3572 " error %s" % (node, msg))
3574 def _GoReconnect(self, multimaster):
3575 """Reconnect to the network.
3581 msg = "single-master"
3582 self.feedback_fn("* changing disks into %s mode" % msg)
3583 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3584 self.instance.disks,
3585 self.instance.name, multimaster)
3586 for node, nres in result.items():
3587 msg = nres.RemoteFailMsg()
3589 raise errors.OpExecError("Cannot change disks config on node %s,"
3590 " error: %s" % (node, msg))
3592 def _ExecCleanup(self):
3593 """Try to cleanup after a failed migration.
3595 The cleanup is done by:
3596 - check that the instance is running only on one node
3597 (and update the config if needed)
3598 - change disks on its secondary node to secondary
3599 - wait until disks are fully synchronized
3600 - disconnect from the network
3601 - change disks into single-master mode
3602 - wait again until disks are fully synchronized
3605 instance = self.instance
3606 target_node = self.target_node
3607 source_node = self.source_node
3609 # check running on only one node
3610 self.feedback_fn("* checking where the instance actually runs"
3611 " (if this hangs, the hypervisor might be in"
3613 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3614 for node, result in ins_l.items():
3616 if not isinstance(result.data, list):
3617 raise errors.OpExecError("Can't contact node '%s'" % node)
3619 runningon_source = instance.name in ins_l[source_node].data
3620 runningon_target = instance.name in ins_l[target_node].data
3622 if runningon_source and runningon_target:
3623 raise errors.OpExecError("Instance seems to be running on two nodes,"
3624 " or the hypervisor is confused. You will have"
3625 " to ensure manually that it runs only on one"
3626 " and restart this operation.")
3628 if not (runningon_source or runningon_target):
3629 raise errors.OpExecError("Instance does not seem to be running at all."
3630 " In this case, it's safer to repair by"
3631 " running 'gnt-instance stop' to ensure disk"
3632 " shutdown, and then restarting it.")
3634 if runningon_target:
3635 # the migration has actually succeeded, we need to update the config
3636 self.feedback_fn("* instance running on secondary node (%s),"
3637 " updating config" % target_node)
3638 instance.primary_node = target_node
3639 self.cfg.Update(instance)
3640 demoted_node = source_node
3642 self.feedback_fn("* instance confirmed to be running on its"
3643 " primary node (%s)" % source_node)
3644 demoted_node = target_node
3646 self._EnsureSecondary(demoted_node)
3647 try:
3648 self._WaitUntilSync()
3649 except errors.OpExecError:
3650 # we ignore here errors, since if the device is standalone, it
3651 # won't be able to sync
3652 pass
3653 self._GoStandalone()
3654 self._GoReconnect(False)
3655 self._WaitUntilSync()
3657 self.feedback_fn("* done")
3659 def _RevertDiskStatus(self):
3660 """Try to revert the disk status after a failed migration.
3663 target_node = self.target_node
3664 try:
3665 self._EnsureSecondary(target_node)
3666 self._GoStandalone()
3667 self._GoReconnect(False)
3668 self._WaitUntilSync()
3669 except errors.OpExecError, err:
3670 self.LogWarning("Migration failed and I can't reconnect the"
3671 " drives: error '%s'\n"
3672 "Please look and recover the instance status" %
3675 def _AbortMigration(self):
3676 """Call the hypervisor code to abort a started migration.
3679 instance = self.instance
3680 target_node = self.target_node
3681 migration_info = self.migration_info
3683 abort_result = self.rpc.call_finalize_migration(target_node,
3687 abort_msg = abort_result.RemoteFailMsg()
3689 logging.error("Aborting migration failed on target node %s: %s" %
3690 (target_node, abort_msg))
3691 # Don't raise an exception here, as we still have to try to revert the
3692 # disk status, even if this step failed.
3694 def _ExecMigration(self):
3695 """Migrate an instance.
3697 The migrate is done by:
3698 - change the disks into dual-master mode
3699 - wait until disks are fully synchronized again
3700 - migrate the instance
3701 - change disks on the new secondary node (the old primary) to secondary
3702 - wait until disks are fully synchronized
3703 - change disks into single-master mode
3706 instance = self.instance
3707 target_node = self.target_node
3708 source_node = self.source_node
3710 self.feedback_fn("* checking disk consistency between source and target")
3711 for dev in instance.disks:
3712 if not _CheckDiskConsistency(self, dev, target_node, False):
3713 raise errors.OpExecError("Disk %s is degraded or not fully"
3714 " synchronized on target node,"
3715 " aborting migrate." % dev.iv_name)
3717 # First get the migration information from the remote node
3718 result = self.rpc.call_migration_info(source_node, instance)
3719 msg = result.RemoteFailMsg()
3721 log_err = ("Failed fetching source migration information from %s: %s" %
3723 logging.error(log_err)
3724 raise errors.OpExecError(log_err)
3726 self.migration_info = migration_info = result.data[1]
3728 # Then switch the disks to master/master mode
3729 self._EnsureSecondary(target_node)
3730 self._GoStandalone()
3731 self._GoReconnect(True)
3732 self._WaitUntilSync()
3734 self.feedback_fn("* preparing %s to accept the instance" % target_node)
3735 result = self.rpc.call_accept_instance(target_node,
3738 self.nodes_ip[target_node])
3740 msg = result.RemoteFailMsg()
3742 logging.error("Instance pre-migration failed, trying to revert"
3743 " disk status: %s", msg)
3744 self._AbortMigration()
3745 self._RevertDiskStatus()
3746 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3747 (instance.name, msg))
3749 self.feedback_fn("* migrating instance to %s" % target_node)
3751 result = self.rpc.call_instance_migrate(source_node, instance,
3752 self.nodes_ip[target_node],
3753 self.op.live)
3754 msg = result.RemoteFailMsg()
3756 logging.error("Instance migration failed, trying to revert"
3757 " disk status: %s", msg)
3758 self._AbortMigration()
3759 self._RevertDiskStatus()
3760 raise errors.OpExecError("Could not migrate instance %s: %s" %
3761 (instance.name, msg))
3764 instance.primary_node = target_node
3765 # distribute new instance config to the other nodes
3766 self.cfg.Update(instance)
3768 result = self.rpc.call_finalize_migration(target_node,
3772 msg = result.RemoteFailMsg()
3774 logging.error("Instance migration succeeded, but finalization failed:"
3776 raise errors.OpExecError("Could not finalize instance migration: %s" %
3779 self._EnsureSecondary(source_node)
3780 self._WaitUntilSync()
3781 self._GoStandalone()
3782 self._GoReconnect(False)
3783 self._WaitUntilSync()
3785 self.feedback_fn("* done")
3787 def Exec(self, feedback_fn):
3788 """Perform the migration.
3791 self.feedback_fn = feedback_fn
3793 self.source_node = self.instance.primary_node
3794 self.target_node = self.instance.secondary_nodes[0]
3795 self.all_nodes = [self.source_node, self.target_node]
3796 self.nodes_ip = {
3797 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3798 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3799 }
3800 if self.op.cleanup:
3801 return self._ExecCleanup()
3802 else:
3803 return self._ExecMigration()
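# nodes_ip maps each of the two nodes to its secondary IP, which is the
# address the DRBD and migration RPCs above are given for the inter-node
# traffic; with cleanup set, only the recovery path (_ExecCleanup) runs.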
3806 def _CreateBlockDev(lu, node, instance, device, force_create,
3808 """Create a tree of block devices on a given node.
3810 If this device type has to be created on secondaries, create it and
3813 If not, just recurse to children keeping the same 'force' value.
3815 @param lu: the lu on whose behalf we execute
3816 @param node: the node on which to create the device
3817 @type instance: L{objects.Instance}
3818 @param instance: the instance which owns the device
3819 @type device: L{objects.Disk}
3820 @param device: the device to create
3821 @type force_create: boolean
3822 @param force_create: whether to force creation of this device; this
3823 will be changed to True whenever we find a device which has
3824 CreateOnSecondary() attribute
3825 @param info: the extra 'metadata' we should attach to the device
3826 (this will be represented as a LVM tag)
3827 @type force_open: boolean
3828 @param force_open: this parameter will be passed to the
3829 L{backend.CreateBlockDevice} function where it specifies
3830 whether we run on primary or not, and it affects both
3831 the child assembly and the device own Open() execution
3834 if device.CreateOnSecondary():
3835 force_create = True
3838 for child in device.children:
3839 _CreateBlockDev(lu, node, instance, child, force_create,
3840 info, force_open)
3842 if not force_create:
3843 return
3845 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
3848 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3849 """Create a single block device on a given node.
3851 This will not recurse over children of the device, so they must be
3852 created in advance.
3854 @param lu: the lu on whose behalf we execute
3855 @param node: the node on which to create the device
3856 @type instance: L{objects.Instance}
3857 @param instance: the instance which owns the device
3858 @type device: L{objects.Disk}
3859 @param device: the device to create
3860 @param info: the extra 'metadata' we should attach to the device
3861 (this will be represented as a LVM tag)
3862 @type force_open: boolean
3863 @param force_open: this parameter will be passed to the
3864 L{backend.CreateBlockDevice} function where it specifies
3865 whether we run on primary or not, and it affects both
3866 the child assembly and the device own Open() execution
3869 lu.cfg.SetDiskID(device, node)
3870 result = lu.rpc.call_blockdev_create(node, device, device.size,
3871 instance.name, force_open, info)
3872 msg = result.RemoteFailMsg()
3874 raise errors.OpExecError("Can't create block device %s on"
3875 " node %s for instance %s: %s" %
3876 (device, node, instance.name, msg))
3877 if device.physical_id is None:
3878 device.physical_id = result.data[1]
3881 def _GenerateUniqueNames(lu, exts):
3882 """Generate a suitable LV name.
3884 This will generate a logical volume name for the given instance.
3889 new_id = lu.cfg.GenerateUniqueID()
3890 results.append("%s%s" % (new_id, val))
3894 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3896 """Generate a drbd8 device complete with its children.
3899 port = lu.cfg.AllocatePort()
3900 vgname = lu.cfg.GetVGName()
3901 shared_secret = lu.cfg.GenerateDRBDSecret()
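# Each DRBD8 disk built below is a pair of logical volumes (a data LV of the
# requested size and a small metadata LV) glued together by a DRBD device,
# which additionally needs a TCP port, one minor per node and the shared
# secret generated above for its two endpoints.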
3902 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3903 logical_id=(vgname, names[0]))
3904 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3905 logical_id=(vgname, names[1]))
3906 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3907 logical_id=(primary, secondary, port,
3910 children=[dev_data, dev_meta],
3915 def _GenerateDiskTemplate(lu, template_name,
3916 instance_name, primary_node,
3917 secondary_nodes, disk_info,
3918 file_storage_dir, file_driver,
3920 """Generate the entire disk layout for a given template type.
3923 #TODO: compute space requirements
3925 vgname = lu.cfg.GetVGName()
3926 disk_count = len(disk_info)
3927 disks = []
3928 if template_name == constants.DT_DISKLESS:
3929 pass
3930 elif template_name == constants.DT_PLAIN:
3931 if len(secondary_nodes) != 0:
3932 raise errors.ProgrammerError("Wrong template configuration")
3934 names = _GenerateUniqueNames(lu, [".disk%d" % i
3935 for i in range(disk_count)])
3936 for idx, disk in enumerate(disk_info):
3937 disk_index = idx + base_index
3938 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3939 logical_id=(vgname, names[idx]),
3940 iv_name="disk/%d" % disk_index,
3942 disks.append(disk_dev)
3943 elif template_name == constants.DT_DRBD8:
3944 if len(secondary_nodes) != 1:
3945 raise errors.ProgrammerError("Wrong template configuration")
3946 remote_node = secondary_nodes[0]
3947 minors = lu.cfg.AllocateDRBDMinor(
3948 [primary_node, remote_node] * len(disk_info), instance_name)
3951 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
3952 for i in range(disk_count)]):
3953 names.append(lv_prefix + "_data")
3954 names.append(lv_prefix + "_meta")
3955 for idx, disk in enumerate(disk_info):
3956 disk_index = idx + base_index
3957 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3958 disk["size"], names[idx*2:idx*2+2],
3959 "disk/%d" % disk_index,
3960 minors[idx*2], minors[idx*2+1])
3961 disk_dev.mode = disk["mode"]
3962 disks.append(disk_dev)
3963 elif template_name == constants.DT_FILE:
3964 if len(secondary_nodes) != 0:
3965 raise errors.ProgrammerError("Wrong template configuration")
3967 for idx, disk in enumerate(disk_info):
3968 disk_index = idx + base_index
3969 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3970 iv_name="disk/%d" % disk_index,
3971 logical_id=(file_driver,
3972 "%s/disk%d" % (file_storage_dir,
3975 disks.append(disk_dev)
3977 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
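# Naming sketch (illustrative): for a two-disk DRBD8 instance the generated
# LV names look like
#   <uuid0>.disk0_data, <uuid0>.disk0_meta, <uuid1>.disk1_data, ...
# and two DRBD minors per disk are reserved up front via
# lu.cfg.AllocateDRBDMinor([primary_node, remote_node] * len(disk_info), ...).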
3981 def _GetInstanceInfoText(instance):
3982 """Compute that text that should be added to the disk's metadata.
3985 return "originstname+%s" % instance.name
3988 def _CreateDisks(lu, instance):
3989 """Create all disks for an instance.
3991 This abstracts away some work from AddInstance.
3993 @type lu: L{LogicalUnit}
3994 @param lu: the logical unit on whose behalf we execute
3995 @type instance: L{objects.Instance}
3996 @param instance: the instance whose disks we should create
3998 @return: the success of the creation
4001 info = _GetInstanceInfoText(instance)
4002 pnode = instance.primary_node
4004 if instance.disk_template == constants.DT_FILE:
4005 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4006 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4008 if result.failed or not result.data:
4009 raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4011 if not result.data[0]:
4012 raise errors.OpExecError("Failed to create directory '%s'" %
4015 # Note: this needs to be kept in sync with adding of disks in
4016 # LUSetInstanceParams
4017 for device in instance.disks:
4018 logging.info("Creating volume %s for instance %s",
4019 device.iv_name, instance.name)
4021 for node in instance.all_nodes:
4022 f_create = node == pnode
4023 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4026 def _RemoveDisks(lu, instance):
4027 """Remove all disks for an instance.
4029 This abstracts away some work from `AddInstance()` and
4030 `RemoveInstance()`. Note that in case some of the devices couldn't
4031 be removed, the removal will continue with the other ones (compare
4032 with `_CreateDisks()`).
4034 @type lu: L{LogicalUnit}
4035 @param lu: the logical unit on whose behalf we execute
4036 @type instance: L{objects.Instance}
4037 @param instance: the instance whose disks we should remove
4039 @return: the success of the removal
4042 logging.info("Removing block devices for instance %s", instance.name)
4045 for device in instance.disks:
4046 for node, disk in device.ComputeNodeTree(instance.primary_node):
4047 lu.cfg.SetDiskID(disk, node)
4048 result = lu.rpc.call_blockdev_remove(node, disk)
4049 if result.failed or not result.data:
4050 lu.proc.LogWarning("Could not remove block device %s on node %s,"
4051 " continuing anyway", device.iv_name, node)
4054 if instance.disk_template == constants.DT_FILE:
4055 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4056 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4058 if result.failed or not result.data:
4059 logging.error("Could not remove directory '%s'", file_storage_dir)
4065 def _ComputeDiskSize(disk_template, disks):
4066 """Compute disk size requirements in the volume group
4069 # Required free disk space as a function of the disk template and disk sizes
4071 constants.DT_DISKLESS: None,
4072 constants.DT_PLAIN: sum(d["size"] for d in disks),
4073 # 128 MB are added for drbd metadata for each disk
4074 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4075 constants.DT_FILE: None,
4078 if disk_template not in req_size_dict:
4079 raise errors.ProgrammerError("Disk template '%s' size requirement"
4080 " is unknown" % disk_template)
4082 return req_size_dict[disk_template]
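# Worked example (illustrative): for a DRBD8 instance with disks of 1024 MB
# and 512 MB the volume group must provide
#   (1024 + 128) + (512 + 128) = 1792 MB
# on each of the two nodes, whereas the plain template would need only
#   1024 + 512 = 1536 MB
# on the primary node; diskless and file-based instances need no VG space.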
4085 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4086 """Hypervisor parameter validation.
4088 This function abstracts the hypervisor parameter validation to be
4089 used in both instance create and instance modify.
4091 @type lu: L{LogicalUnit}
4092 @param lu: the logical unit for which we check
4093 @type nodenames: list
4094 @param nodenames: the list of nodes on which we should check
4095 @type hvname: string
4096 @param hvname: the name of the hypervisor we should use
4097 @type hvparams: dict
4098 @param hvparams: the parameters which we need to check
4099 @raise errors.OpPrereqError: if the parameters are not valid
4102 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4105 for node in nodenames:
4110 if not info.data or not isinstance(info.data, (tuple, list)):
4111 raise errors.OpPrereqError("Cannot get current information"
4112 " from node '%s' (%s)" % (node, info.data))
4113 if not info.data[0]:
4114 raise errors.OpPrereqError("Hypervisor parameter validation failed:"
4115 " %s" % info.data[1])
4118 class LUCreateInstance(LogicalUnit):
4119 """Create an instance.
4122 HPATH = "instance-add"
4123 HTYPE = constants.HTYPE_INSTANCE
4124 _OP_REQP = ["instance_name", "disks", "disk_template",
4126 "wait_for_sync", "ip_check", "nics",
4127 "hvparams", "beparams"]
4130 def _ExpandNode(self, node):
4131 """Expands and checks one node name.
4134 node_full = self.cfg.ExpandNodeName(node)
4135 if node_full is None:
4136 raise errors.OpPrereqError("Unknown node %s" % node)
4139 def ExpandNames(self):
4140 """ExpandNames for CreateInstance.
4142 Figure out the right locks for instance creation.
4145 self.needed_locks = {}
4147 # set optional parameters to None if they don't exist
4148 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4149 if not hasattr(self.op, attr):
4150 setattr(self.op, attr, None)
4152 # cheap checks, mostly valid constants given
4154 # verify creation mode
4155 if self.op.mode not in (constants.INSTANCE_CREATE,
4156 constants.INSTANCE_IMPORT):
4157 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4160 # disk template and mirror node verification
4161 if self.op.disk_template not in constants.DISK_TEMPLATES:
4162 raise errors.OpPrereqError("Invalid disk template name")
4164 if self.op.hypervisor is None:
4165 self.op.hypervisor = self.cfg.GetHypervisorType()
4167 cluster = self.cfg.GetClusterInfo()
4168 enabled_hvs = cluster.enabled_hypervisors
4169 if self.op.hypervisor not in enabled_hvs:
4170 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4171 " cluster (%s)" % (self.op.hypervisor,
4172 ",".join(enabled_hvs)))
4174 # check hypervisor parameter syntax (locally)
4176 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4178 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4179 hv_type.CheckParameterSyntax(filled_hvp)
4181 # fill and remember the beparams dict
4182 utils.CheckBEParams(self.op.beparams)
4183 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4186 #### instance parameters check
4188 # instance name verification
4189 hostname1 = utils.HostInfo(self.op.instance_name)
4190 self.op.instance_name = instance_name = hostname1.name
4192 # this is just a preventive check, but someone might still add this
4193 # instance in the meantime, and creation will fail at lock-add time
4194 if instance_name in self.cfg.GetInstanceList():
4195 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4198 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4202 for nic in self.op.nics:
4203 # ip validity checks
4204 ip = nic.get("ip", None)
4205 if ip is None or ip.lower() == "none":
4207 elif ip.lower() == constants.VALUE_AUTO:
4208 nic_ip = hostname1.ip
4210 if not utils.IsValidIP(ip):
4211 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4212 " like a valid IP" % ip)
4215 # MAC address verification
4216 mac = nic.get("mac", constants.VALUE_AUTO)
4217 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4218 if not utils.IsValidMac(mac.lower()):
4219 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4221 # bridge verification
4222 bridge = nic.get("bridge", None)
4224 bridge = self.cfg.GetDefBridge()
4225 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4227 # disk checks/pre-build
4229 for disk in self.op.disks:
4230 mode = disk.get("mode", constants.DISK_RDWR)
4231 if mode not in constants.DISK_ACCESS_SET:
4232 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4234 size = disk.get("size", None)
4236 raise errors.OpPrereqError("Missing disk size")
4240 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4241 self.disks.append({"size": size, "mode": mode})
4243 # used in CheckPrereq for ip ping check
4244 self.check_ip = hostname1.ip
4246 # file storage checks
4247 if (self.op.file_driver and
4248 not self.op.file_driver in constants.FILE_DRIVER):
4249 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4250 self.op.file_driver)
4252 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4253 raise errors.OpPrereqError("File storage directory path not absolute")
4255 ### Node/iallocator related checks
4256 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4257 raise errors.OpPrereqError("One and only one of iallocator and primary"
4258 " node must be given")
4260 if self.op.iallocator:
4261 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4263 self.op.pnode = self._ExpandNode(self.op.pnode)
4264 nodelist = [self.op.pnode]
4265 if self.op.snode is not None:
4266 self.op.snode = self._ExpandNode(self.op.snode)
4267 nodelist.append(self.op.snode)
4268 self.needed_locks[locking.LEVEL_NODE] = nodelist
4270 # in case of import, lock the source node too
4271 if self.op.mode == constants.INSTANCE_IMPORT:
4272 src_node = getattr(self.op, "src_node", None)
4273 src_path = getattr(self.op, "src_path", None)
4275 if src_path is None:
4276 self.op.src_path = src_path = self.op.instance_name
4278 if src_node is None:
4279 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4280 self.op.src_node = None
4281 if os.path.isabs(src_path):
4282 raise errors.OpPrereqError("Importing an instance from an absolute"
4283 " path requires a source node option.")
4285 self.op.src_node = src_node = self._ExpandNode(src_node)
4286 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4287 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4288 if not os.path.isabs(src_path):
4289 self.op.src_path = src_path = \
4290 os.path.join(constants.EXPORT_DIR, src_path)
4292 else: # INSTANCE_CREATE
4293 if getattr(self.op, "os_type", None) is None:
4294 raise errors.OpPrereqError("No guest OS specified")
4296 def _RunAllocator(self):
4297 """Run the allocator based on input opcode.
4300 nics = [n.ToDict() for n in self.nics]
4301 ial = IAllocator(self,
4302 mode=constants.IALLOCATOR_MODE_ALLOC,
4303 name=self.op.instance_name,
4304 disk_template=self.op.disk_template,
4307 vcpus=self.be_full[constants.BE_VCPUS],
4308 mem_size=self.be_full[constants.BE_MEMORY],
4311 hypervisor=self.op.hypervisor,
4314 ial.Run(self.op.iallocator)
4317 raise errors.OpPrereqError("Can't compute nodes using"
4318 " iallocator '%s': %s" % (self.op.iallocator,
4320 if len(ial.nodes) != ial.required_nodes:
4321 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4322 " of nodes (%s), required %s" %
4323 (self.op.iallocator, len(ial.nodes),
4324 ial.required_nodes))
4325 self.op.pnode = ial.nodes[0]
4326 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4327 self.op.instance_name, self.op.iallocator,
4328 ", ".join(ial.nodes))
4329 if ial.required_nodes == 2:
4330 self.op.snode = ial.nodes[1]
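# Result sketch (assumption based on the checks above): for a DRBD8 instance
# the iallocator must return exactly two node names, e.g.
#   ial.nodes == ["node1.example.com", "node2.example.com"]
# used as primary and secondary respectively; non-mirrored templates only
# require (and use) the first entry.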
4332 def BuildHooksEnv(self):
4335 This runs on master, primary and secondary nodes of the instance.
4339 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
4340 "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
4341 "INSTANCE_ADD_MODE": self.op.mode,
4343 if self.op.mode == constants.INSTANCE_IMPORT:
4344 env["INSTANCE_SRC_NODE"] = self.op.src_node
4345 env["INSTANCE_SRC_PATH"] = self.op.src_path
4346 env["INSTANCE_SRC_IMAGES"] = self.src_images
4348 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
4349 primary_node=self.op.pnode,
4350 secondary_nodes=self.secondaries,
4351 status=self.instance_status,
4352 os_type=self.op.os_type,
4353 memory=self.be_full[constants.BE_MEMORY],
4354 vcpus=self.be_full[constants.BE_VCPUS],
4355 nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4358 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4363 def CheckPrereq(self):
4364 """Check prerequisites.
4367 if (not self.cfg.GetVGName() and
4368 self.op.disk_template not in constants.DTS_NOT_LVM):
4369 raise errors.OpPrereqError("Cluster does not support lvm-based"
4373 if self.op.mode == constants.INSTANCE_IMPORT:
4374 src_node = self.op.src_node
4375 src_path = self.op.src_path
4377 if src_node is None:
4378 exp_list = self.rpc.call_export_list(
4379 self.acquired_locks[locking.LEVEL_NODE])
4381 for node in exp_list:
4382 if not exp_list[node].failed and src_path in exp_list[node].data:
4384 self.op.src_node = src_node = node
4385 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4389 raise errors.OpPrereqError("No export found for relative path %s" %
4392 _CheckNodeOnline(self, src_node)
4393 result = self.rpc.call_export_info(src_node, src_path)
4396 raise errors.OpPrereqError("No export found in dir %s" % src_path)
4398 export_info = result.data
4399 if not export_info.has_section(constants.INISECT_EXP):
4400 raise errors.ProgrammerError("Corrupted export config")
4402 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4403 if (int(ei_version) != constants.EXPORT_VERSION):
4404 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4405 (ei_version, constants.EXPORT_VERSION))
4407 # Check that the new instance doesn't have less disks than the export
4408 instance_disks = len(self.disks)
4409 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4410 if instance_disks < export_disks:
4411 raise errors.OpPrereqError("Not enough disks to import."
4412 " (instance: %d, export: %d)" %
4413 (instance_disks, export_disks))
4415 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4417 for idx in range(export_disks):
4418 option = 'disk%d_dump' % idx
4419 if export_info.has_option(constants.INISECT_INS, option):
4420 # FIXME: are the old os-es, disk sizes, etc. useful?
4421 export_name = export_info.get(constants.INISECT_INS, option)
4422 image = os.path.join(src_path, export_name)
4423 disk_images.append(image)
4425 disk_images.append(False)
4427 self.src_images = disk_images
4429 old_name = export_info.get(constants.INISECT_INS, 'name')
4430 # FIXME: int() here could throw a ValueError on broken exports
4431 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4432 if self.op.instance_name == old_name:
4433 for idx, nic in enumerate(self.nics):
4434 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4435 nic_mac_ini = 'nic%d_mac' % idx
4436 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4438 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4439 if self.op.start and not self.op.ip_check:
4440 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4441 " adding an instance in start mode")
4443 if self.op.ip_check:
4444 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4445 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4446 (self.check_ip, self.op.instance_name))
4450 if self.op.iallocator is not None:
4451 self._RunAllocator()
4453 #### node related checks
4455 # check primary node
4456 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4457 assert self.pnode is not None, \
4458 "Cannot retrieve locked node %s" % self.op.pnode
4460 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4463 self.secondaries = []
4465 # mirror node verification
4466 if self.op.disk_template in constants.DTS_NET_MIRROR:
4467 if self.op.snode is None:
4468 raise errors.OpPrereqError("The networked disk templates need"
4470 if self.op.snode == pnode.name:
4471 raise errors.OpPrereqError("The secondary node cannot be"
4472 " the primary node.")
4473 self.secondaries.append(self.op.snode)
4474 _CheckNodeOnline(self, self.op.snode)
4476 nodenames = [pnode.name] + self.secondaries
4478 req_size = _ComputeDiskSize(self.op.disk_template,
4481 # Check lv size requirements
4482 if req_size is not None:
4483 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4485 for node in nodenames:
4486 info = nodeinfo[node]
4490 raise errors.OpPrereqError("Cannot get current information"
4491 " from node '%s'" % node)
4492 vg_free = info.get('vg_free', None)
4493 if not isinstance(vg_free, int):
4494 raise errors.OpPrereqError("Can't compute free disk space on"
4496 if req_size > info['vg_free']:
4497 raise errors.OpPrereqError("Not enough disk space on target node %s."
4498 " %d MB available, %d MB required" %
4499 (node, info['vg_free'], req_size))
4501 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4504 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4506 if not isinstance(result.data, objects.OS):
4507 raise errors.OpPrereqError("OS '%s' not in supported os list for"
4508 " primary node" % self.op.os_type)
4510 # bridge check on primary node
4511 bridges = [n.bridge for n in self.nics]
4512 result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4515 raise errors.OpPrereqError("One of the target bridges '%s' does not"
4516 " exist on destination node '%s'" %
4517 (",".join(bridges), pnode.name))
4519 # memory check on primary node
4521 _CheckNodeFreeMemory(self, self.pnode.name,
4522 "creating instance %s" % self.op.instance_name,
4523 self.be_full[constants.BE_MEMORY],
4526 self.instance_status = self.op.start
4528 def Exec(self, feedback_fn):
4529 """Create and add the instance to the cluster.
4532 instance = self.op.instance_name
4533 pnode_name = self.pnode.name
4535 for nic in self.nics:
4536 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4537 nic.mac = self.cfg.GenerateMAC()
4539 ht_kind = self.op.hypervisor
4540 if ht_kind in constants.HTS_REQ_PORT:
4541 network_port = self.cfg.AllocatePort()
4545 ##if self.op.vnc_bind_address is None:
4546 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4548 # this is needed because os.path.join does not accept None arguments
4549 if self.op.file_storage_dir is None:
4550 string_file_storage_dir = ""
4552 string_file_storage_dir = self.op.file_storage_dir
4554 # build the full file storage dir path
4555 file_storage_dir = os.path.normpath(os.path.join(
4556 self.cfg.GetFileStorageDir(),
4557 string_file_storage_dir, instance))
4560 disks = _GenerateDiskTemplate(self,
4561 self.op.disk_template,
4562 instance, pnode_name,
4566 self.op.file_driver,
4569 iobj = objects.Instance(name=instance, os=self.op.os_type,
4570 primary_node=pnode_name,
4571 nics=self.nics, disks=disks,
4572 disk_template=self.op.disk_template,
4573 admin_up=self.instance_status,
4574 network_port=network_port,
4575 beparams=self.op.beparams,
4576 hvparams=self.op.hvparams,
4577 hypervisor=self.op.hypervisor,
4580 feedback_fn("* creating instance disks...")
4582 _CreateDisks(self, iobj)
4583 except errors.OpExecError:
4584 self.LogWarning("Device creation failed, reverting...")
4586 _RemoveDisks(self, iobj)
4588 self.cfg.ReleaseDRBDMinors(instance)
4591 feedback_fn("adding instance %s to cluster config" % instance)
4593 self.cfg.AddInstance(iobj)
4594 # Declare that we don't want to remove the instance lock anymore, as we've
4595 # added the instance to the config
4596 del self.remove_locks[locking.LEVEL_INSTANCE]
4597 # Unlock all the nodes
4598 if self.op.mode == constants.INSTANCE_IMPORT:
4599 nodes_keep = [self.op.src_node]
4600 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4601 if node != self.op.src_node]
4602 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4603 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4605 self.context.glm.release(locking.LEVEL_NODE)
4606 del self.acquired_locks[locking.LEVEL_NODE]
4608 if self.op.wait_for_sync:
4609 disk_abort = not _WaitForSync(self, iobj)
4610 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4611 # make sure the disks are not degraded (still sync-ing is ok)
4613 feedback_fn("* checking mirrors status")
4614 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4619 _RemoveDisks(self, iobj)
4620 self.cfg.RemoveInstance(iobj.name)
4621 # Make sure the instance lock gets removed
4622 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4623 raise errors.OpExecError("There are some degraded disks for"
4626 feedback_fn("creating os for instance %s on node %s" %
4627 (instance, pnode_name))
4629 if iobj.disk_template != constants.DT_DISKLESS:
4630 if self.op.mode == constants.INSTANCE_CREATE:
4631 feedback_fn("* running the instance OS create scripts...")
4632 result = self.rpc.call_instance_os_add(pnode_name, iobj)
4633 msg = result.RemoteFailMsg()
4635 raise errors.OpExecError("Could not add os for instance %s"
4637 (instance, pnode_name, msg))
4639 elif self.op.mode == constants.INSTANCE_IMPORT:
4640 feedback_fn("* running the instance OS import scripts...")
4641 src_node = self.op.src_node
4642 src_images = self.src_images
4643 cluster_name = self.cfg.GetClusterName()
4644 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4645 src_node, src_images,
4647 import_result.Raise()
4648 for idx, result in enumerate(import_result.data):
4650 self.LogWarning("Could not import the image %s for instance"
4651 " %s, disk %d, on node %s" %
4652 (src_images[idx], instance, idx, pnode_name))
4654 # also checked in the prereq part
4655 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4659 logging.info("Starting instance %s on node %s", instance, pnode_name)
4660 feedback_fn("* starting instance...")
4661 result = self.rpc.call_instance_start(pnode_name, iobj, None)
4662 msg = result.RemoteFailMsg()
4664 raise errors.OpExecError("Could not start instance: %s" % msg)
4667 class LUConnectConsole(NoHooksLU):
4668 """Connect to an instance's console.
4670 This is somewhat special in that it returns the command line that
4671 you need to run on the master node in order to connect to the console.
4675 _OP_REQP = ["instance_name"]
4678 def ExpandNames(self):
4679 self._ExpandAndLockInstance()
4681 def CheckPrereq(self):
4682 """Check prerequisites.
4684 This checks that the instance is in the cluster.
4687 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4688 assert self.instance is not None, \
4689 "Cannot retrieve locked instance %s" % self.op.instance_name
4690 _CheckNodeOnline(self, self.instance.primary_node)
4692 def Exec(self, feedback_fn):
4693 """Connect to the console of an instance
4696 instance = self.instance
4697 node = instance.primary_node
4699 node_insts = self.rpc.call_instance_list([node],
4700 [instance.hypervisor])[node]
4703 if instance.name not in node_insts.data:
4704 raise errors.OpExecError("Instance %s is not running." % instance.name)
4706 logging.debug("Connecting to console of %s on %s", instance.name, node)
4708 hyper = hypervisor.GetHypervisor(instance.hypervisor)
4709 cluster = self.cfg.GetClusterInfo()
4710 # beparams and hvparams are passed separately, to avoid editing the
4711 # instance and then saving the defaults in the instance itself.
4712 hvparams = cluster.FillHV(instance)
4713 beparams = cluster.FillBE(instance)
4714 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4717 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
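# Illustrative result (assumption; the exact console command comes from the
# hypervisor's GetShellCommandForConsole): for a Xen instance this builds an
# ssh invocation roughly like
#   ssh -t root@<primary node> 'xm console <instance>'
# which the caller is expected to execute on the master node.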
4720 class LUReplaceDisks(LogicalUnit):
4721 """Replace the disks of an instance.
4724 HPATH = "mirrors-replace"
4725 HTYPE = constants.HTYPE_INSTANCE
4726 _OP_REQP = ["instance_name", "mode", "disks"]
4729 def CheckArguments(self):
4730 if not hasattr(self.op, "remote_node"):
4731 self.op.remote_node = None
4732 if not hasattr(self.op, "iallocator"):
4733 self.op.iallocator = None
4735 # check for valid parameter combination
4736 cnt = [self.op.remote_node, self.op.iallocator].count(None)
4737 if self.op.mode == constants.REPLACE_DISK_CHG:
4739 raise errors.OpPrereqError("When changing the secondary either an"
4740 " iallocator script must be used or the"
4743 raise errors.OpPrereqError("Give either the iallocator or the new"
4744 " secondary, not both")
4745 else: # not replacing the secondary
4747 raise errors.OpPrereqError("The iallocator and new node options can"
4748 " be used only when changing the"
4751 def ExpandNames(self):
4752 self._ExpandAndLockInstance()
4754 if self.op.iallocator is not None:
4755 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4756 elif self.op.remote_node is not None:
4757 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4758 if remote_node is None:
4759 raise errors.OpPrereqError("Node '%s' not known" %
4760 self.op.remote_node)
4761 self.op.remote_node = remote_node
4762 # Warning: do not remove the locking of the new secondary here
4763 # unless DRBD8.AddChildren is changed to work in parallel;
4764 # currently it doesn't since parallel invocations of
4765 # FindUnusedMinor will conflict
4766 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4767 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4769 self.needed_locks[locking.LEVEL_NODE] = []
4770 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4772 def DeclareLocks(self, level):
4773 # If we're not already locking all nodes in the set we have to declare the
4774 # instance's primary/secondary nodes.
4775 if (level == locking.LEVEL_NODE and
4776 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4777 self._LockInstancesNodes()
4779 def _RunAllocator(self):
4780 """Compute a new secondary node using an IAllocator.
4783 ial = IAllocator(self,
4784 mode=constants.IALLOCATOR_MODE_RELOC,
4785 name=self.op.instance_name,
4786 relocate_from=[self.sec_node])
4788 ial.Run(self.op.iallocator)
4791 raise errors.OpPrereqError("Can't compute nodes using"
4792 " iallocator '%s': %s" % (self.op.iallocator,
4794 if len(ial.nodes) != ial.required_nodes:
4795 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4796 " of nodes (%s), required %s" %
4797 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
4798 self.op.remote_node = ial.nodes[0]
4799 self.LogInfo("Selected new secondary for the instance: %s",
4800 self.op.remote_node)
4802 def BuildHooksEnv(self):
4805 This runs on the master, the primary and all the secondaries.
4809 "MODE": self.op.mode,
4810 "NEW_SECONDARY": self.op.remote_node,
4811 "OLD_SECONDARY": self.instance.secondary_nodes[0],
4813 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4815 self.cfg.GetMasterNode(),
4816 self.instance.primary_node,
4818 if self.op.remote_node is not None:
4819 nl.append(self.op.remote_node)
4822 def CheckPrereq(self):
4823 """Check prerequisites.
4825 This checks that the instance is in the cluster.
4828 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4829 assert instance is not None, \
4830 "Cannot retrieve locked instance %s" % self.op.instance_name
4831 self.instance = instance
4833 if instance.disk_template != constants.DT_DRBD8:
4834 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4837 if len(instance.secondary_nodes) != 1:
4838 raise errors.OpPrereqError("The instance has a strange layout,"
4839 " expected one secondary but found %d" %
4840 len(instance.secondary_nodes))
4842 self.sec_node = instance.secondary_nodes[0]
4844 if self.op.iallocator is not None:
4845 self._RunAllocator()
4847 remote_node = self.op.remote_node
4848 if remote_node is not None:
4849 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4850 assert self.remote_node_info is not None, \
4851 "Cannot retrieve locked node %s" % remote_node
4853 self.remote_node_info = None
4854 if remote_node == instance.primary_node:
4855 raise errors.OpPrereqError("The specified node is the primary node of"
4857 elif remote_node == self.sec_node:
4858 raise errors.OpPrereqError("The specified node is already the"
4859 " secondary node of the instance.")
4861 if self.op.mode == constants.REPLACE_DISK_PRI:
4862 n1 = self.tgt_node = instance.primary_node
4863 n2 = self.oth_node = self.sec_node
4864 elif self.op.mode == constants.REPLACE_DISK_SEC:
4865 n1 = self.tgt_node = self.sec_node
4866 n2 = self.oth_node = instance.primary_node
4867 elif self.op.mode == constants.REPLACE_DISK_CHG:
4868 n1 = self.new_node = remote_node
4869 n2 = self.oth_node = instance.primary_node
4870 self.tgt_node = self.sec_node
4872 raise errors.ProgrammerError("Unhandled disk replace mode")
4874 _CheckNodeOnline(self, n1)
4875 _CheckNodeOnline(self, n2)
4877 if not self.op.disks:
4878 self.op.disks = range(len(instance.disks))
4880 for disk_idx in self.op.disks:
4881 instance.FindDisk(disk_idx)
4883 def _ExecD8DiskOnly(self, feedback_fn):
4884 """Replace a disk on the primary or secondary for dbrd8.
4886 The algorithm for replace is quite complicated:
4888 1. for each disk to be replaced:
4890 1. create new LVs on the target node with unique names
4891 1. detach old LVs from the drbd device
4892 1. rename old LVs to name_replaced.<time_t>
4893 1. rename new LVs to old LVs
4894 1. attach the new LVs (with the old names now) to the drbd device
4896 1. wait for sync across all devices
4898 1. for each modified disk:
4900 1. remove old LVs (which have the name name_replaced.<time_t>)
4902 Failures are not very well handled.
4906 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4907 instance = self.instance
4909 vgname = self.cfg.GetVGName()
4912 tgt_node = self.tgt_node
4913 oth_node = self.oth_node
4915 # Step: check device activation
4916 self.proc.LogStep(1, steps_total, "check device existence")
4917 info("checking volume groups")
4918 my_vg = cfg.GetVGName()
4919 results = self.rpc.call_vg_list([oth_node, tgt_node])
4921 raise errors.OpExecError("Can't list volume groups on the nodes")
4922 for node in oth_node, tgt_node:
4924 if res.failed or not res.data or my_vg not in res.data:
4925 raise errors.OpExecError("Volume group '%s' not found on %s" %
4927 for idx, dev in enumerate(instance.disks):
4928 if idx not in self.op.disks:
4930 for node in tgt_node, oth_node:
4931 info("checking disk/%d on %s" % (idx, node))
4932 cfg.SetDiskID(dev, node)
4933 if not self.rpc.call_blockdev_find(node, dev):
4934 raise errors.OpExecError("Can't find disk/%d on node %s" %
4937 # Step: check other node consistency
4938 self.proc.LogStep(2, steps_total, "check peer consistency")
4939 for idx, dev in enumerate(instance.disks):
4940 if idx not in self.op.disks:
4942 info("checking disk/%d consistency on %s" % (idx, oth_node))
4943 if not _CheckDiskConsistency(self, dev, oth_node,
4944 oth_node==instance.primary_node):
4945 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4946 " to replace disks on this node (%s)" %
4947 (oth_node, tgt_node))
4949 # Step: create new storage
4950 self.proc.LogStep(3, steps_total, "allocate new storage")
4951 for idx, dev in enumerate(instance.disks):
4952 if idx not in self.op.disks:
4955 cfg.SetDiskID(dev, tgt_node)
4956 lv_names = [".disk%d_%s" % (idx, suf)
4957 for suf in ["data", "meta"]]
4958 names = _GenerateUniqueNames(self, lv_names)
4959 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4960 logical_id=(vgname, names[0]))
4961 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4962 logical_id=(vgname, names[1]))
4963 new_lvs = [lv_data, lv_meta]
4964 old_lvs = dev.children
4965 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4966 info("creating new local storage on %s for %s" %
4967 (tgt_node, dev.iv_name))
4968 # we pass force_create=True to force the LVM creation
4969 for new_lv in new_lvs:
4970 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
4971 _GetInstanceInfoText(instance), False)
4973 # Step: for each lv, detach+rename*2+attach
4974 self.proc.LogStep(4, steps_total, "change drbd configuration")
4975 for dev, old_lvs, new_lvs in iv_names.itervalues():
4976 info("detaching %s drbd from local storage" % dev.iv_name)
4977 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4980 raise errors.OpExecError("Can't detach drbd from local storage on node"
4981 " %s for device %s" % (tgt_node, dev.iv_name))
4983 #cfg.Update(instance)
4985 # ok, we created the new LVs, so now we know we have the needed
4986 # storage; as such, we proceed on the target node to rename
4987 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4988 # using the assumption that logical_id == physical_id (which in
4989 # turn is the unique_id on that node)
4991 # FIXME(iustin): use a better name for the replaced LVs
4992 temp_suffix = int(time.time())
4993 ren_fn = lambda d, suff: (d.physical_id[0],
4994 d.physical_id[1] + "_replaced-%s" % suff)
4995 # build the rename list based on what LVs exist on the node
4997 for to_ren in old_lvs:
4998 find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4999 if not find_res.failed and find_res.data is not None: # device exists
5000 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5002 info("renaming the old LVs on the target node")
5003 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5006 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5007 # now we rename the new LVs to the old LVs
5008 info("renaming the new LVs on the target node")
5009 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5010 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5013 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5015 for old, new in zip(old_lvs, new_lvs):
5016 new.logical_id = old.logical_id
5017 cfg.SetDiskID(new, tgt_node)
5019 for disk in old_lvs:
5020 disk.logical_id = ren_fn(disk, temp_suffix)
5021 cfg.SetDiskID(disk, tgt_node)
5023 # now that the new lvs have the old name, we can add them to the device
5024 info("adding new mirror component on %s" % tgt_node)
5025 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5026 if result.failed or not result.data:
5027 for new_lv in new_lvs:
5028 result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
5029 if result.failed or not result.data:
5030 warning("Can't rollback device %s", hint="manually cleanup unused"
5032 raise errors.OpExecError("Can't add local storage to drbd")
5034 dev.children = new_lvs
5035 cfg.Update(instance)
5037 # Step: wait for sync
5039 # this can fail as the old devices are degraded and _WaitForSync
5040 # does a combined result over all disks, so we don't check its return value
5042 self.proc.LogStep(5, steps_total, "sync devices")
5043 _WaitForSync(self, instance, unlock=True)
5045 # so check manually all the devices
5046 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5047 cfg.SetDiskID(dev, instance.primary_node)
5048 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5049 if result.failed or result.data[5]:
5050 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5052 # Step: remove old storage
5053 self.proc.LogStep(6, steps_total, "removing old storage")
5054 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5055 info("remove logical volumes for %s" % name)
5057 cfg.SetDiskID(lv, tgt_node)
5058 result = self.rpc.call_blockdev_remove(tgt_node, lv)
5059 if result.failed or not result.data:
5060 warning("Can't remove old LV", hint="manually remove unused LVs")
5063 def _ExecD8Secondary(self, feedback_fn):
5064 """Replace the secondary node for drbd8.
5066 The algorithm for replace is quite complicated:
5067 - for all disks of the instance:
5068 - create new LVs on the new node with same names
5069 - shutdown the drbd device on the old secondary
5070 - disconnect the drbd network on the primary
5071 - create the drbd device on the new secondary
5072 - network attach the drbd on the primary, using an artifice:
5073 the drbd code for Attach() will connect to the network if it
5074 finds a device which is connected to the good local disks but not network enabled
5076 - wait for sync across all devices
5077 - remove all disks from the old secondary
5079 Failures are not very well handled.
5083 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5084 instance = self.instance
5088 old_node = self.tgt_node
5089 new_node = self.new_node
5090 pri_node = instance.primary_node
5092 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5093 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5094 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5097 # Step: check device activation
5098 self.proc.LogStep(1, steps_total, "check device existence")
5099 info("checking volume groups")
5100 my_vg = cfg.GetVGName()
5101 results = self.rpc.call_vg_list([pri_node, new_node])
5102 for node in pri_node, new_node:
5104 if res.failed or not res.data or my_vg not in res.data:
5105 raise errors.OpExecError("Volume group '%s' not found on %s" %
5107 for idx, dev in enumerate(instance.disks):
5108 if idx not in self.op.disks:
5110 info("checking disk/%d on %s" % (idx, pri_node))
5111 cfg.SetDiskID(dev, pri_node)
5112 result = self.rpc.call_blockdev_find(pri_node, dev)
5115 raise errors.OpExecError("Can't find disk/%d on node %s" %
5118 # Step: check other node consistency
5119 self.proc.LogStep(2, steps_total, "check peer consistency")
5120 for idx, dev in enumerate(instance.disks):
5121 if idx not in self.op.disks:
5123 info("checking disk/%d consistency on %s" % (idx, pri_node))
5124 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5125 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5126 " unsafe to replace the secondary" %
5129 # Step: create new storage
5130 self.proc.LogStep(3, steps_total, "allocate new storage")
5131 for idx, dev in enumerate(instance.disks):
5132 info("adding new local storage on %s for disk/%d" %
5134 # we pass force_create=True to force LVM creation
5135 for new_lv in dev.children:
5136 _CreateBlockDev(self, new_node, instance, new_lv, True,
5137 _GetInstanceInfoText(instance), False)
5139 # Step 4: drbd minors and drbd setup changes
5140 # after this, we must manually remove the drbd minors on both the
5141 # error and the success paths
5142 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5144 logging.debug("Allocated minors %s", minors)
5145 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5146 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5148 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5149 # create new devices on new_node; note that we create two IDs:
5150 # one without port, so the drbd will be activated without
5151 # networking information on the new node at this stage, and one
5152 # with network, for the latter activation in step 4
5153 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5154 if pri_node == o_node1:
5159 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5160 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5162 iv_names[idx] = (dev, dev.children, new_net_id)
5163 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5165 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5166 logical_id=new_alone_id,
5167 children=dev.children)
5169 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5170 _GetInstanceInfoText(instance), False)
5171 except errors.BlockDeviceError:
5172 self.cfg.ReleaseDRBDMinors(instance.name)
5175 for idx, dev in enumerate(instance.disks):
5176 # we have new devices, shutdown the drbd on the old secondary
5177 info("shutting down drbd for disk/%d on old node" % idx)
5178 cfg.SetDiskID(dev, old_node)
5179 result = self.rpc.call_blockdev_shutdown(old_node, dev)
5180 if result.failed or not result.data:
5181 warning("Failed to shutdown drbd for disk/%d on old node" % idx,
5182 hint="Please cleanup this device manually as soon as possible")
5184 info("detaching primary drbds from the network (=> standalone)")
5185 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5186 instance.disks)[pri_node]
5188 msg = result.RemoteFailMsg()
5190 # detaches didn't succeed (unlikely)
5191 self.cfg.ReleaseDRBDMinors(instance.name)
5192 raise errors.OpExecError("Can't detach the disks from the network on"
5193 " old node: %s" % (msg,))
5195 # if we managed to detach at least one, we update all the disks of
5196 # the instance to point to the new secondary
5197 info("updating instance configuration")
5198 for dev, _, new_logical_id in iv_names.itervalues():
5199 dev.logical_id = new_logical_id
5200 cfg.SetDiskID(dev, pri_node)
5201 cfg.Update(instance)
5203 # and now perform the drbd attach
5204 info("attaching primary drbds to new secondary (standalone => connected)")
5205 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5206 instance.disks, instance.name,
5208 for to_node, to_result in result.items():
5209 msg = to_result.RemoteFailMsg()
5211 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5212 hint="please do a gnt-instance info to see the"
5215 # this can fail as the old devices are degraded and _WaitForSync
5216 # does a combined result over all disks, so we don't check its return value
5218 self.proc.LogStep(5, steps_total, "sync devices")
5219 _WaitForSync(self, instance, unlock=True)
5221 # so check manually all the devices
5222 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5223 cfg.SetDiskID(dev, pri_node)
5224 result = self.rpc.call_blockdev_find(pri_node, dev)
5227 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5229 self.proc.LogStep(6, steps_total, "removing old storage")
5230 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5231 info("remove logical volumes for disk/%d" % idx)
5233 cfg.SetDiskID(lv, old_node)
5234 result = self.rpc.call_blockdev_remove(old_node, lv)
5235 if result.failed or not result.data:
5236 warning("Can't remove LV on old secondary",
5237 hint="Cleanup stale volumes by hand")
5239 def Exec(self, feedback_fn):
5240 """Execute disk replacement.
5242 This dispatches the disk replacement to the appropriate handler.
5245 instance = self.instance
5247 # Activate the instance disks if we're replacing them on a down instance
5248 if not instance.admin_up:
5249 _StartInstanceDisks(self, instance, True)
5251 if self.op.mode == constants.REPLACE_DISK_CHG:
5252 fn = self._ExecD8Secondary
5254 fn = self._ExecD8DiskOnly
5256 ret = fn(feedback_fn)
5258 # Deactivate the instance disks if we're replacing them on a down instance
5259 if not instance.admin_up:
5260 _SafeShutdownInstanceDisks(self, instance)
5265 class LUGrowDisk(LogicalUnit):
5266 """Grow a disk of an instance.
5270 HTYPE = constants.HTYPE_INSTANCE
5271 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5274 def ExpandNames(self):
5275 self._ExpandAndLockInstance()
5276 self.needed_locks[locking.LEVEL_NODE] = []
5277 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5279 def DeclareLocks(self, level):
5280 if level == locking.LEVEL_NODE:
5281 self._LockInstancesNodes()
5283 def BuildHooksEnv(self):
5286 This runs on the master, the primary and all the secondaries.
5290 "DISK": self.op.disk,
5291 "AMOUNT": self.op.amount,
5293 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5295 self.cfg.GetMasterNode(),
5296 self.instance.primary_node,
5300 def CheckPrereq(self):
5301 """Check prerequisites.
5303 This checks that the instance is in the cluster.
5306 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5307 assert instance is not None, \
5308 "Cannot retrieve locked instance %s" % self.op.instance_name
5309 nodenames = list(instance.all_nodes)
5310 for node in nodenames:
5311 _CheckNodeOnline(self, node)
5314 self.instance = instance
5316 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5317 raise errors.OpPrereqError("Instance's disk layout does not support"
5320 self.disk = instance.FindDisk(self.op.disk)
5322 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5323 instance.hypervisor)
5324 for node in nodenames:
5325 info = nodeinfo[node]
5326 if info.failed or not info.data:
5327 raise errors.OpPrereqError("Cannot get current information"
5328 " from node '%s'" % node)
5329 vg_free = info.data.get('vg_free', None)
5330 if not isinstance(vg_free, int):
5331 raise errors.OpPrereqError("Can't compute free disk space on"
5333 if self.op.amount > vg_free:
5334 raise errors.OpPrereqError("Not enough disk space on target node %s:"
5335 " %d MiB available, %d MiB required" %
5336 (node, vg_free, self.op.amount))
5338 def Exec(self, feedback_fn):
5339 """Execute disk grow.
5342 instance = self.instance
5344 for node in instance.all_nodes:
5345 self.cfg.SetDiskID(disk, node)
5346 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5348 if (not result.data or not isinstance(result.data, (list, tuple)) or
5349 len(result.data) != 2):
5350 raise errors.OpExecError("Grow request failed to node %s" % node)
5351 elif not result.data[0]:
5352 raise errors.OpExecError("Grow request failed to node %s: %s" %
5353 (node, result.data[1]))
5354 disk.RecordGrow(self.op.amount)
5355 self.cfg.Update(instance)
5356 if self.op.wait_for_sync:
5357 disk_abort = not _WaitForSync(self, instance)
5359 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5360 " status.\nPlease check the instance.")
5363 class LUQueryInstanceData(NoHooksLU):
5364 """Query runtime instance data.
5367 _OP_REQP = ["instances", "static"]
5370 def ExpandNames(self):
5371 self.needed_locks = {}
5372 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5374 if not isinstance(self.op.instances, list):
5375 raise errors.OpPrereqError("Invalid argument type 'instances'")
5377 if self.op.instances:
5378 self.wanted_names = []
5379 for name in self.op.instances:
5380 full_name = self.cfg.ExpandInstanceName(name)
5381 if full_name is None:
5382 raise errors.OpPrereqError("Instance '%s' not known" % name)
5383 self.wanted_names.append(full_name)
5384 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5386 self.wanted_names = None
5387 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5389 self.needed_locks[locking.LEVEL_NODE] = []
5390 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5392 def DeclareLocks(self, level):
5393 if level == locking.LEVEL_NODE:
5394 self._LockInstancesNodes()
5396 def CheckPrereq(self):
5397 """Check prerequisites.
5399 This only checks the optional instance list against the existing names.
5402 if self.wanted_names is None:
5403 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5405 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5406 in self.wanted_names]
5409 def _ComputeDiskStatus(self, instance, snode, dev):
5410 """Compute block device status.
5413 static = self.op.static
5415 self.cfg.SetDiskID(dev, instance.primary_node)
5416 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5418 dev_pstatus = dev_pstatus.data
5422 if dev.dev_type in constants.LDS_DRBD:
5423 # we change the snode then (otherwise we use the one passed in)
5424 if dev.logical_id[0] == instance.primary_node:
5425 snode = dev.logical_id[1]
5427 snode = dev.logical_id[0]
5429 if snode and not static:
5430 self.cfg.SetDiskID(dev, snode)
5431 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5433 dev_sstatus = dev_sstatus.data
5438 dev_children = [self._ComputeDiskStatus(instance, snode, child)
5439 for child in dev.children]
5444 "iv_name": dev.iv_name,
5445 "dev_type": dev.dev_type,
5446 "logical_id": dev.logical_id,
5447 "physical_id": dev.physical_id,
5448 "pstatus": dev_pstatus,
5449 "sstatus": dev_sstatus,
5450 "children": dev_children,
5456 def Exec(self, feedback_fn):
5457 """Gather and return data"""
5460 cluster = self.cfg.GetClusterInfo()
5462 for instance in self.wanted_instances:
5463 if not self.op.static:
5464 remote_info = self.rpc.call_instance_info(instance.primary_node,
5466 instance.hypervisor)
5468 remote_info = remote_info.data
5469 if remote_info and "state" in remote_info:
5472 remote_state = "down"
5475 if instance.admin_up:
5478 config_state = "down"
5480 disks = [self._ComputeDiskStatus(instance, None, device)
5481 for device in instance.disks]
5484 "name": instance.name,
5485 "config_state": config_state,
5486 "run_state": remote_state,
5487 "pnode": instance.primary_node,
5488 "snodes": instance.secondary_nodes,
5490 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5492 "hypervisor": instance.hypervisor,
5493 "network_port": instance.network_port,
5494 "hv_instance": instance.hvparams,
5495 "hv_actual": cluster.FillHV(instance),
5496 "be_instance": instance.beparams,
5497 "be_actual": cluster.FillBE(instance),
5500 result[instance.name] = idict
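# Result shape (sketch, derived from the dict built above): the LU returns a
# mapping from instance name to a dict along the lines of
#   {"web1.example.com": {"name": ..., "config_state": "up" or "down",
#                         "run_state": ..., "pnode": ..., "snodes": [...],
#                         "nics": [(mac, ip, bridge), ...],
#                         "hv_actual": {...}, "be_actual": {...}, ...}}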
5505 class LUSetInstanceParams(LogicalUnit):
5506 """Modifies an instances's parameters.
5509 HPATH = "instance-modify"
5510 HTYPE = constants.HTYPE_INSTANCE
5511 _OP_REQP = ["instance_name"]
5514 def CheckArguments(self):
5515 if not hasattr(self.op, 'nics'):
5517 if not hasattr(self.op, 'disks'):
5519 if not hasattr(self.op, 'beparams'):
5520 self.op.beparams = {}
5521 if not hasattr(self.op, 'hvparams'):
5522 self.op.hvparams = {}
5523 self.op.force = getattr(self.op, "force", False)
5524 if not (self.op.nics or self.op.disks or
5525 self.op.hvparams or self.op.beparams):
5526 raise errors.OpPrereqError("No changes submitted")
5528 utils.CheckBEParams(self.op.beparams)
5532 for disk_op, disk_dict in self.op.disks:
5533 if disk_op == constants.DDM_REMOVE:
5536 elif disk_op == constants.DDM_ADD:
5539 if not isinstance(disk_op, int):
5540 raise errors.OpPrereqError("Invalid disk index")
5541 if disk_op == constants.DDM_ADD:
5542 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5543 if mode not in constants.DISK_ACCESS_SET:
5544 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5545 size = disk_dict.get('size', None)
5547 raise errors.OpPrereqError("Required disk parameter size missing")
5550 except ValueError, err:
5551 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5553 disk_dict['size'] = size
5555 # modification of disk
5556 if 'size' in disk_dict:
5557 raise errors.OpPrereqError("Disk size change not possible, use"
5560 if disk_addremove > 1:
5561 raise errors.OpPrereqError("Only one disk add or remove operation"
5562 " supported at a time")
5566 for nic_op, nic_dict in self.op.nics:
5567 if nic_op == constants.DDM_REMOVE:
5570 elif nic_op == constants.DDM_ADD:
5573 if not isinstance(nic_op, int):
5574 raise errors.OpPrereqError("Invalid nic index")
5576 # nic_dict should be a dict
5577 nic_ip = nic_dict.get('ip', None)
5578 if nic_ip is not None:
5579 if nic_ip.lower() == "none":
5580 nic_dict['ip'] = None
5582 if not utils.IsValidIP(nic_ip):
5583 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5584 # we can only check None bridges and assign the default one
5585 nic_bridge = nic_dict.get('bridge', None)
5586 if nic_bridge is None:
5587 nic_dict['bridge'] = self.cfg.GetDefBridge()
5588 # but we can validate MACs
5589 nic_mac = nic_dict.get('mac', None)
5590 if nic_mac is not None:
5591 if self.cfg.IsMacInUse(nic_mac):
5592 raise errors.OpPrereqError("MAC address %s already in use"
5593 " in cluster" % nic_mac)
5594 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5595 if not utils.IsValidMac(nic_mac):
5596 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5597 if nic_addremove > 1:
5598 raise errors.OpPrereqError("Only one NIC add or remove operation"
5599 " supported at a time")
5601 def ExpandNames(self):
5602 self._ExpandAndLockInstance()
5603 self.needed_locks[locking.LEVEL_NODE] = []
5604 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5606 def DeclareLocks(self, level):
5607 if level == locking.LEVEL_NODE:
5608 self._LockInstancesNodes()
5610 def BuildHooksEnv(self):
5613 This runs on the master, primary and secondaries.
5617 if constants.BE_MEMORY in self.be_new:
5618 args['memory'] = self.be_new[constants.BE_MEMORY]
5619 if constants.BE_VCPUS in self.be_new:
5620 args['vcpus'] = self.be_new[constants.BE_VCPUS]
5621 # FIXME: readd disk/nic changes
5622 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5623 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5626 def CheckPrereq(self):
5627 """Check prerequisites.
5629 This only checks the instance list against the existing names.
5632 force = self.force = self.op.force
5634 # checking the new params on the primary/secondary nodes
5636 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5637 assert self.instance is not None, \
5638 "Cannot retrieve locked instance %s" % self.op.instance_name
5639 pnode = instance.primary_node
5640 nodelist = list(instance.all_nodes)
5642 # hvparams processing
5643 if self.op.hvparams:
5644 i_hvdict = copy.deepcopy(instance.hvparams)
5645 for key, val in self.op.hvparams.iteritems():
5646 if val == constants.VALUE_DEFAULT:
5651 elif val == constants.VALUE_NONE:
5652 i_hvdict[key] = None
5655 cluster = self.cfg.GetClusterInfo()
5656 hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5659 hypervisor.GetHypervisor(
5660 instance.hypervisor).CheckParameterSyntax(hv_new)
5661 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5662 self.hv_new = hv_new # the new actual values
5663 self.hv_inst = i_hvdict # the new dict (without defaults)
5665 self.hv_new = self.hv_inst = {}
5667 # beparams processing
5668 if self.op.beparams:
5669 i_bedict = copy.deepcopy(instance.beparams)
5670 for key, val in self.op.beparams.iteritems():
5671 if val == constants.VALUE_DEFAULT:
5678 cluster = self.cfg.GetClusterInfo()
5679 be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5681 self.be_new = be_new # the new actual values
5682 self.be_inst = i_bedict # the new dict (without defaults)
5684 self.be_new = self.be_inst = {}
5688 if constants.BE_MEMORY in self.op.beparams and not self.force:
5689 mem_check_list = [pnode]
5690 if be_new[constants.BE_AUTO_BALANCE]:
5691 # either we changed auto_balance to yes or it was from before
5692 mem_check_list.extend(instance.secondary_nodes)
5693 instance_info = self.rpc.call_instance_info(pnode, instance.name,
5694 instance.hypervisor)
5695 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5696 instance.hypervisor)
5697 if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5698 # Assume the primary node is unreachable and go ahead
5699 self.warn.append("Can't get info from primary node %s" % pnode)
5701 if not instance_info.failed and instance_info.data:
5702 current_mem = instance_info.data['memory']
5704 # Assume instance not running
5705 # (there is a slight race condition here, but it's not very probable,
5706 # and we have no other way to check)
5708 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5709 nodeinfo[pnode].data['memory_free'])
5711 raise errors.OpPrereqError("This change will prevent the instance"
5712 " from starting, due to %d MB of memory"
5713 " missing on its primary node" % miss_mem)
5715 if be_new[constants.BE_AUTO_BALANCE]:
5716 for node, nres in nodeinfo.iteritems():
5717 if node not in instance.secondary_nodes:
5719 if nres.failed or not isinstance(nres.data, dict):
5720 self.warn.append("Can't get info from secondary node %s" % node)
5721 elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5722 self.warn.append("Not enough memory to failover instance to"
5723 " secondary node %s" % node)
5726 for nic_op, nic_dict in self.op.nics:
5727 if nic_op == constants.DDM_REMOVE:
5728 if not instance.nics:
5729 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5731 if nic_op != constants.DDM_ADD:
5733 if nic_op < 0 or nic_op >= len(instance.nics):
5734 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5736 (nic_op, len(instance.nics)))
5737 nic_bridge = nic_dict.get('bridge', None)
5738 if nic_bridge is not None:
5739 if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5740 msg = ("Bridge '%s' doesn't exist on one of"
5741 " the instance nodes" % nic_bridge)
5743 self.warn.append(msg)
5745 raise errors.OpPrereqError(msg)
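# For illustration only (the values below are invented): self.op.nics is a
# list of (nic_op, nic_dict) pairs, where nic_op is constants.DDM_ADD,
# constants.DDM_REMOVE or the index of an existing NIC, e.g.:
#
#   op.nics = [(constants.DDM_ADD, {'bridge': 'xen-br0'}),   # append a NIC
#              (0, {'ip': '192.0.2.10'}),                    # modify NIC 0
#              (constants.DDM_REMOVE, {})]                   # drop the last NIC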
5748 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5749 raise errors.OpPrereqError("Disk operations not supported for"
5750 " diskless instances")
5751 for disk_op, disk_dict in self.op.disks:
5752 if disk_op == constants.DDM_REMOVE:
5753 if len(instance.disks) == 1:
5754 raise errors.OpPrereqError("Cannot remove the last disk of"
5756 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5757 ins_l = ins_l[pnode]
5758 if ins_l.failed or not isinstance(ins_l.data, list):
5759 raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5760 if instance.name in ins_l.data:
5761 raise errors.OpPrereqError("Instance is running, can't remove"
5764 if (disk_op == constants.DDM_ADD and
5765 len(instance.disks) >= constants.MAX_DISKS):
5766 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5767 " add more" % constants.MAX_DISKS)
5768 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5770 if disk_op < 0 or disk_op >= len(instance.disks):
5771 raise errors.OpPrereqError("Invalid disk index %s, valid values"
5773 (disk_op, len(instance.disks)))
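# self.op.disks uses the same (op, dict) convention as op.nics; a
# hypothetical example (sizes and modes invented for illustration):
#
#   op.disks = [(constants.DDM_ADD, {'size': 1024, 'mode': 'w'}),  # new disk
#               (0, {'mode': 'r'}),                # switch disk 0 to read-only
#               (constants.DDM_REMOVE, {})]        # remove the last disk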
5777 def Exec(self, feedback_fn):
5778 """Modifies an instance.
5780 All parameters take effect only at the next restart of the instance.
5783 # Process here the warnings from CheckPrereq, as we don't have a
5784 # feedback_fn there.
5785 for warn in self.warn:
5786 feedback_fn("WARNING: %s" % warn)
5789 instance = self.instance
5791 for disk_op, disk_dict in self.op.disks:
5792 if disk_op == constants.DDM_REMOVE:
5793 # remove the last disk
5794 device = instance.disks.pop()
5795 device_idx = len(instance.disks)
5796 for node, disk in device.ComputeNodeTree(instance.primary_node):
5797 self.cfg.SetDiskID(disk, node)
5798 rpc_result = self.rpc.call_blockdev_remove(node, disk)
5799 if rpc_result.failed or not rpc_result.data:
5800 self.proc.LogWarning("Could not remove disk/%d on node %s,"
5801 " continuing anyway", device_idx, node)
5802 result.append(("disk/%d" % device_idx, "remove"))
5803 elif disk_op == constants.DDM_ADD:
5805 if instance.disk_template == constants.DT_FILE:
5806 file_driver, file_path = instance.disks[0].logical_id
5807 file_path = os.path.dirname(file_path)
5809 file_driver = file_path = None
5810 disk_idx_base = len(instance.disks)
5811 new_disk = _GenerateDiskTemplate(self,
5812 instance.disk_template,
5813 instance.name, instance.primary_node,
5814 instance.secondary_nodes,
5819 instance.disks.append(new_disk)
5820 info = _GetInstanceInfoText(instance)
5822 logging.info("Creating volume %s for instance %s",
5823 new_disk.iv_name, instance.name)
5824 # Note: this needs to be kept in sync with _CreateDisks
5826 for node in instance.all_nodes:
5827 f_create = node == instance.primary_node
5829 _CreateBlockDev(self, node, instance, new_disk,
5830 f_create, info, f_create)
5831 except errors.OpExecError, err:
5832 self.LogWarning("Failed to create volume %s (%s) on"
5834 new_disk.iv_name, new_disk, node, err)
5835 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5836 (new_disk.size, new_disk.mode)))
5838 # change a given disk
5839 instance.disks[disk_op].mode = disk_dict['mode']
5840 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5842 for nic_op, nic_dict in self.op.nics:
5843 if nic_op == constants.DDM_REMOVE:
5844 # remove the last nic
5845 del instance.nics[-1]
5846 result.append(("nic.%d" % len(instance.nics), "remove"))
5847 elif nic_op == constants.DDM_ADD:
5849 if 'mac' not in nic_dict:
5850 mac = constants.VALUE_GENERATE
5852 mac = nic_dict['mac']
5853 if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5854 mac = self.cfg.GenerateMAC()
5855 new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5856 bridge=nic_dict.get('bridge', None))
5857 instance.nics.append(new_nic)
5858 result.append(("nic.%d" % (len(instance.nics) - 1),
5859 "add:mac=%s,ip=%s,bridge=%s" %
5860 (new_nic.mac, new_nic.ip, new_nic.bridge)))
5862 # change a given nic
5863 for key in 'mac', 'ip', 'bridge':
5865 setattr(instance.nics[nic_op], key, nic_dict[key])
5866 result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5869 if self.op.hvparams:
5870 instance.hvparams = self.hv_new
5871 for key, val in self.op.hvparams.iteritems():
5872 result.append(("hv/%s" % key, val))
5875 if self.op.beparams:
5876 instance.beparams = self.be_inst
5877 for key, val in self.op.beparams.iteritems():
5878 result.append(("be/%s" % key, val))
5880 self.cfg.Update(instance)
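# Sketch of the feedback built up in 'result' above (values invented): a
# list of (parameter, new value) pairs describing every applied change,
# e.g.:
#
#   [("disk/1", "add:size=1024,mode=w"),
#    ("nic.ip/0", "192.0.2.10"),
#    ("be/memory", 512)]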
5885 class LUQueryExports(NoHooksLU):
5886 """Query the exports list
5889 _OP_REQP = ['nodes']
5892 def ExpandNames(self):
5893 self.needed_locks = {}
5894 self.share_locks[locking.LEVEL_NODE] = 1
5895 if not self.op.nodes:
5896 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5898 self.needed_locks[locking.LEVEL_NODE] = \
5899 _GetWantedNodes(self, self.op.nodes)
5901 def CheckPrereq(self):
5902 """Check prerequisites.
5905 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5907 def Exec(self, feedback_fn):
5908 """Compute the list of all the exported system images.
5911 @return: a dictionary with the structure node->(export-list)
5912 where export-list is a list of the instances exported on that node.
5916 rpcresult = self.rpc.call_export_list(self.nodes)
5918 for node in rpcresult:
5919 if rpcresult[node].failed:
5920 result[node] = False
5922 result[node] = rpcresult[node].data
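# Shape of the dictionary returned by this method (names invented for the
# example): node name -> list of export names, or False when the node
# could not be queried, e.g.:
#
#   {"node1.example.com": ["inst1.example.com", "inst2.example.com"],
#    "node2.example.com": False}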
5927 class LUExportInstance(LogicalUnit):
5928 """Export an instance to an image in the cluster.
5931 HPATH = "instance-export"
5932 HTYPE = constants.HTYPE_INSTANCE
5933 _OP_REQP = ["instance_name", "target_node", "shutdown"]
5936 def ExpandNames(self):
5937 self._ExpandAndLockInstance()
5938 # FIXME: lock only instance primary and destination node
5940 # Sad but true, for now we have to lock all nodes, as we don't know where
5941 # the previous export might be, and in this LU we search for it and
5942 # remove it from its current node. In the future we could fix this by:
5943 # - making a tasklet to search (share-lock all), then create the new one,
5944 # then one to remove, after
5945 # - removing the removal operation altogether
5946 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5948 def DeclareLocks(self, level):
5949 """Last minute lock declaration."""
5950 # All nodes are locked anyway, so nothing to do here.
5952 def BuildHooksEnv(self):
5955 This will run on the master, primary node and target node.
5959 "EXPORT_NODE": self.op.target_node,
5960 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5962 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5963 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5964 self.op.target_node]
5967 def CheckPrereq(self):
5968 """Check prerequisites.
5970 This checks that the instance and node names are valid.
5973 instance_name = self.op.instance_name
5974 self.instance = self.cfg.GetInstanceInfo(instance_name)
5975 assert self.instance is not None, \
5976 "Cannot retrieve locked instance %s" % self.op.instance_name
5977 _CheckNodeOnline(self, self.instance.primary_node)
5979 self.dst_node = self.cfg.GetNodeInfo(
5980 self.cfg.ExpandNodeName(self.op.target_node))
5982 if self.dst_node is None:
5983 # This means the node name is wrong, not that the node is not locked
5984 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5985 _CheckNodeOnline(self, self.dst_node.name)
5987 # instance disk type verification
5988 for disk in self.instance.disks:
5989 if disk.dev_type == constants.LD_FILE:
5990 raise errors.OpPrereqError("Export not supported for instances with"
5991 " file-based disks")
5993 def Exec(self, feedback_fn):
5994 """Export an instance to an image in the cluster.
5997 instance = self.instance
5998 dst_node = self.dst_node
5999 src_node = instance.primary_node
6000 if self.op.shutdown:
6001 # shut down the instance, but not the disks
6002 result = self.rpc.call_instance_shutdown(src_node, instance)
6005 raise errors.OpExecError("Could not shut down instance %s on node %s" %
6006 (instance.name, src_node))
6008 vgname = self.cfg.GetVGName()
6012 # set the disks ID correctly since call_instance_start needs the
6013 # correct drbd minor to create the symlinks
6014 for disk in instance.disks:
6015 self.cfg.SetDiskID(disk, src_node)
6018 for disk in instance.disks:
6019 # new_dev_name will be a snapshot of an LVM leaf of the disk we passed
6020 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6021 if new_dev_name.failed or not new_dev_name.data:
6022 self.LogWarning("Could not snapshot block device %s on node %s",
6023 disk.logical_id[1], src_node)
6024 snap_disks.append(False)
6026 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6027 logical_id=(vgname, new_dev_name.data),
6028 physical_id=(vgname, new_dev_name.data),
6029 iv_name=disk.iv_name)
6030 snap_disks.append(new_dev)
6033 if self.op.shutdown and instance.admin_up:
6034 result = self.rpc.call_instance_start(src_node, instance, None)
6035 msg = result.RemoteFailMsg()
6037 _ShutdownInstanceDisks(self, instance)
6038 raise errors.OpExecError("Could not start instance: %s" % msg)
6040 # TODO: check for size
6042 cluster_name = self.cfg.GetClusterName()
6043 for idx, dev in enumerate(snap_disks):
6045 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6046 instance, cluster_name, idx)
6047 if result.failed or not result.data:
6048 self.LogWarning("Could not export block device %s from node %s to"
6049 " node %s", dev.logical_id[1], src_node,
6051 result = self.rpc.call_blockdev_remove(src_node, dev)
6052 if result.failed or not result.data:
6053 self.LogWarning("Could not remove snapshot block device %s from node"
6054 " %s", dev.logical_id[1], src_node)
6056 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6057 if result.failed or not result.data:
6058 self.LogWarning("Could not finalize export for instance %s on node %s",
6059 instance.name, dst_node.name)
6061 nodelist = self.cfg.GetNodeList()
6062 nodelist.remove(dst_node.name)
6064 # on one-node clusters nodelist will be empty after the removal
6065 # if we proceed, the backup would be removed because OpQueryExports
6066 # substitutes an empty list with the full cluster node list.
6068 exportlist = self.rpc.call_export_list(nodelist)
6069 for node in exportlist:
6070 if exportlist[node].failed:
6072 if instance.name in exportlist[node].data:
6073 if not self.rpc.call_export_remove(node, instance.name):
6074 self.LogWarning("Could not remove older export for instance %s"
6075 " on node %s", instance.name, node)
6078 class LURemoveExport(NoHooksLU):
6079 """Remove exports related to the named instance.
6082 _OP_REQP = ["instance_name"]
6085 def ExpandNames(self):
6086 self.needed_locks = {}
6087 # We need all nodes to be locked in order for RemoveExport to work, but we
6088 # don't need to lock the instance itself, as nothing will happen to it (and
6089 # we can remove exports also for a removed instance)
6090 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6092 def CheckPrereq(self):
6093 """Check prerequisites.
6097 def Exec(self, feedback_fn):
6098 """Remove any export.
6101 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6102 # If the instance was not found we'll try with the name that was passed in.
6103 # This will only work if it was an FQDN, though.
6105 if not instance_name:
6107 instance_name = self.op.instance_name
6109 exportlist = self.rpc.call_export_list(self.acquired_locks[
6110 locking.LEVEL_NODE])
6112 for node in exportlist:
6113 if exportlist[node].failed:
6114 self.LogWarning("Failed to query node %s, continuing" % node)
6116 if instance_name in exportlist[node].data:
6118 result = self.rpc.call_export_remove(node, instance_name)
6119 if result.failed or not result.data:
6120 logging.error("Could not remove export for instance %s"
6121 " on node %s", instance_name, node)
6123 if fqdn_warn and not found:
6124 feedback_fn("Export not found. If trying to remove an export belonging"
6125 " to a deleted instance please use its Fully Qualified"
6129 class TagsLU(NoHooksLU):
6132 This is an abstract class which is the parent of all the other tags LUs.
6136 def ExpandNames(self):
6137 self.needed_locks = {}
6138 if self.op.kind == constants.TAG_NODE:
6139 name = self.cfg.ExpandNodeName(self.op.name)
6141 raise errors.OpPrereqError("Invalid node name (%s)" %
6144 self.needed_locks[locking.LEVEL_NODE] = name
6145 elif self.op.kind == constants.TAG_INSTANCE:
6146 name = self.cfg.ExpandInstanceName(self.op.name)
6148 raise errors.OpPrereqError("Invalid instance name (%s)" %
6151 self.needed_locks[locking.LEVEL_INSTANCE] = name
6153 def CheckPrereq(self):
6154 """Check prerequisites.
6157 if self.op.kind == constants.TAG_CLUSTER:
6158 self.target = self.cfg.GetClusterInfo()
6159 elif self.op.kind == constants.TAG_NODE:
6160 self.target = self.cfg.GetNodeInfo(self.op.name)
6161 elif self.op.kind == constants.TAG_INSTANCE:
6162 self.target = self.cfg.GetInstanceInfo(self.op.name)
6164 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6168 class LUGetTags(TagsLU):
6169 """Returns the tags of a given object.
6172 _OP_REQP = ["kind", "name"]
6175 def Exec(self, feedback_fn):
6176 """Returns the tag list.
6179 return list(self.target.GetTags())
6182 class LUSearchTags(NoHooksLU):
6183 """Searches the tags for a given pattern.
6186 _OP_REQP = ["pattern"]
6189 def ExpandNames(self):
6190 self.needed_locks = {}
6192 def CheckPrereq(self):
6193 """Check prerequisites.
6195 This checks the pattern passed for validity by compiling it.
6199 self.re = re.compile(self.op.pattern)
6200 except re.error, err:
6201 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6202 (self.op.pattern, err))
6204 def Exec(self, feedback_fn):
6205 """Returns the tag list.
6209 tgts = [("/cluster", cfg.GetClusterInfo())]
6210 ilist = cfg.GetAllInstancesInfo().values()
6211 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6212 nlist = cfg.GetAllNodesInfo().values()
6213 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6215 for path, target in tgts:
6216 for tag in target.GetTags():
6217 if self.re.search(tag):
6218 results.append((path, tag))
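# Example of the 'results' list built above (names invented): each entry
# pairs the owning object's path with a matching tag, so searching for the
# pattern "web" could return:
#
#   [("/cluster", "webfarm"),
#    ("/instances/inst1.example.com", "webserver")]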
6222 class LUAddTags(TagsLU):
6223 """Sets a tag on a given object.
6226 _OP_REQP = ["kind", "name", "tags"]
6229 def CheckPrereq(self):
6230 """Check prerequisites.
6232 This checks the type and length of the tag name and value.
6235 TagsLU.CheckPrereq(self)
6236 for tag in self.op.tags:
6237 objects.TaggableObject.ValidateTag(tag)
6239 def Exec(self, feedback_fn):
6244 for tag in self.op.tags:
6245 self.target.AddTag(tag)
6246 except errors.TagError, err:
6247 raise errors.OpExecError("Error while setting tag: %s" % str(err))
6249 self.cfg.Update(self.target)
6250 except errors.ConfigurationError:
6251 raise errors.OpRetryError("There has been a modification to the"
6252 " config file and the operation has been"
6253 " aborted. Please retry.")
6256 class LUDelTags(TagsLU):
6257 """Delete a list of tags from a given object.
6260 _OP_REQP = ["kind", "name", "tags"]
6263 def CheckPrereq(self):
6264 """Check prerequisites.
6266 This checks that we have the given tag.
6269 TagsLU.CheckPrereq(self)
6270 for tag in self.op.tags:
6271 objects.TaggableObject.ValidateTag(tag)
6272 del_tags = frozenset(self.op.tags)
6273 cur_tags = self.target.GetTags()
6274 if not del_tags <= cur_tags:
6275 diff_tags = del_tags - cur_tags
6276 diff_names = ["'%s'" % tag for tag in diff_tags]
6278 raise errors.OpPrereqError("Tag(s) %s not found" %
6279 (",".join(diff_names)))
6281 def Exec(self, feedback_fn):
6282 """Remove the tag from the object.
6285 for tag in self.op.tags:
6286 self.target.RemoveTag(tag)
6288 self.cfg.Update(self.target)
6289 except errors.ConfigurationError:
6290 raise errors.OpRetryError("There has been a modification to the"
6291 " config file and the operation has been"
6292 " aborted. Please retry.")
6295 class LUTestDelay(NoHooksLU):
6296 """Sleep for a specified amount of time.
6298 This LU sleeps on the master and/or nodes for a specified amount of time.
6302 _OP_REQP = ["duration", "on_master", "on_nodes"]
6305 def ExpandNames(self):
6306 """Expand names and set required locks.
6308 This expands the node list, if any.
6311 self.needed_locks = {}
6312 if self.op.on_nodes:
6313 # _GetWantedNodes can be used here, but is not always appropriate to use
6314 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6316 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6317 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6319 def CheckPrereq(self):
6320 """Check prerequisites.
6324 def Exec(self, feedback_fn):
6325 """Do the actual sleep.
6328 if self.op.on_master:
6329 if not utils.TestDelay(self.op.duration):
6330 raise errors.OpExecError("Error during master delay test")
6331 if self.op.on_nodes:
6332 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6334 raise errors.OpExecError("Complete failure from rpc call")
6335 for node, node_result in result.items():
6337 if not node_result.data:
6338 raise errors.OpExecError("Failure during rpc call to node %s,"
6339 " result: %s" % (node, node_result.data))
6342 class IAllocator(object):
6343 """IAllocator framework.
6345 An IAllocator instance has four sets of attributes:
6346 - cfg that is needed to query the cluster
6347 - input data (all members of the _KEYS class attribute are required)
6348 - four buffer attributes (in_text, in_data, out_text, out_data) that represent the
6349 input (to the external script) in text and data structure format,
6350 and the output from it, again in two formats
6351 - the result variables from the script (success, info, nodes) for
6356 "mem_size", "disks", "disk_template",
6357 "os", "tags", "nics", "vcpus", "hypervisor",
6363 def __init__(self, lu, mode, name, **kwargs):
6365 # init buffer variables
6366 self.in_text = self.out_text = self.in_data = self.out_data = None
6367 # init all input fields so that pylint is happy
6370 self.mem_size = self.disks = self.disk_template = None
6371 self.os = self.tags = self.nics = self.vcpus = None
6372 self.hypervisor = None
6373 self.relocate_from = None
6375 self.required_nodes = None
6376 # init result fields
6377 self.success = self.info = self.nodes = None
6378 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6379 keyset = self._ALLO_KEYS
6380 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6381 keyset = self._RELO_KEYS
6383 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6384 " IAllocator" % self.mode)
6386 if key not in keyset:
6387 raise errors.ProgrammerError("Invalid input parameter '%s' to"
6388 " IAllocator" % key)
6389 setattr(self, key, kwargs[key])
6391 if key not in kwargs:
6392 raise errors.ProgrammerError("Missing input parameter '%s' to"
6393 " IAllocator" % key)
6394 self._BuildInputData()
6396 def _ComputeClusterData(self):
6397 """Compute the generic allocator input data.
6399 This is the data that is independent of the actual operation.
6403 cluster_info = cfg.GetClusterInfo()
6407 "cluster_name": cfg.GetClusterName(),
6408 "cluster_tags": list(cluster_info.GetTags()),
6409 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6410 # we don't have job IDs
6412 iinfo = cfg.GetAllInstancesInfo().values()
6413 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6417 node_list = cfg.GetNodeList()
6419 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6420 hypervisor_name = self.hypervisor
6421 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6422 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6424 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6426 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6427 cluster_info.enabled_hypervisors)
6428 for nname, nresult in node_data.items():
6429 # first fill in static (config-based) values
6430 ninfo = cfg.GetNodeInfo(nname)
6432 "tags": list(ninfo.GetTags()),
6433 "primary_ip": ninfo.primary_ip,
6434 "secondary_ip": ninfo.secondary_ip,
6435 "offline": ninfo.offline,
6436 "master_candidate": ninfo.master_candidate,
6439 if not ninfo.offline:
6441 if not isinstance(nresult.data, dict):
6442 raise errors.OpExecError("Can't get data for node %s" % nname)
6443 remote_info = nresult.data
6444 for attr in ['memory_total', 'memory_free', 'memory_dom0',
6445 'vg_size', 'vg_free', 'cpu_total']:
6446 if attr not in remote_info:
6447 raise errors.OpExecError("Node '%s' didn't return attribute"
6448 " '%s'" % (nname, attr))
6450 remote_info[attr] = int(remote_info[attr])
6451 except ValueError, err:
6452 raise errors.OpExecError("Node '%s' returned invalid value"
6453 " for '%s': %s" % (nname, attr, err))
6454 # compute memory used by primary instances
6455 i_p_mem = i_p_up_mem = 0
6456 for iinfo, beinfo in i_list:
6457 if iinfo.primary_node == nname:
6458 i_p_mem += beinfo[constants.BE_MEMORY]
6459 if iinfo.name not in node_iinfo[nname].data:
6462 i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6463 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6464 remote_info['memory_free'] -= max(0, i_mem_diff)
6467 i_p_up_mem += beinfo[constants.BE_MEMORY]
6469 # compute memory used by instances
6471 "total_memory": remote_info['memory_total'],
6472 "reserved_memory": remote_info['memory_dom0'],
6473 "free_memory": remote_info['memory_free'],
6474 "total_disk": remote_info['vg_size'],
6475 "free_disk": remote_info['vg_free'],
6476 "total_cpus": remote_info['cpu_total'],
6477 "i_pri_memory": i_p_mem,
6478 "i_pri_up_memory": i_p_up_mem,
6482 node_results[nname] = pnr
6483 data["nodes"] = node_results
6487 for iinfo, beinfo in i_list:
6488 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6489 for n in iinfo.nics]
6491 "tags": list(iinfo.GetTags()),
6492 "admin_up": iinfo.admin_up,
6493 "vcpus": beinfo[constants.BE_VCPUS],
6494 "memory": beinfo[constants.BE_MEMORY],
6496 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6498 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6499 "disk_template": iinfo.disk_template,
6500 "hypervisor": iinfo.hypervisor,
6502 instance_data[iinfo.name] = pir
6504 data["instances"] = instance_data
6508 def _AddNewInstance(self):
6509 """Add new instance data to allocator structure.
6511 This, in combination with _ComputeClusterData, will create the
6512 correct structure needed as input for the allocator.
6514 The checks for the completeness of the opcode must have already been done.
6520 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6522 if self.disk_template in constants.DTS_NET_MIRROR:
6523 self.required_nodes = 2
6525 self.required_nodes = 1
6529 "disk_template": self.disk_template,
6532 "vcpus": self.vcpus,
6533 "memory": self.mem_size,
6534 "disks": self.disks,
6535 "disk_space_total": disk_space,
6537 "required_nodes": self.required_nodes,
6539 data["request"] = request
6541 def _AddRelocateInstance(self):
6542 """Add relocate instance data to allocator structure.
6544 This, in combination with _ComputeClusterData, will create the
6545 correct structure needed as input for the allocator.
6547 The checks for the completeness of the opcode must have already been done.
6551 instance = self.lu.cfg.GetInstanceInfo(self.name)
6552 if instance is None:
6553 raise errors.ProgrammerError("Unknown instance '%s' passed to"
6554 " IAllocator" % self.name)
6556 if instance.disk_template not in constants.DTS_NET_MIRROR:
6557 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6559 if len(instance.secondary_nodes) != 1:
6560 raise errors.OpPrereqError("Instance has not exactly one secondary node")
6562 self.required_nodes = 1
6563 disk_sizes = [{'size': disk.size} for disk in instance.disks]
6564 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6569 "disk_space_total": disk_space,
6570 "required_nodes": self.required_nodes,
6571 "relocate_from": self.relocate_from,
6573 self.in_data["request"] = request
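# Hypothetical "request" sections for the two modes (all values invented).
# For an allocation, _AddNewInstance builds roughly:
#
#   {"name": "inst3.example.com", "disk_template": "drbd",
#    "memory": 512, "vcpus": 1,
#    "disks": [{"size": 1024, "mode": "w"}],
#    "disk_space_total": 1280, "required_nodes": 2}
#
# while a relocation request from this method looks like:
#
#   {"name": "inst1.example.com",
#    "disk_space_total": 1280, "required_nodes": 1,
#    "relocate_from": ["node2.example.com"]}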
6575 def _BuildInputData(self):
6576 """Build input data structures.
6579 self._ComputeClusterData()
6581 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6582 self._AddNewInstance()
6584 self._AddRelocateInstance()
6586 self.in_text = serializer.Dump(self.in_data)
6588 def Run(self, name, validate=True, call_fn=None):
6589 """Run an instance allocator and return the results.
6593 call_fn = self.lu.rpc.call_iallocator_runner
6596 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6599 if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6600 raise errors.OpExecError("Invalid result from master iallocator runner")
6602 rcode, stdout, stderr, fail = result.data
6604 if rcode == constants.IARUN_NOTFOUND:
6605 raise errors.OpExecError("Can't find allocator '%s'" % name)
6606 elif rcode == constants.IARUN_FAILURE:
6607 raise errors.OpExecError("Instance allocator call failed: %s,"
6608 " output: %s" % (fail, stdout+stderr))
6609 self.out_text = stdout
6611 self._ValidateResult()
6613 def _ValidateResult(self):
6614 """Process the allocator results.
6616 This will process the allocator output and, if it is valid, save the
6617 result in self.out_data and in the individual result attributes.
6621 rdict = serializer.Load(self.out_text)
6622 except Exception, err:
6623 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6625 if not isinstance(rdict, dict):
6626 raise errors.OpExecError("Can't parse iallocator results: not a dict")
6628 for key in "success", "info", "nodes":
6629 if key not in rdict:
6630 raise errors.OpExecError("Can't parse iallocator results:"
6631 " missing key '%s'" % key)
6632 setattr(self, key, rdict[key])
6634 if not isinstance(rdict["nodes"], list):
6635 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6637 self.out_data = rdict
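# A reply that passes the validation above must be a dictionary carrying at
# least the "success", "info" and "nodes" keys, with "nodes" being a list,
# e.g. (node names invented):
#
#   {"success": true, "info": "allocation successful",
#    "nodes": ["node1.example.com", "node2.example.com"]}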
6640 class LUTestAllocator(NoHooksLU):
6641 """Run allocator tests.
6643 This LU runs the allocator tests
6646 _OP_REQP = ["direction", "mode", "name"]
6648 def CheckPrereq(self):
6649 """Check prerequisites.
6651 This checks the opcode parameters depending on the direction and mode of the test.
6654 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6655 for attr in ["name", "mem_size", "disks", "disk_template",
6656 "os", "tags", "nics", "vcpus"]:
6657 if not hasattr(self.op, attr):
6658 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6660 iname = self.cfg.ExpandInstanceName(self.op.name)
6661 if iname is not None:
6662 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6664 if not isinstance(self.op.nics, list):
6665 raise errors.OpPrereqError("Invalid parameter 'nics'")
6666 for row in self.op.nics:
6667 if (not isinstance(row, dict) or
6670 "bridge" not in row):
6671 raise errors.OpPrereqError("Invalid contents of the"
6672 " 'nics' parameter")
6673 if not isinstance(self.op.disks, list):
6674 raise errors.OpPrereqError("Invalid parameter 'disks'")
6675 for row in self.op.disks:
6676 if (not isinstance(row, dict) or
6677 "size" not in row or
6678 not isinstance(row["size"], int) or
6679 "mode" not in row or
6680 row["mode"] not in ['r', 'w']):
6681 raise errors.OpPrereqError("Invalid contents of the"
6682 " 'disks' parameter")
6683 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6684 self.op.hypervisor = self.cfg.GetHypervisorType()
6685 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6686 if not hasattr(self.op, "name"):
6687 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6688 fname = self.cfg.ExpandInstanceName(self.op.name)
6690 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6692 self.op.name = fname
6693 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6695 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6698 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6699 if not hasattr(self.op, "allocator") or self.op.allocator is None:
6700 raise errors.OpPrereqError("Missing allocator name")
6701 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6702 raise errors.OpPrereqError("Wrong allocator test '%s'" %
6705 def Exec(self, feedback_fn):
6706 """Run the allocator test.
6709 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6710 ial = IAllocator(self,
6713 mem_size=self.op.mem_size,
6714 disks=self.op.disks,
6715 disk_template=self.op.disk_template,
6719 vcpus=self.op.vcpus,
6720 hypervisor=self.op.hypervisor,
6723 ial = IAllocator(self,
6726 relocate_from=list(self.relocate_from),
6729 if self.op.direction == constants.IALLOCATOR_DIR_IN:
6730 result = ial.in_text
6732 ial.Run(self.op.allocator, validate=False)
6733 result = ial.out_text