# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import re
import time
import logging

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo
    self.LogStep = processor.LogStep
    # support for dry-run
    self.dry_run_result = None

    # Tasklets
    self.tasklets = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    self.CheckArguments()
108 """Returns the SshRunner object
112 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115 ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possible
        other cluster state)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.info("Checking prerequisites for tasklet %s/%s",
                     idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.info("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this
    will be handled in the hooks runner. Also note that additional keys
    will be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    "No nodes" should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged, but any LU can override it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
    @return: the new Exec result, based on the previous result

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instances' nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None
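

# A minimal sketch of a concrete LU built on the base classes above. The
# opcode and its "message" attribute are hypothetical and only illustrate the
# ExpandNames/CheckPrereq/Exec contract; real LUs follow further below.
class _ExampleEchoLU(NoHooksLU):
  """Illustrative no-op LU: validates one required parameter and echoes it."""
  _OP_REQP = ["message"]

  def ExpandNames(self):
    # This sketch stays exclusive (REQ_BGL remains True), so no locks are
    # needed, but needed_locks must still be a dict rather than None.
    self.needed_locks = {}

  def CheckPrereq(self):
    # Purely syntactic validation; no cluster state is touched or changed.
    if not isinstance(self.op.message, str):
      raise errors.OpPrereqError("Parameter 'message' must be a string")

  def Exec(self, feedback_fn):
    feedback_fn("Echoing: %s" % self.op.message)
    return self.op.message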
362 """Tasklet base class.
364 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
365 they can mix legacy code with tasklets. Locking needs to be done in the LU,
366 tasklets know nothing about locks.
368 Subclasses must follow these rules:
369 - Implement CheckPrereq
373 def __init__(self, lu):
380 def CheckPrereq(self):
381 """Check prerequisites for this tasklets.
383 This method should check whether the prerequisites for the execution of
384 this tasklet are fulfilled. It can do internode communication, but it
385 should be idempotent - no cluster or system changes are allowed.
387 The method should raise errors.OpPrereqError in case something is not
388 fulfilled. Its return value is ignored.
390 This method should also update all parameters to their canonical form if it
391 hasn't been done before.
394 raise NotImplementedError
396 def Exec(self, feedback_fn):
397 """Execute the tasklet.
399 This method should implement the actual work. It should raise
400 errors.OpExecError for failures that are somewhat dealt with in code, or
404 raise NotImplementedError
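

# A short sketch of how tasklets plug into an LU, following the contract
# documented above; the class name and the "instances" opcode attribute are
# hypothetical and only show the CheckPrereq/Exec split.
class _ExampleInstanceTasklet(Tasklet):
  """Illustrative tasklet operating on one instance name."""
  def __init__(self, lu, instance_name):
    Tasklet.__init__(self, lu)
    self.instance_name = instance_name

  def CheckPrereq(self):
    # Canonicalize parameters and verify cluster state; change nothing.
    instance = self.lu.cfg.GetInstanceInfo(self.instance_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    feedback_fn("Would operate on %s here" % self.instance.name)


# An LU using tasklets would typically fill self.tasklets in ExpandNames,
# e.g. (sketch):
#
#   self.tasklets = [_ExampleInstanceTasklet(self, name)
#                    for name in self.op.instances]
#
# after which the base class's CheckPrereq and Exec iterate over them.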


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
                                 " non-empty list of nodes whose name is to be"
                                 " expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)
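

# Typical use (a sketch): inside an LU's ExpandNames or CheckPrereq, assuming
# the opcode carries a "names" attribute with possibly short node names:
#
#   self.op.names = _GetWantedNodes(self, self.op.names)
#
# Passing an empty list is a programming error; the "all nodes" case is
# handled by the callers before invoking this helper (see LUQueryNodes below).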


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)
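

# Usage sketch: called from an LU's CheckArguments for each optional boolean
# opcode attribute, e.g. (the "force" attribute here is hypothetical):
#
#   _CheckBooleanOpField(self.op, "force")
#
# After the call the attribute is guaranteed to exist (possibly as None).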


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)
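

# These checks are typically chained at the start of an LU's CheckPrereq,
# e.g. (sketch, assuming self.op.node_name was already expanded):
#
#   _CheckNodeOnline(self, self.op.node_name)
#   _CheckNodeNotDrained(self, self.op.node_name)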


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @param memory: the memory size of the instance
  @param vcpus: the count of VCPUs the instance has
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @param disks: the list of (size, mode) pairs
  @param bep: the backend parameters for the instance
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env
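

# For illustration, for a one-NIC, one-disk instance the resulting dict
# contains entries along these lines (values are examples only):
#
#   INSTANCE_NAME=inst1.example.com   INSTANCE_PRIMARY=node1.example.com
#   INSTANCE_STATUS=up                INSTANCE_NIC_COUNT=1
#   INSTANCE_NIC0_MAC=aa:00:00:11:22:33
#   INSTANCE_DISK_COUNT=1             INSTANCE_DISK0_SIZE=10240
#
# plus one INSTANCE_BE_*/INSTANCE_HV_* entry per backend/hypervisor parameter.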


def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
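

# A typical BuildHooksEnv body built on this helper (sketch, assuming
# self.instance was set up in CheckPrereq):
#
#   env = _BuildInstanceHookEnvByObject(self, self.instance)
#   nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
#   return env, nl, nl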


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                           profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  instances = []

  for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
    if node_name in inst.secondary_nodes:
      instances.append(inst)

  return instances


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used DRBD minors for this node, in
        the form of minor: (instance, must_exist), which correspond to
        instances and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name
    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                    (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates (and the file is outdated)" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
          bad = True

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
        bad = True
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
           instance not in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True

    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True

    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to host, should a
      # single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True

    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure is
    logged in the verify output and makes the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
    }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
    }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      if msg:
        feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
        bad = True
        continue

      nresult = all_nvinfo[node].payload
      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          if (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST]):
            feedback_fn("  - ERROR: node %s didn't return data for the"
                        " volume group '%s' - it is either missing or broken" %
                        (node, vg_name))
            bad = True
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
                    " from node %s" % (node,))
        bad = True
        continue
    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          msg = res.fail_msg
          if res.offline:
            # no need to warn or set fail return value
            continue
          if msg:
            feedback_fn("    Communication failure in hooks execution: %s" %
                        msg)
            lu_result = 1
            continue
          for script, hkr, output in res.payload:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      node_res = node_lvs[node]
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
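

# Sketch of how a caller might consume this LU's result (names are
# illustrative, not an actual Ganeti client API):
#
#   nodes_err, need_activate, missing = lu_result
#   for iname in need_activate:
#     pass  # run activate-disks for iname
#   for iname, nodevols in missing.items():
#     pass  # report the (node, volume) pairs that are gone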


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
    }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)
    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
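

# Usage sketch: LUSetClusterParams.CheckPrereq below walks every instance
# disk with this helper before allowing LVM storage to be disabled, along
# the lines of:
#
#   if any(_RecursiveCheckIfLVMBased(disk) for disk in inst.disks):
#     raise errors.OpPrereqError("Cannot disable lvm storage while"
#                                " lvm-based instances exist")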


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters.

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
    }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = objects.FillDict(
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)

    # hypervisor list/parameters
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      if not self.hv_list:
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
                                   " least one member")
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
      if invalid_hvs:
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
                                   " entries: %s" % invalid_hvs)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self)

    self.cfg.Update(self.cluster)


def _RedistributeAncillaryFiles(lu, additional_nodes=None):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to

  """
  # 1. Gather target nodes
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  dist_nodes = lu.cfg.GetNodeList()
  if additional_nodes is not None:
    dist_nodes.extend(additional_nodes)
  if myself.name in dist_nodes:
    dist_nodes.remove(myself.name)
  # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                    constants.HMAC_CLUSTER_KEY,
                   ])
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    dist_files.update(hv_class.GetAncillaryFiles())
  # 3. Perform the files upload
  for fname in dist_files:
    if os.path.exists(fname):
      result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (fname, to_node, msg))
          lu.proc.LogWarning(msg)
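

# Called after operations that change cluster membership or certificates,
# e.g. (sketch): _RedistributeAncillaryFiles(self) from an LU's Exec, or
# _RedistributeAncillaryFiles(self, additional_nodes=[new_node.name]) when a
# node is being added and is not yet part of the configuration.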


class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo())
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      result = result and (not rstats.payload[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and tuples of (path, status, diagnose) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "")]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for name, path, status, diagnose in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        all_os[name][node_name].append((path, status, diagnose))
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    pol = self._DiagnoseByOS(valid_nodes, node_data)
    output = []
    for os_name, os_data in pol.items():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0][1] for osl in os_data.values()])
        elif field == "node_status":
          # this is just a copy of the dict
          val = {}
          for node_name, nos_list in os_data.items():
            val[node_name] = nos_list
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
1967 class LURemoveNode(LogicalUnit):
1968 """Logical unit for removing a node.
1971 HPATH = "node-remove"
1972 HTYPE = constants.HTYPE_NODE
1973 _OP_REQP = ["node_name"]
1975 def BuildHooksEnv(self):
1978 This doesn't run on the target node in the pre phase as a failed
1979 node would then be impossible to remove.
1983 "OP_TARGET": self.op.node_name,
1984 "NODE_NAME": self.op.node_name,
1986 all_nodes = self.cfg.GetNodeList()
1987 all_nodes.remove(self.op.node_name)
1988 return env, all_nodes, all_nodes
1990 def CheckPrereq(self):
1991 """Check prerequisites.
1994 - the node exists in the configuration
1995 - it does not have primary or secondary instances
1996 - it's not the master
1998 Any errors are signaled by raising errors.OpPrereqError.
2001 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2003 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
2005 instance_list = self.cfg.GetInstanceList()
2007 masternode = self.cfg.GetMasterNode()
2008 if node.name == masternode:
2009 raise errors.OpPrereqError("Node is the master node,"
2010 " you need to failover first.")
2012 for instance_name in instance_list:
2013 instance = self.cfg.GetInstanceInfo(instance_name)
2014 if node.name in instance.all_nodes:
2015 raise errors.OpPrereqError("Instance %s is still running on the node,"
2016 " please remove first." % instance_name)
2017 self.op.node_name = node.name
2020 def Exec(self, feedback_fn):
2021 """Removes the node from the cluster.
2025 logging.info("Stopping the node daemon and removing configs from node %s",
2028 self.context.RemoveNode(node.name)
2030 result = self.rpc.call_node_leave_cluster(node.name)
2031 msg = result.fail_msg
2033 self.LogWarning("Errors encountered on the remote node while leaving"
2034 " the cluster: %s", msg)
2036 # Promote nodes to master candidate as needed
2037 _AdjustCandidatePool(self)
2040 class LUQueryNodes(NoHooksLU):
2041 """Logical unit for querying nodes.
2044 _OP_REQP = ["output_fields", "names", "use_locking"]
2046 _FIELDS_DYNAMIC = utils.FieldSet(
2048 "mtotal", "mnode", "mfree",
2050 "ctotal", "cnodes", "csockets",
2053 _FIELDS_STATIC = utils.FieldSet(
2054 "name", "pinst_cnt", "sinst_cnt",
2055 "pinst_list", "sinst_list",
2056 "pip", "sip", "tags",
2065 def ExpandNames(self):
2066 _CheckOutputFields(static=self._FIELDS_STATIC,
2067 dynamic=self._FIELDS_DYNAMIC,
2068 selected=self.op.output_fields)
2070 self.needed_locks = {}
2071 self.share_locks[locking.LEVEL_NODE] = 1
2074 self.wanted = _GetWantedNodes(self, self.op.names)
2076 self.wanted = locking.ALL_SET
2078 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2079 self.do_locking = self.do_node_query and self.op.use_locking
2081 # if we don't request only static fields, we need to lock the nodes
2082 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2085 def CheckPrereq(self):
2086 """Check prerequisites.
2089 # The node list is validated in _GetWantedNodes if it is non-empty;
2090 # if it is empty, there is no validation to do
2093 def Exec(self, feedback_fn):
2094 """Computes the list of nodes and their attributes.
2097 all_info = self.cfg.GetAllNodesInfo()
2099 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2100 elif self.wanted != locking.ALL_SET:
2101 nodenames = self.wanted
2102 missing = set(nodenames).difference(all_info.keys())
2104 raise errors.OpExecError(
2105 "Some nodes were removed before retrieving their data: %s" % missing)
2107 nodenames = all_info.keys()
2109 nodenames = utils.NiceSort(nodenames)
2110 nodelist = [all_info[name] for name in nodenames]
2112 # begin data gathering
2114 if self.do_node_query:
2116 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2117 self.cfg.GetHypervisorType())
2118 for name in nodenames:
2119 nodeinfo = node_data[name]
2120 if not nodeinfo.fail_msg and nodeinfo.payload:
2121 nodeinfo = nodeinfo.payload
2122 fn = utils.TryConvert
2124 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2125 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2126 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2127 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2128 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2129 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2130 "bootid": nodeinfo.get('bootid', None),
2131 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2132 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2135 live_data[name] = {}
2137 live_data = dict.fromkeys(nodenames, {})
2139 node_to_primary = dict([(name, set()) for name in nodenames])
2140 node_to_secondary = dict([(name, set()) for name in nodenames])
2142 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2143 "sinst_cnt", "sinst_list"))
2144 if inst_fields & frozenset(self.op.output_fields):
2145 instancelist = self.cfg.GetInstanceList()
2147 for instance_name in instancelist:
2148 inst = self.cfg.GetInstanceInfo(instance_name)
2149 if inst.primary_node in node_to_primary:
2150 node_to_primary[inst.primary_node].add(inst.name)
2151 for secnode in inst.secondary_nodes:
2152 if secnode in node_to_secondary:
2153 node_to_secondary[secnode].add(inst.name)
2155 master_node = self.cfg.GetMasterNode()
2157 # end data gathering
2160 for node in nodelist:
2162 for field in self.op.output_fields:
2165 elif field == "pinst_list":
2166 val = list(node_to_primary[node.name])
2167 elif field == "sinst_list":
2168 val = list(node_to_secondary[node.name])
2169 elif field == "pinst_cnt":
2170 val = len(node_to_primary[node.name])
2171 elif field == "sinst_cnt":
2172 val = len(node_to_secondary[node.name])
2173 elif field == "pip":
2174 val = node.primary_ip
2175 elif field == "sip":
2176 val = node.secondary_ip
2177 elif field == "tags":
2178 val = list(node.GetTags())
2179 elif field == "serial_no":
2180 val = node.serial_no
2181 elif field == "master_candidate":
2182 val = node.master_candidate
2183 elif field == "master":
2184 val = node.name == master_node
2185 elif field == "offline":
2187 elif field == "drained":
2189 elif self._FIELDS_DYNAMIC.Matches(field):
2190 val = live_data[node.name].get(field, None)
2191 elif field == "role":
2192 if node.name == master_node:
2194 elif node.master_candidate:
2203 raise errors.ParameterError(field)
2204 node_output.append(val)
2205 output.append(node_output)
2210 class LUQueryNodeVolumes(NoHooksLU):
2211 """Logical unit for getting volumes on node(s).
2214 _OP_REQP = ["nodes", "output_fields"]
2216 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2217 _FIELDS_STATIC = utils.FieldSet("node")
2219 def ExpandNames(self):
2220 _CheckOutputFields(static=self._FIELDS_STATIC,
2221 dynamic=self._FIELDS_DYNAMIC,
2222 selected=self.op.output_fields)
2224 self.needed_locks = {}
2225 self.share_locks[locking.LEVEL_NODE] = 1
2226 if not self.op.nodes:
2227 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2229 self.needed_locks[locking.LEVEL_NODE] = \
2230 _GetWantedNodes(self, self.op.nodes)
2232 def CheckPrereq(self):
2233 """Check prerequisites.
2235 This checks that the fields required are valid output fields.
2238 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2240 def Exec(self, feedback_fn):
2241 """Computes the list of nodes and their attributes.
2244 nodenames = self.nodes
2245 volumes = self.rpc.call_node_volumes(nodenames)
2247 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2248 in self.cfg.GetInstanceList()]
2250 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2253 for node in nodenames:
2254 nresult = volumes[node]
2257 msg = nresult.fail_msg
2259 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2262 node_vols = nresult.payload[:]
2263 node_vols.sort(key=lambda vol: vol['dev'])
2265 for vol in node_vols:
2267 for field in self.op.output_fields:
2270 elif field == "phys":
2274 elif field == "name":
2276 elif field == "size":
2277 val = int(float(vol['size']))
2278 elif field == "instance":
2280 if node not in lv_by_node[inst]:
2282 if vol['name'] in lv_by_node[inst][node]:
2288 raise errors.ParameterError(field)
2289 node_output.append(str(val))
2291 output.append(node_output)
2296 class LUAddNode(LogicalUnit):
2297 """Logical unit for adding node to the cluster.
2301 HTYPE = constants.HTYPE_NODE
2302 _OP_REQP = ["node_name"]
2304 def BuildHooksEnv(self):
2307 This will run on all nodes before, and on all nodes + the new node after.
2311 "OP_TARGET": self.op.node_name,
2312 "NODE_NAME": self.op.node_name,
2313 "NODE_PIP": self.op.primary_ip,
2314 "NODE_SIP": self.op.secondary_ip,
2316 nodes_0 = self.cfg.GetNodeList()
2317 nodes_1 = nodes_0 + [self.op.node_name, ]
2318 return env, nodes_0, nodes_1
2320 def CheckPrereq(self):
2321 """Check prerequisites.
2324 - the new node is not already in the config
2326 - its parameters (single/dual homed) matches the cluster
2328 Any errors are signaled by raising errors.OpPrereqError.
2331 node_name = self.op.node_name
2334 dns_data = utils.HostInfo(node_name)
2336 node = dns_data.name
2337 primary_ip = self.op.primary_ip = dns_data.ip
2338 secondary_ip = getattr(self.op, "secondary_ip", None)
2339 if secondary_ip is None:
2340 secondary_ip = primary_ip
2341 if not utils.IsValidIP(secondary_ip):
2342 raise errors.OpPrereqError("Invalid secondary IP given")
2343 self.op.secondary_ip = secondary_ip
2345 node_list = cfg.GetNodeList()
2346 if not self.op.readd and node in node_list:
2347 raise errors.OpPrereqError("Node %s is already in the configuration" %
2349 elif self.op.readd and node not in node_list:
2350 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2352 for existing_node_name in node_list:
2353 existing_node = cfg.GetNodeInfo(existing_node_name)
2355 if self.op.readd and node == existing_node_name:
2356 if (existing_node.primary_ip != primary_ip or
2357 existing_node.secondary_ip != secondary_ip):
2358 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2359 " address configuration as before")
2362 if (existing_node.primary_ip == primary_ip or
2363 existing_node.secondary_ip == primary_ip or
2364 existing_node.primary_ip == secondary_ip or
2365 existing_node.secondary_ip == secondary_ip):
2366 raise errors.OpPrereqError("New node ip address(es) conflict with"
2367 " existing node %s" % existing_node.name)
2369 # check that the type of the node (single versus dual homed) is the
2370 # same as for the master
2371 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2372 master_singlehomed = myself.secondary_ip == myself.primary_ip
2373 newbie_singlehomed = secondary_ip == primary_ip
2374 if master_singlehomed != newbie_singlehomed:
2375 if master_singlehomed:
2376 raise errors.OpPrereqError("The master has no private ip but the"
2377 " new node has one")
2379 raise errors.OpPrereqError("The master has a private ip but the"
2380 " new node doesn't have one")
2382 # checks reachability
2383 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2384 raise errors.OpPrereqError("Node not reachable by ping")
2386 if not newbie_singlehomed:
2387 # check reachability from my secondary ip to newbie's secondary ip
2388 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2389 source=myself.secondary_ip):
2390 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2391 " based ping to noded port")
2393 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2398 mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2399 # the new node will increase mc_max by one, so:
2400 mc_max = min(mc_max + 1, cp_size)
2401 self.master_candidate = mc_now < mc_max
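# worked example (hypothetical numbers): with candidate_pool_size=10,
# mc_now=9 and mc_max=9, adding the node gives mc_max=min(9+1, 10)=10,
# so mc_now < mc_max holds and the new node becomes a master candidate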
2404 self.new_node = self.cfg.GetNodeInfo(node)
2405 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2407 self.new_node = objects.Node(name=node,
2408 primary_ip=primary_ip,
2409 secondary_ip=secondary_ip,
2410 master_candidate=self.master_candidate,
2411 offline=False, drained=False)
2413 def Exec(self, feedback_fn):
2414 """Adds the new node to the cluster.
2417 new_node = self.new_node
2418 node = new_node.name
2420 # for re-adds, reset the offline/drained/master-candidate flags;
2421 # we need to reset here, otherwise offline would prevent RPC calls
2422 # later in the procedure; this also means that if the re-add
2423 # fails, we are left with a non-offlined, broken node
2425 new_node.drained = new_node.offline = False
2426 self.LogInfo("Readding a node, the offline/drained flags were reset")
2427 # if we demote the node, we do cleanup later in the procedure
2428 new_node.master_candidate = self.master_candidate
2430 # notify the user about any possible mc promotion
2431 if new_node.master_candidate:
2432 self.LogInfo("Node will be a master candidate")
2434 # check connectivity
2435 result = self.rpc.call_version([node])[node]
2436 result.Raise("Can't get version information from node %s" % node)
2437 if constants.PROTOCOL_VERSION == result.payload:
2438 logging.info("Communication to node %s fine, sw version %s match",
2439 node, result.payload)
2441 raise errors.OpExecError("Version mismatch master version %s,"
2442 " node version %s" %
2443 (constants.PROTOCOL_VERSION, result.payload))
2446 logging.info("Copy ssh key to node %s", node)
2447 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2449 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2450 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2456 keyarray.append(f.read())
2460 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2462 keyarray[3], keyarray[4], keyarray[5])
2463 result.Raise("Cannot transfer ssh keys to the new node")
2465 # Add node to our /etc/hosts, and add key to known_hosts
2466 if self.cfg.GetClusterInfo().modify_etc_hosts:
2467 utils.AddHostToEtcHosts(new_node.name)
2469 if new_node.secondary_ip != new_node.primary_ip:
2470 result = self.rpc.call_node_has_ip_address(new_node.name,
2471 new_node.secondary_ip)
2472 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2474 if not result.payload:
2475 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2476 " you gave (%s). Please fix and re-run this"
2477 " command." % new_node.secondary_ip)
2479 node_verify_list = [self.cfg.GetMasterNode()]
2480 node_verify_param = {
2482 # TODO: do a node-net-test as well?
2485 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2486 self.cfg.GetClusterName())
2487 for verifier in node_verify_list:
2488 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2489 nl_payload = result[verifier].payload['nodelist']
2491 for failed in nl_payload:
2492 feedback_fn("ssh/hostname verification failed %s -> %s" %
2493 (verifier, nl_payload[failed]))
2494 raise errors.OpExecError("ssh/hostname verification failed.")
2497 _RedistributeAncillaryFiles(self)
2498 self.context.ReaddNode(new_node)
2499 # make sure we redistribute the config
2500 self.cfg.Update(new_node)
2501 # and make sure the new node will not have old files around
2502 if not new_node.master_candidate:
2503 result = self.rpc.call_node_demote_from_mc(new_node.name)
2504 msg = result.RemoteFailMsg()
2506 self.LogWarning("Node failed to demote itself from master"
2507 " candidate status: %s" % msg)
2509 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2510 self.context.AddNode(new_node)
2513 class LUSetNodeParams(LogicalUnit):
2514 """Modifies the parameters of a node.
2517 HPATH = "node-modify"
2518 HTYPE = constants.HTYPE_NODE
2519 _OP_REQP = ["node_name"]
2522 def CheckArguments(self):
2523 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2524 if node_name is None:
2525 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2526 self.op.node_name = node_name
2527 _CheckBooleanOpField(self.op, 'master_candidate')
2528 _CheckBooleanOpField(self.op, 'offline')
2529 _CheckBooleanOpField(self.op, 'drained')
2530 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2531 if all_mods.count(None) == 3:
2532 raise errors.OpPrereqError("Please pass at least one modification")
2533 if all_mods.count(True) > 1:
2534 raise errors.OpPrereqError("Can't set the node into more than one"
2535 " state at the same time")
2537 def ExpandNames(self):
2538 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2540 def BuildHooksEnv(self):
2543 This runs on the master node.
2547 "OP_TARGET": self.op.node_name,
2548 "MASTER_CANDIDATE": str(self.op.master_candidate),
2549 "OFFLINE": str(self.op.offline),
2550 "DRAINED": str(self.op.drained),
2552 nl = [self.cfg.GetMasterNode(),
2556 def CheckPrereq(self):
2557 """Check prerequisites.
2559 This checks the node and the consistency of the requested flag changes.
2562 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2564 if ((self.op.master_candidate == False or self.op.offline == True or
2565 self.op.drained == True) and node.master_candidate):
2566 # we will demote the node from master_candidate
2567 if self.op.node_name == self.cfg.GetMasterNode():
2568 raise errors.OpPrereqError("The master node has to be a"
2569 " master candidate, online and not drained")
2570 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2571 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2572 if num_candidates <= cp_size:
2573 msg = ("Not enough master candidates (desired"
2574 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2576 self.LogWarning(msg)
2578 raise errors.OpPrereqError(msg)
2580 if (self.op.master_candidate == True and
2581 ((node.offline and not self.op.offline == False) or
2582 (node.drained and not self.op.drained == False))):
2583 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2584 " to master_candidate" % node.name)
2588 def Exec(self, feedback_fn):
2597 if self.op.offline is not None:
2598 node.offline = self.op.offline
2599 result.append(("offline", str(self.op.offline)))
2600 if self.op.offline == True:
2601 if node.master_candidate:
2602 node.master_candidate = False
2604 result.append(("master_candidate", "auto-demotion due to offline"))
2606 node.drained = False
2607 result.append(("drained", "clear drained status due to offline"))
2609 if self.op.master_candidate is not None:
2610 node.master_candidate = self.op.master_candidate
2612 result.append(("master_candidate", str(self.op.master_candidate)))
2613 if self.op.master_candidate == False:
2614 rrc = self.rpc.call_node_demote_from_mc(node.name)
2617 self.LogWarning("Node failed to demote itself: %s" % msg)
2619 if self.op.drained is not None:
2620 node.drained = self.op.drained
2621 result.append(("drained", str(self.op.drained)))
2622 if self.op.drained == True:
2623 if node.master_candidate:
2624 node.master_candidate = False
2626 result.append(("master_candidate", "auto-demotion due to drain"))
2627 rrc = self.rpc.call_node_demote_from_mc(node.name)
2628 msg = rrc.RemoteFailMsg()
2630 self.LogWarning("Node failed to demote itself: %s" % msg)
2632 node.offline = False
2633 result.append(("offline", "clear offline status due to drain"))
2635 # this will trigger configuration file update, if needed
2636 self.cfg.Update(node)
2637 # this will trigger job queue propagation or cleanup
2639 self.context.ReaddNode(node)
2644 class LUPowercycleNode(NoHooksLU):
2645 """Powercycles a node.
2648 _OP_REQP = ["node_name", "force"]
2651 def CheckArguments(self):
2652 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2653 if node_name is None:
2654 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2655 self.op.node_name = node_name
2656 if node_name == self.cfg.GetMasterNode() and not self.op.force:
2657 raise errors.OpPrereqError("The node is the master and the force"
2658 " parameter was not set")
2660 def ExpandNames(self):
2661 """Locking for PowercycleNode.
2663 This is a last-resort option and shouldn't block on other
2664 jobs. Therefore, we grab no locks.
2667 self.needed_locks = {}
2669 def CheckPrereq(self):
2670 """Check prerequisites.
2672 This LU has no prereqs.
2677 def Exec(self, feedback_fn):
2681 result = self.rpc.call_node_powercycle(self.op.node_name,
2682 self.cfg.GetHypervisorType())
2683 result.Raise("Failed to schedule the reboot")
2684 return result.payload
2687 class LUQueryClusterInfo(NoHooksLU):
2688 """Query cluster configuration.
2694 def ExpandNames(self):
2695 self.needed_locks = {}
2697 def CheckPrereq(self):
2698 """No prerequsites needed for this LU.
2703 def Exec(self, feedback_fn):
2704 """Return cluster config.
2707 cluster = self.cfg.GetClusterInfo()
2709 "software_version": constants.RELEASE_VERSION,
2710 "protocol_version": constants.PROTOCOL_VERSION,
2711 "config_version": constants.CONFIG_VERSION,
2712 "os_api_version": max(constants.OS_API_VERSIONS),
2713 "export_version": constants.EXPORT_VERSION,
2714 "architecture": (platform.architecture()[0], platform.machine()),
2715 "name": cluster.cluster_name,
2716 "master": cluster.master_node,
2717 "default_hypervisor": cluster.enabled_hypervisors[0],
2718 "enabled_hypervisors": cluster.enabled_hypervisors,
2719 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2720 for hypervisor_name in cluster.enabled_hypervisors]),
2721 "beparams": cluster.beparams,
2722 "nicparams": cluster.nicparams,
2723 "candidate_pool_size": cluster.candidate_pool_size,
2724 "master_netdev": cluster.master_netdev,
2725 "volume_group_name": cluster.volume_group_name,
2726 "file_storage_dir": cluster.file_storage_dir,
2732 class LUQueryConfigValues(NoHooksLU):
2733 """Return configuration values.
2738 _FIELDS_DYNAMIC = utils.FieldSet()
2739 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2741 def ExpandNames(self):
2742 self.needed_locks = {}
2744 _CheckOutputFields(static=self._FIELDS_STATIC,
2745 dynamic=self._FIELDS_DYNAMIC,
2746 selected=self.op.output_fields)
2748 def CheckPrereq(self):
2749 """No prerequisites.
2754 def Exec(self, feedback_fn):
2755 """Dump a representation of the cluster config to the standard output.
2759 for field in self.op.output_fields:
2760 if field == "cluster_name":
2761 entry = self.cfg.GetClusterName()
2762 elif field == "master_node":
2763 entry = self.cfg.GetMasterNode()
2764 elif field == "drain_flag":
2765 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2767 raise errors.ParameterError(field)
2768 values.append(entry)
2772 class LUActivateInstanceDisks(NoHooksLU):
2773 """Bring up an instance's disks.
2776 _OP_REQP = ["instance_name"]
2779 def ExpandNames(self):
2780 self._ExpandAndLockInstance()
2781 self.needed_locks[locking.LEVEL_NODE] = []
2782 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2784 def DeclareLocks(self, level):
2785 if level == locking.LEVEL_NODE:
2786 self._LockInstancesNodes()
2788 def CheckPrereq(self):
2789 """Check prerequisites.
2791 This checks that the instance is in the cluster.
2794 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2795 assert self.instance is not None, \
2796 "Cannot retrieve locked instance %s" % self.op.instance_name
2797 _CheckNodeOnline(self, self.instance.primary_node)
2799 def Exec(self, feedback_fn):
2800 """Activate the disks.
2803 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2805 raise errors.OpExecError("Cannot activate block devices")
2810 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2811 """Prepare the block devices for an instance.
2813 This sets up the block devices on all nodes.
2815 @type lu: L{LogicalUnit}
2816 @param lu: the logical unit on whose behalf we execute
2817 @type instance: L{objects.Instance}
2818 @param instance: the instance for whose disks we assemble
2819 @type ignore_secondaries: boolean
2820 @param ignore_secondaries: if true, errors on secondary nodes
2821 won't result in an error return from the function
2822 @return: a tuple (disks_ok, device_info); disks_ok is False if the
2823 operation failed, otherwise device_info is a list of tuples
2824 (host, instance_visible_name, node_visible_name) mapping node devices to instance devices
2829 iname = instance.name
2830 # With the two-pass mechanism we try to reduce the window of
2831 # opportunity for the race condition of switching DRBD to primary
2832 # before handshaking has occurred, but we do not eliminate it
2834 # The proper fix would be to wait (with some limits) until the
2835 # connection has been made and drbd transitions from WFConnection
2836 # into any other network-connected state (Connected, SyncTarget, SyncSource, etc.)
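# (a rough sketch of that proper fix, using the real call_blockdev_find
# RPC but otherwise hypothetical names:
#   while retries and _IsWFConnection(lu.rpc.call_blockdev_find(node, dev)):
#     time.sleep(delay)
#     retries -= 1
# where _IsWFConnection, retries and delay are illustrative only and do
# not exist in this module)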
2839 # 1st pass, assemble on all nodes in secondary mode
2840 for inst_disk in instance.disks:
2841 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2842 lu.cfg.SetDiskID(node_disk, node)
2843 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2844 msg = result.fail_msg
2846 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2847 " (is_primary=False, pass=1): %s",
2848 inst_disk.iv_name, node, msg)
2849 if not ignore_secondaries:
2852 # FIXME: race condition on drbd migration to primary
2854 # 2nd pass, do only the primary node
2855 for inst_disk in instance.disks:
2856 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2857 if node != instance.primary_node:
2859 lu.cfg.SetDiskID(node_disk, node)
2860 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2861 msg = result.fail_msg
2863 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2864 " (is_primary=True, pass=2): %s",
2865 inst_disk.iv_name, node, msg)
2867 device_info.append((instance.primary_node, inst_disk.iv_name,
2870 # leave the disks configured for the primary node
2871 # this is a workaround that would be fixed better by
2872 # improving the logical/physical id handling
2873 for disk in instance.disks:
2874 lu.cfg.SetDiskID(disk, instance.primary_node)
2876 return disks_ok, device_info
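# Illustrative sketch, not part of the original module: typical use of
# _AssembleInstanceDisks and its (disks_ok, device_info) return value.
# The helper name and the dev_path variable are hypothetical.
def _ExampleActivateAndReport(lu, instance):
  """Hedged example: assemble the disks and log the resulting mapping."""
  disks_ok, device_info = _AssembleInstanceDisks(lu, instance)
  if not disks_ok:
    raise errors.OpExecError("Cannot activate block devices")
  for node, iv_name, dev_path in device_info:
    lu.LogInfo("Disk %s active on node %s as %s" % (iv_name, node, dev_path))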
2879 def _StartInstanceDisks(lu, instance, force):
2880 """Start the disks of an instance.
2883 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
2884 ignore_secondaries=force)
2886 _ShutdownInstanceDisks(lu, instance)
2887 if force is not None and not force:
2888 lu.proc.LogWarning("", hint="If the message above refers to a"
2890 " you can retry the operation using '--force'.")
2891 raise errors.OpExecError("Disk consistency error")
2894 class LUDeactivateInstanceDisks(NoHooksLU):
2895 """Shutdown an instance's disks.
2898 _OP_REQP = ["instance_name"]
2901 def ExpandNames(self):
2902 self._ExpandAndLockInstance()
2903 self.needed_locks[locking.LEVEL_NODE] = []
2904 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2906 def DeclareLocks(self, level):
2907 if level == locking.LEVEL_NODE:
2908 self._LockInstancesNodes()
2910 def CheckPrereq(self):
2911 """Check prerequisites.
2913 This checks that the instance is in the cluster.
2916 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2917 assert self.instance is not None, \
2918 "Cannot retrieve locked instance %s" % self.op.instance_name
2920 def Exec(self, feedback_fn):
2921 """Deactivate the disks
2924 instance = self.instance
2925 _SafeShutdownInstanceDisks(self, instance)
2928 def _SafeShutdownInstanceDisks(lu, instance):
2929 """Shutdown block devices of an instance.
2931 This function checks if an instance is running, before calling
2932 _ShutdownInstanceDisks.
2935 pnode = instance.primary_node
2936 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
2937 ins_l.Raise("Can't contact node %s" % pnode)
2939 if instance.name in ins_l.payload:
2940 raise errors.OpExecError("Instance is running, can't shutdown"
2943 _ShutdownInstanceDisks(lu, instance)
2946 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2947 """Shutdown block devices of an instance.
2949 This does the shutdown on all nodes of the instance.
2951 If ignore_primary is true, errors on the primary node are ignored; otherwise they make the shutdown fail.
2956 for disk in instance.disks:
2957 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2958 lu.cfg.SetDiskID(top_disk, node)
2959 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2960 msg = result.fail_msg
2962 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2963 disk.iv_name, node, msg)
2964 if not ignore_primary or node != instance.primary_node:
2969 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2970 """Checks if a node has enough free memory.
2972 This function checks whether a given node has the needed amount of free
2973 memory. In case the node has less memory or we cannot get the
2974 information from the node, this function raises an OpPrereqError
2977 @type lu: L{LogicalUnit}
2978 @param lu: a logical unit from which we get configuration data
2980 @param node: the node to check
2981 @type reason: C{str}
2982 @param reason: string to use in the error message
2983 @type requested: C{int}
2984 @param requested: the amount of memory in MiB to check for
2985 @type hypervisor_name: C{str}
2986 @param hypervisor_name: the hypervisor to ask for memory stats
2987 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2988 we cannot check the node
2991 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2992 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
2993 free_mem = nodeinfo[node].payload.get('memory_free', None)
2994 if not isinstance(free_mem, int):
2995 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2996 " was '%s'" % (node, free_mem))
2997 if requested > free_mem:
2998 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2999 " needed %s MiB, available %s MiB" %
3000 (node, reason, requested, free_mem))
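# Illustrative sketch, not part of the original module: how an LU's
# CheckPrereq might use _CheckNodeFreeMemory before starting an instance
# (mirroring LUStartupInstance below); the helper name is hypothetical.
def _ExampleMemoryPrereq(lu, instance):
  """Hedged example: fail early if the primary node lacks free memory."""
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  _CheckNodeFreeMemory(lu, instance.primary_node,
                       "starting instance %s" % instance.name,
                       bep[constants.BE_MEMORY], instance.hypervisor)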
3003 class LUStartupInstance(LogicalUnit):
3004 """Starts an instance.
3007 HPATH = "instance-start"
3008 HTYPE = constants.HTYPE_INSTANCE
3009 _OP_REQP = ["instance_name", "force"]
3012 def ExpandNames(self):
3013 self._ExpandAndLockInstance()
3015 def BuildHooksEnv(self):
3018 This runs on master, primary and secondary nodes of the instance.
3022 "FORCE": self.op.force,
3024 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3025 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3028 def CheckPrereq(self):
3029 """Check prerequisites.
3031 This checks that the instance is in the cluster.
3034 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3035 assert self.instance is not None, \
3036 "Cannot retrieve locked instance %s" % self.op.instance_name
3039 self.beparams = getattr(self.op, "beparams", {})
3041 if not isinstance(self.beparams, dict):
3042 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3043 " dict" % (type(self.beparams), ))
3044 # fill the beparams dict
3045 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3046 self.op.beparams = self.beparams
3049 self.hvparams = getattr(self.op, "hvparams", {})
3051 if not isinstance(self.hvparams, dict):
3052 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3053 " dict" % (type(self.hvparams), ))
3055 # check hypervisor parameter syntax (locally)
3056 cluster = self.cfg.GetClusterInfo()
3057 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3058 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3060 filled_hvp.update(self.hvparams)
3061 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3062 hv_type.CheckParameterSyntax(filled_hvp)
3063 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3064 self.op.hvparams = self.hvparams
3066 _CheckNodeOnline(self, instance.primary_node)
3068 bep = self.cfg.GetClusterInfo().FillBE(instance)
3069 # check bridges existence
3070 _CheckInstanceBridgesExist(self, instance)
3072 remote_info = self.rpc.call_instance_info(instance.primary_node,
3074 instance.hypervisor)
3075 remote_info.Raise("Error checking node %s" % instance.primary_node,
3077 if not remote_info.payload: # not running already
3078 _CheckNodeFreeMemory(self, instance.primary_node,
3079 "starting instance %s" % instance.name,
3080 bep[constants.BE_MEMORY], instance.hypervisor)
3082 def Exec(self, feedback_fn):
3083 """Start the instance.
3086 instance = self.instance
3087 force = self.op.force
3089 self.cfg.MarkInstanceUp(instance.name)
3091 node_current = instance.primary_node
3093 _StartInstanceDisks(self, instance, force)
3095 result = self.rpc.call_instance_start(node_current, instance,
3096 self.hvparams, self.beparams)
3097 msg = result.fail_msg
3099 _ShutdownInstanceDisks(self, instance)
3100 raise errors.OpExecError("Could not start instance: %s" % msg)
3103 class LURebootInstance(LogicalUnit):
3104 """Reboot an instance.
3107 HPATH = "instance-reboot"
3108 HTYPE = constants.HTYPE_INSTANCE
3109 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3112 def ExpandNames(self):
3113 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3114 constants.INSTANCE_REBOOT_HARD,
3115 constants.INSTANCE_REBOOT_FULL]:
3116 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3117 (constants.INSTANCE_REBOOT_SOFT,
3118 constants.INSTANCE_REBOOT_HARD,
3119 constants.INSTANCE_REBOOT_FULL))
3120 self._ExpandAndLockInstance()
3122 def BuildHooksEnv(self):
3125 This runs on master, primary and secondary nodes of the instance.
3129 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3130 "REBOOT_TYPE": self.op.reboot_type,
3132 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3133 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3136 def CheckPrereq(self):
3137 """Check prerequisites.
3139 This checks that the instance is in the cluster.
3142 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3143 assert self.instance is not None, \
3144 "Cannot retrieve locked instance %s" % self.op.instance_name
3146 _CheckNodeOnline(self, instance.primary_node)
3148 # check bridges existence
3149 _CheckInstanceBridgesExist(self, instance)
3151 def Exec(self, feedback_fn):
3152 """Reboot the instance.
3155 instance = self.instance
3156 ignore_secondaries = self.op.ignore_secondaries
3157 reboot_type = self.op.reboot_type
3159 node_current = instance.primary_node
3161 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3162 constants.INSTANCE_REBOOT_HARD]:
3163 for disk in instance.disks:
3164 self.cfg.SetDiskID(disk, node_current)
3165 result = self.rpc.call_instance_reboot(node_current, instance,
3167 result.Raise("Could not reboot instance")
3169 result = self.rpc.call_instance_shutdown(node_current, instance)
3170 result.Raise("Could not shutdown instance for full reboot")
3171 _ShutdownInstanceDisks(self, instance)
3172 _StartInstanceDisks(self, instance, ignore_secondaries)
3173 result = self.rpc.call_instance_start(node_current, instance, None, None)
3174 msg = result.fail_msg
3176 _ShutdownInstanceDisks(self, instance)
3177 raise errors.OpExecError("Could not start instance for"
3178 " full reboot: %s" % msg)
3180 self.cfg.MarkInstanceUp(instance.name)
3183 class LUShutdownInstance(LogicalUnit):
3184 """Shutdown an instance.
3187 HPATH = "instance-stop"
3188 HTYPE = constants.HTYPE_INSTANCE
3189 _OP_REQP = ["instance_name"]
3192 def ExpandNames(self):
3193 self._ExpandAndLockInstance()
3195 def BuildHooksEnv(self):
3198 This runs on master, primary and secondary nodes of the instance.
3201 env = _BuildInstanceHookEnvByObject(self, self.instance)
3202 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3205 def CheckPrereq(self):
3206 """Check prerequisites.
3208 This checks that the instance is in the cluster.
3211 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3212 assert self.instance is not None, \
3213 "Cannot retrieve locked instance %s" % self.op.instance_name
3214 _CheckNodeOnline(self, self.instance.primary_node)
3216 def Exec(self, feedback_fn):
3217 """Shutdown the instance.
3220 instance = self.instance
3221 node_current = instance.primary_node
3222 self.cfg.MarkInstanceDown(instance.name)
3223 result = self.rpc.call_instance_shutdown(node_current, instance)
3224 msg = result.fail_msg
3226 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3228 _ShutdownInstanceDisks(self, instance)
3231 class LUReinstallInstance(LogicalUnit):
3232 """Reinstall an instance.
3235 HPATH = "instance-reinstall"
3236 HTYPE = constants.HTYPE_INSTANCE
3237 _OP_REQP = ["instance_name"]
3240 def ExpandNames(self):
3241 self._ExpandAndLockInstance()
3243 def BuildHooksEnv(self):
3246 This runs on master, primary and secondary nodes of the instance.
3249 env = _BuildInstanceHookEnvByObject(self, self.instance)
3250 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3253 def CheckPrereq(self):
3254 """Check prerequisites.
3256 This checks that the instance is in the cluster and is not running.
3259 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3260 assert instance is not None, \
3261 "Cannot retrieve locked instance %s" % self.op.instance_name
3262 _CheckNodeOnline(self, instance.primary_node)
3264 if instance.disk_template == constants.DT_DISKLESS:
3265 raise errors.OpPrereqError("Instance '%s' has no disks" %
3266 self.op.instance_name)
3267 if instance.admin_up:
3268 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3269 self.op.instance_name)
3270 remote_info = self.rpc.call_instance_info(instance.primary_node,
3272 instance.hypervisor)
3273 remote_info.Raise("Error checking node %s" % instance.primary_node,
3275 if remote_info.payload:
3276 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3277 (self.op.instance_name,
3278 instance.primary_node))
3280 self.op.os_type = getattr(self.op, "os_type", None)
3281 if self.op.os_type is not None:
3283 pnode = self.cfg.GetNodeInfo(
3284 self.cfg.ExpandNodeName(instance.primary_node))
3286 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3288 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3289 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3290 (self.op.os_type, pnode.name), prereq=True)
3292 self.instance = instance
3294 def Exec(self, feedback_fn):
3295 """Reinstall the instance.
3298 inst = self.instance
3300 if self.op.os_type is not None:
3301 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3302 inst.os = self.op.os_type
3303 self.cfg.Update(inst)
3305 _StartInstanceDisks(self, inst, None)
3307 feedback_fn("Running the instance OS create scripts...")
3308 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3309 result.Raise("Could not install OS for instance %s on node %s" %
3310 (inst.name, inst.primary_node))
3312 _ShutdownInstanceDisks(self, inst)
3315 class LURenameInstance(LogicalUnit):
3316 """Rename an instance.
3319 HPATH = "instance-rename"
3320 HTYPE = constants.HTYPE_INSTANCE
3321 _OP_REQP = ["instance_name", "new_name"]
3323 def BuildHooksEnv(self):
3326 This runs on master, primary and secondary nodes of the instance.
3329 env = _BuildInstanceHookEnvByObject(self, self.instance)
3330 env["INSTANCE_NEW_NAME"] = self.op.new_name
3331 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3334 def CheckPrereq(self):
3335 """Check prerequisites.
3337 This checks that the instance is in the cluster and is not running.
3340 instance = self.cfg.GetInstanceInfo(
3341 self.cfg.ExpandInstanceName(self.op.instance_name))
3342 if instance is None:
3343 raise errors.OpPrereqError("Instance '%s' not known" %
3344 self.op.instance_name)
3345 _CheckNodeOnline(self, instance.primary_node)
3347 if instance.admin_up:
3348 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3349 self.op.instance_name)
3350 remote_info = self.rpc.call_instance_info(instance.primary_node,
3352 instance.hypervisor)
3353 remote_info.Raise("Error checking node %s" % instance.primary_node,
3355 if remote_info.payload:
3356 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3357 (self.op.instance_name,
3358 instance.primary_node))
3359 self.instance = instance
3361 # new name verification
3362 name_info = utils.HostInfo(self.op.new_name)
3364 self.op.new_name = new_name = name_info.name
3365 instance_list = self.cfg.GetInstanceList()
3366 if new_name in instance_list:
3367 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3370 if not getattr(self.op, "ignore_ip", False):
3371 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3372 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3373 (name_info.ip, new_name))
3376 def Exec(self, feedback_fn):
3377 """Reinstall the instance.
3380 inst = self.instance
3381 old_name = inst.name
3383 if inst.disk_template == constants.DT_FILE:
3384 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3386 self.cfg.RenameInstance(inst.name, self.op.new_name)
3387 # Change the instance lock. This is definitely safe while we hold the BGL
3388 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3389 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3391 # re-read the instance from the configuration after rename
3392 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3394 if inst.disk_template == constants.DT_FILE:
3395 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3396 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3397 old_file_storage_dir,
3398 new_file_storage_dir)
3399 result.Raise("Could not rename on node %s directory '%s' to '%s'"
3400 " (but the instance has been renamed in Ganeti)" %
3401 (inst.primary_node, old_file_storage_dir,
3402 new_file_storage_dir))
3404 _StartInstanceDisks(self, inst, None)
3406 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3408 msg = result.fail_msg
3410 msg = ("Could not run OS rename script for instance %s on node %s"
3411 " (but the instance has been renamed in Ganeti): %s" %
3412 (inst.name, inst.primary_node, msg))
3413 self.proc.LogWarning(msg)
3415 _ShutdownInstanceDisks(self, inst)
3418 class LURemoveInstance(LogicalUnit):
3419 """Remove an instance.
3422 HPATH = "instance-remove"
3423 HTYPE = constants.HTYPE_INSTANCE
3424 _OP_REQP = ["instance_name", "ignore_failures"]
3427 def ExpandNames(self):
3428 self._ExpandAndLockInstance()
3429 self.needed_locks[locking.LEVEL_NODE] = []
3430 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3432 def DeclareLocks(self, level):
3433 if level == locking.LEVEL_NODE:
3434 self._LockInstancesNodes()
3436 def BuildHooksEnv(self):
3439 This runs on master, primary and secondary nodes of the instance.
3442 env = _BuildInstanceHookEnvByObject(self, self.instance)
3443 nl = [self.cfg.GetMasterNode()]
3446 def CheckPrereq(self):
3447 """Check prerequisites.
3449 This checks that the instance is in the cluster.
3452 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3453 assert self.instance is not None, \
3454 "Cannot retrieve locked instance %s" % self.op.instance_name
3456 def Exec(self, feedback_fn):
3457 """Remove the instance.
3460 instance = self.instance
3461 logging.info("Shutting down instance %s on node %s",
3462 instance.name, instance.primary_node)
3464 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3465 msg = result.fail_msg
3467 if self.op.ignore_failures:
3468 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3470 raise errors.OpExecError("Could not shutdown instance %s on"
3472 (instance.name, instance.primary_node, msg))
3474 logging.info("Removing block devices for instance %s", instance.name)
3476 if not _RemoveDisks(self, instance):
3477 if self.op.ignore_failures:
3478 feedback_fn("Warning: can't remove instance's disks")
3480 raise errors.OpExecError("Can't remove instance's disks")
3482 logging.info("Removing instance %s out of cluster config", instance.name)
3484 self.cfg.RemoveInstance(instance.name)
3485 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3488 class LUQueryInstances(NoHooksLU):
3489 """Logical unit for querying instances.
3492 _OP_REQP = ["output_fields", "names", "use_locking"]
3494 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3496 "disk_template", "ip", "mac", "bridge",
3497 "nic_mode", "nic_link",
3498 "sda_size", "sdb_size", "vcpus", "tags",
3499 "network_port", "beparams",
3500 r"(disk)\.(size)/([0-9]+)",
3501 r"(disk)\.(sizes)", "disk_usage",
3502 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
3503 r"(nic)\.(bridge)/([0-9]+)",
3504 r"(nic)\.(macs|ips|modes|links|bridges)",
3505 r"(disk|nic)\.(count)",
3506 "serial_no", "hypervisor", "hvparams",] +
3508 for name in constants.HVS_PARAMETERS] +
3510 for name in constants.BES_PARAMETERS])
3511 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3514 def ExpandNames(self):
3515 _CheckOutputFields(static=self._FIELDS_STATIC,
3516 dynamic=self._FIELDS_DYNAMIC,
3517 selected=self.op.output_fields)
3519 self.needed_locks = {}
3520 self.share_locks[locking.LEVEL_INSTANCE] = 1
3521 self.share_locks[locking.LEVEL_NODE] = 1
3524 self.wanted = _GetWantedInstances(self, self.op.names)
3526 self.wanted = locking.ALL_SET
3528 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3529 self.do_locking = self.do_node_query and self.op.use_locking
3531 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3532 self.needed_locks[locking.LEVEL_NODE] = []
3533 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3535 def DeclareLocks(self, level):
3536 if level == locking.LEVEL_NODE and self.do_locking:
3537 self._LockInstancesNodes()
3539 def CheckPrereq(self):
3540 """Check prerequisites.
3545 def Exec(self, feedback_fn):
3546 """Computes the list of nodes and their attributes.
3549 all_info = self.cfg.GetAllInstancesInfo()
3550 if self.wanted == locking.ALL_SET:
3551 # caller didn't specify instance names, so ordering is not important
3553 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3555 instance_names = all_info.keys()
3556 instance_names = utils.NiceSort(instance_names)
3558 # caller did specify names, so we must keep the ordering
3560 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3562 tgt_set = all_info.keys()
3563 missing = set(self.wanted).difference(tgt_set)
3565 raise errors.OpExecError("Some instances were removed before"
3566 " retrieving their data: %s" % missing)
3567 instance_names = self.wanted
3569 instance_list = [all_info[iname] for iname in instance_names]
3571 # begin data gathering
3573 nodes = frozenset([inst.primary_node for inst in instance_list])
3574 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3578 if self.do_node_query:
3580 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3582 result = node_data[name]
3584 # offline nodes will be in both lists
3585 off_nodes.append(name)
3586 if result.failed or result.fail_msg:
3587 bad_nodes.append(name)
3590 live_data.update(result.payload)
3591 # else no instance is alive
3593 live_data = dict([(name, {}) for name in instance_names])
3595 # end data gathering
3600 cluster = self.cfg.GetClusterInfo()
3601 for instance in instance_list:
3603 i_hv = cluster.FillHV(instance)
3604 i_be = cluster.FillBE(instance)
3605 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
3606 nic.nicparams) for nic in instance.nics]
3607 for field in self.op.output_fields:
3608 st_match = self._FIELDS_STATIC.Matches(field)
3613 elif field == "pnode":
3614 val = instance.primary_node
3615 elif field == "snodes":
3616 val = list(instance.secondary_nodes)
3617 elif field == "admin_state":
3618 val = instance.admin_up
3619 elif field == "oper_state":
3620 if instance.primary_node in bad_nodes:
3623 val = bool(live_data.get(instance.name))
3624 elif field == "status":
3625 if instance.primary_node in off_nodes:
3626 val = "ERROR_nodeoffline"
3627 elif instance.primary_node in bad_nodes:
3628 val = "ERROR_nodedown"
3630 running = bool(live_data.get(instance.name))
3632 if instance.admin_up:
3637 if instance.admin_up:
3641 elif field == "oper_ram":
3642 if instance.primary_node in bad_nodes:
3644 elif instance.name in live_data:
3645 val = live_data[instance.name].get("memory", "?")
3648 elif field == "vcpus":
3649 val = i_be[constants.BE_VCPUS]
3650 elif field == "disk_template":
3651 val = instance.disk_template
3654 val = instance.nics[0].ip
3657 elif field == "nic_mode":
3659 val = i_nicp[0][constants.NIC_MODE]
3662 elif field == "nic_link":
3664 val = i_nicp[0][constants.NIC_LINK]
3667 elif field == "bridge":
3668 if (instance.nics and
3669 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
3670 val = i_nicp[0][constants.NIC_LINK]
3673 elif field == "mac":
3675 val = instance.nics[0].mac
3678 elif field == "sda_size" or field == "sdb_size":
3679 idx = ord(field[2]) - ord('a')
3681 val = instance.FindDisk(idx).size
3682 except errors.OpPrereqError:
3684 elif field == "disk_usage": # total disk usage per node
3685 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3686 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3687 elif field == "tags":
3688 val = list(instance.GetTags())
3689 elif field == "serial_no":
3690 val = instance.serial_no
3691 elif field == "network_port":
3692 val = instance.network_port
3693 elif field == "hypervisor":
3694 val = instance.hypervisor
3695 elif field == "hvparams":
3697 elif (field.startswith(HVPREFIX) and
3698 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3699 val = i_hv.get(field[len(HVPREFIX):], None)
3700 elif field == "beparams":
3702 elif (field.startswith(BEPREFIX) and
3703 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3704 val = i_be.get(field[len(BEPREFIX):], None)
3705 elif st_match and st_match.groups():
3706 # matches a variable list
3707 st_groups = st_match.groups()
3708 if st_groups and st_groups[0] == "disk":
3709 if st_groups[1] == "count":
3710 val = len(instance.disks)
3711 elif st_groups[1] == "sizes":
3712 val = [disk.size for disk in instance.disks]
3713 elif st_groups[1] == "size":
3715 val = instance.FindDisk(st_groups[2]).size
3716 except errors.OpPrereqError:
3719 assert False, "Unhandled disk parameter"
3720 elif st_groups[0] == "nic":
3721 if st_groups[1] == "count":
3722 val = len(instance.nics)
3723 elif st_groups[1] == "macs":
3724 val = [nic.mac for nic in instance.nics]
3725 elif st_groups[1] == "ips":
3726 val = [nic.ip for nic in instance.nics]
3727 elif st_groups[1] == "modes":
3728 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
3729 elif st_groups[1] == "links":
3730 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
3731 elif st_groups[1] == "bridges":
3734 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3735 val.append(nicp[constants.NIC_LINK])
3740 nic_idx = int(st_groups[2])
3741 if nic_idx >= len(instance.nics):
3744 if st_groups[1] == "mac":
3745 val = instance.nics[nic_idx].mac
3746 elif st_groups[1] == "ip":
3747 val = instance.nics[nic_idx].ip
3748 elif st_groups[1] == "mode":
3749 val = i_nicp[nic_idx][constants.NIC_MODE]
3750 elif st_groups[1] == "link":
3751 val = i_nicp[nic_idx][constants.NIC_LINK]
3752 elif st_groups[1] == "bridge":
3753 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
3754 if nic_mode == constants.NIC_MODE_BRIDGED:
3755 val = i_nicp[nic_idx][constants.NIC_LINK]
3759 assert False, "Unhandled NIC parameter"
3761 assert False, ("Declared but unhandled variable parameter '%s'" %
3764 assert False, "Declared but unhandled parameter '%s'" % field
3771 class LUFailoverInstance(LogicalUnit):
3772 """Failover an instance.
3775 HPATH = "instance-failover"
3776 HTYPE = constants.HTYPE_INSTANCE
3777 _OP_REQP = ["instance_name", "ignore_consistency"]
3780 def ExpandNames(self):
3781 self._ExpandAndLockInstance()
3782 self.needed_locks[locking.LEVEL_NODE] = []
3783 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3785 def DeclareLocks(self, level):
3786 if level == locking.LEVEL_NODE:
3787 self._LockInstancesNodes()
3789 def BuildHooksEnv(self):
3792 This runs on master, primary and secondary nodes of the instance.
3796 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3798 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3799 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3802 def CheckPrereq(self):
3803 """Check prerequisites.
3805 This checks that the instance is in the cluster.
3808 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3809 assert self.instance is not None, \
3810 "Cannot retrieve locked instance %s" % self.op.instance_name
3812 bep = self.cfg.GetClusterInfo().FillBE(instance)
3813 if instance.disk_template not in constants.DTS_NET_MIRROR:
3814 raise errors.OpPrereqError("Instance's disk layout is not"
3815 " network mirrored, cannot failover.")
3817 secondary_nodes = instance.secondary_nodes
3818 if not secondary_nodes:
3819 raise errors.ProgrammerError("no secondary node but using "
3820 "a mirrored disk template")
3822 target_node = secondary_nodes[0]
3823 _CheckNodeOnline(self, target_node)
3824 _CheckNodeNotDrained(self, target_node)
3825 if instance.admin_up:
3826 # check memory requirements on the secondary node
3827 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3828 instance.name, bep[constants.BE_MEMORY],
3829 instance.hypervisor)
3831 self.LogInfo("Not checking memory on the secondary node as"
3832 " instance will not be started")
3834 # check bridge existence
3835 _CheckInstanceBridgesExist(self, instance, node=target_node)
3837 def Exec(self, feedback_fn):
3838 """Failover an instance.
3840 The failover is done by shutting it down on its present node and
3841 starting it on the secondary.
3844 instance = self.instance
3846 source_node = instance.primary_node
3847 target_node = instance.secondary_nodes[0]
3849 feedback_fn("* checking disk consistency between source and target")
3850 for dev in instance.disks:
3851 # for drbd, these are drbd over lvm
3852 if not _CheckDiskConsistency(self, dev, target_node, False):
3853 if instance.admin_up and not self.op.ignore_consistency:
3854 raise errors.OpExecError("Disk %s is degraded on target node,"
3855 " aborting failover." % dev.iv_name)
3857 feedback_fn("* shutting down instance on source node")
3858 logging.info("Shutting down instance %s on node %s",
3859 instance.name, source_node)
3861 result = self.rpc.call_instance_shutdown(source_node, instance)
3862 msg = result.fail_msg
3864 if self.op.ignore_consistency:
3865 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3866 " Proceeding anyway. Please make sure node"
3867 " %s is down. Error details: %s",
3868 instance.name, source_node, source_node, msg)
3870 raise errors.OpExecError("Could not shutdown instance %s on"
3872 (instance.name, source_node, msg))
3874 feedback_fn("* deactivating the instance's disks on source node")
3875 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3876 raise errors.OpExecError("Can't shut down the instance's disks.")
3878 instance.primary_node = target_node
3879 # distribute new instance config to the other nodes
3880 self.cfg.Update(instance)
3882 # Only start the instance if it's marked as up
3883 if instance.admin_up:
3884 feedback_fn("* activating the instance's disks on target node")
3885 logging.info("Starting instance %s on node %s",
3886 instance.name, target_node)
3888 disks_ok, _ = _AssembleInstanceDisks(self, instance,
3889 ignore_secondaries=True)
3891 _ShutdownInstanceDisks(self, instance)
3892 raise errors.OpExecError("Can't activate the instance's disks")
3894 feedback_fn("* starting the instance on the target node")
3895 result = self.rpc.call_instance_start(target_node, instance, None, None)
3896 msg = result.fail_msg
3898 _ShutdownInstanceDisks(self, instance)
3899 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3900 (instance.name, target_node, msg))
3903 class LUMigrateInstance(LogicalUnit):
3904 """Migrate an instance.
3906 This is migration without shutting the instance down, as opposed to
3907 failover, which is done with a shutdown.
3910 HPATH = "instance-migrate"
3911 HTYPE = constants.HTYPE_INSTANCE
3912 _OP_REQP = ["instance_name", "live", "cleanup"]
3916 def ExpandNames(self):
3917 self._ExpandAndLockInstance()
3919 self.needed_locks[locking.LEVEL_NODE] = []
3920 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3922 self._migrater = TLMigrateInstance(self, self.op.instance_name,
3923 self.op.live, self.op.cleanup)
3924 self.tasklets = [self._migrater]
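# with tasklets set, the processor runs the tasklet's CheckPrereq and Exec
# in place of this LU's (see the LogicalUnit docstring above), so this LU
# only has to provide locking and the hooks environment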
3926 def DeclareLocks(self, level):
3927 if level == locking.LEVEL_NODE:
3928 self._LockInstancesNodes()
3930 def BuildHooksEnv(self):
3931 """Build hooks env.
3933 This runs on master, primary and secondary nodes of the instance.
3935 """
3936 instance = self._migrater.instance
3937 env = _BuildInstanceHookEnvByObject(self, instance)
3938 env["MIGRATE_LIVE"] = self.op.live
3939 env["MIGRATE_CLEANUP"] = self.op.cleanup
3940 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
3941 return env, nl, nl
3944 class TLMigrateInstance(Tasklet):
3945 def __init__(self, lu, instance_name, live, cleanup):
3946 """Initializes this class.
3949 Tasklet.__init__(self, lu)
3952 self.instance_name = instance_name
3954 self.cleanup = cleanup
3956 def CheckPrereq(self):
3957 """Check prerequisites.
3959 This checks that the instance is in the cluster.
3961 """
3962 instance = self.cfg.GetInstanceInfo(
3963 self.cfg.ExpandInstanceName(self.instance_name))
3964 if instance is None:
3965 raise errors.OpPrereqError("Instance '%s' not known" %
3966 self.instance_name)
3968 if instance.disk_template != constants.DT_DRBD8:
3969 raise errors.OpPrereqError("Instance's disk layout is not"
3970 " drbd8, cannot migrate.")
3972 secondary_nodes = instance.secondary_nodes
3973 if not secondary_nodes:
3974 raise errors.ConfigurationError("No secondary node but using"
3975 " drbd8 disk template")
3977 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3979 target_node = secondary_nodes[0]
3980 # check memory requirements on the secondary node
3981 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3982 instance.name, i_be[constants.BE_MEMORY],
3983 instance.hypervisor)
3985 # check bridge existence
3986 _CheckInstanceBridgesExist(self, instance, node=target_node)
3988 if not self.cleanup:
3989 _CheckNodeNotDrained(self, target_node)
3990 result = self.rpc.call_instance_migratable(instance.primary_node,
3991 instance)
3992 result.Raise("Can't migrate, please use failover", prereq=True)
3994 self.instance = instance
3996 def _WaitUntilSync(self):
3997 """Poll with custom rpc for disk sync.
3999 This uses our own step-based rpc call.
4001 """
4002 self.feedback_fn("* wait until resync is done")
4003 all_done = False
4004 while not all_done:
4005 all_done = True
4006 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4007 self.nodes_ip,
4008 self.instance.disks)
4009 min_percent = 100
4010 for node, nres in result.items():
4011 nres.Raise("Cannot resync disks on node %s" % node)
4012 node_done, node_percent = nres.payload
4013 all_done = all_done and node_done
4014 if node_percent is not None:
4015 min_percent = min(min_percent, node_percent)
4016 if not all_done:
4017 if min_percent < 100:
4018 self.feedback_fn(" - progress: %.1f%%" % min_percent)
4019 time.sleep(2)
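# Note: the loop above assumes each node's RPC payload is a
# (done, sync_percent) pair, e.g. (False, 87.5) while resyncing and
# (True, None) once in sync; polling only stops when every node is done.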
4021 def _EnsureSecondary(self, node):
4022 """Demote a node to secondary.
4025 self.feedback_fn("* switching node %s to secondary mode" % node)
4027 for dev in self.instance.disks:
4028 self.cfg.SetDiskID(dev, node)
4030 result = self.rpc.call_blockdev_close(node, self.instance.name,
4031 self.instance.disks)
4032 result.Raise("Cannot change disk to secondary on node %s" % node)
4034 def _GoStandalone(self):
4035 """Disconnect from the network.
4038 self.feedback_fn("* changing into standalone mode")
4039 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4040 self.instance.disks)
4041 for node, nres in result.items():
4042 nres.Raise("Cannot disconnect disks on node %s" % node)
4044 def _GoReconnect(self, multimaster):
4045 """Reconnect to the network.
4051 msg = "single-master"
4052 self.feedback_fn("* changing disks into %s mode" % msg)
4053 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4054 self.instance.disks,
4055 self.instance.name, multimaster)
4056 for node, nres in result.items():
4057 nres.Raise("Cannot change disks config on node %s" % node)
4059 def _ExecCleanup(self):
4060 """Try to cleanup after a failed migration.
4062 The cleanup is done by:
4063 - check that the instance is running only on one node
4064 (and update the config if needed)
4065 - change disks on its secondary node to secondary
4066 - wait until disks are fully synchronized
4067 - disconnect from the network
4068 - change disks into single-master mode
4069 - wait again until disks are fully synchronized
4071 """
4072 instance = self.instance
4073 target_node = self.target_node
4074 source_node = self.source_node
4076 # check running on only one node
4077 self.feedback_fn("* checking where the instance actually runs"
4078 " (if this hangs, the hypervisor might be in"
4080 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4081 for node, result in ins_l.items():
4082 result.Raise("Can't contact node %s" % node)
4084 runningon_source = instance.name in ins_l[source_node].payload
4085 runningon_target = instance.name in ins_l[target_node].payload
4087 if runningon_source and runningon_target:
4088 raise errors.OpExecError("Instance seems to be running on two nodes,"
4089 " or the hypervisor is confused. You will have"
4090 " to ensure manually that it runs only on one"
4091 " and restart this operation.")
4093 if not (runningon_source or runningon_target):
4094 raise errors.OpExecError("Instance does not seem to be running at all."
4095 " In this case, it's safer to repair by"
4096 " running 'gnt-instance stop' to ensure disk"
4097 " shutdown, and then restarting it.")
4099 if runningon_target:
4100 # the migration has actually succeeded, we need to update the config
4101 self.feedback_fn("* instance running on secondary node (%s),"
4102 " updating config" % target_node)
4103 instance.primary_node = target_node
4104 self.cfg.Update(instance)
4105 demoted_node = source_node
4106 else:
4107 self.feedback_fn("* instance confirmed to be running on its"
4108 " primary node (%s)" % source_node)
4109 demoted_node = target_node
4111 self._EnsureSecondary(demoted_node)
4112 try:
4113 self._WaitUntilSync()
4114 except errors.OpExecError:
4115 # we ignore errors here, since if the device is standalone, it
4116 # won't be able to sync
4117 pass
4118 self._GoStandalone()
4119 self._GoReconnect(False)
4120 self._WaitUntilSync()
4122 self.feedback_fn("* done")
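# A usage sketch for this cleanup path (the flag maps to the "cleanup"
# opcode parameter handled above):
#
#   gnt-instance migrate --cleanup inst1.example.com
#
# i.e. run it after a failed or interrupted migration to reconcile the
# cluster configuration with wherever the instance actually runs.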
4124 def _RevertDiskStatus(self):
4125 """Try to revert the disk status after a failed migration.
4128 target_node = self.target_node
4130 self._EnsureSecondary(target_node)
4131 self._GoStandalone()
4132 self._GoReconnect(False)
4133 self._WaitUntilSync()
4134 except errors.OpExecError, err:
4135 self.lu.LogWarning("Migration failed and I can't reconnect the"
4136 " drives: error '%s'\n"
4137 "Please look and recover the instance status" %
4140 def _AbortMigration(self):
4141 """Call the hypervisor code to abort a started migration.
4144 instance = self.instance
4145 target_node = self.target_node
4146 migration_info = self.migration_info
4148 abort_result = self.rpc.call_finalize_migration(target_node,
4149 instance,
4150 migration_info,
4151 False)
4152 abort_msg = abort_result.fail_msg
4153 if abort_msg:
4154 logging.error("Aborting migration failed on target node %s: %s" %
4155 (target_node, abort_msg))
4156 # Don't raise an exception here, as we still have to try to revert the
4157 # disk status, even if this step failed.
4159 def _ExecMigration(self):
4160 """Migrate an instance.
4162 The migration is done by:
4163 - change the disks into dual-master mode
4164 - wait until disks are fully synchronized again
4165 - migrate the instance
4166 - change disks on the new secondary node (the old primary) to secondary
4167 - wait until disks are fully synchronized
4168 - change disks into single-master mode
4170 """
4171 instance = self.instance
4172 target_node = self.target_node
4173 source_node = self.source_node
4175 self.feedback_fn("* checking disk consistency between source and target")
4176 for dev in instance.disks:
4177 if not _CheckDiskConsistency(self, dev, target_node, False):
4178 raise errors.OpExecError("Disk %s is degraded or not fully"
4179 " synchronized on target node,"
4180 " aborting migrate." % dev.iv_name)
4182 # First get the migration information from the remote node
4183 result = self.rpc.call_migration_info(source_node, instance)
4184 msg = result.fail_msg
4185 if msg:
4186 log_err = ("Failed fetching source migration information from %s: %s" %
4187 (source_node, msg))
4188 logging.error(log_err)
4189 raise errors.OpExecError(log_err)
4191 self.migration_info = migration_info = result.payload
4193 # Then switch the disks to master/master mode
4194 self._EnsureSecondary(target_node)
4195 self._GoStandalone()
4196 self._GoReconnect(True)
4197 self._WaitUntilSync()
4199 self.feedback_fn("* preparing %s to accept the instance" % target_node)
4200 result = self.rpc.call_accept_instance(target_node,
4201 instance,
4202 migration_info,
4203 self.nodes_ip[target_node])
4205 msg = result.fail_msg
4206 if msg:
4207 logging.error("Instance pre-migration failed, trying to revert"
4208 " disk status: %s", msg)
4209 self._AbortMigration()
4210 self._RevertDiskStatus()
4211 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4212 (instance.name, msg))
4214 self.feedback_fn("* migrating instance to %s" % target_node)
4215 time.sleep(10)
4216 result = self.rpc.call_instance_migrate(source_node, instance,
4217 self.nodes_ip[target_node],
4218 self.live)
4219 msg = result.fail_msg
4220 if msg:
4221 logging.error("Instance migration failed, trying to revert"
4222 " disk status: %s", msg)
4223 self._AbortMigration()
4224 self._RevertDiskStatus()
4225 raise errors.OpExecError("Could not migrate instance %s: %s" %
4226 (instance.name, msg))
4227 time.sleep(10)
4229 instance.primary_node = target_node
4230 # distribute new instance config to the other nodes
4231 self.cfg.Update(instance)
4233 result = self.rpc.call_finalize_migration(target_node,
4234 instance,
4235 migration_info,
4236 True)
4237 msg = result.fail_msg
4238 if msg:
4239 logging.error("Instance migration succeeded, but finalization failed:"
4240 " %s" % msg)
4241 raise errors.OpExecError("Could not finalize instance migration: %s" %
4242 msg)
4244 self._EnsureSecondary(source_node)
4245 self._WaitUntilSync()
4246 self._GoStandalone()
4247 self._GoReconnect(False)
4248 self._WaitUntilSync()
4250 self.feedback_fn("* done")
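# Summary of the DRBD transitions driven above (a restatement of the calls,
# not additional behaviour): demote target to secondary -> standalone ->
# reconnect dual-master -> sync -> migrate -> demote old primary ->
# standalone -> reconnect single-master -> final sync.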
4252 def Exec(self, feedback_fn):
4253 """Perform the migration.
4256 self.feedback_fn = feedback_fn
4258 self.source_node = self.instance.primary_node
4259 self.target_node = self.instance.secondary_nodes[0]
4260 self.all_nodes = [self.source_node, self.target_node]
4262 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4263 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4267 return self._ExecCleanup()
4269 return self._ExecMigration()
4272 def _CreateBlockDev(lu, node, instance, device, force_create,
4273 info, force_open):
4274 """Create a tree of block devices on a given node.
4276 If this device type has to be created on secondaries, create it and
4277 all its children.
4279 If not, just recurse to children keeping the same 'force' value.
4281 @param lu: the lu on whose behalf we execute
4282 @param node: the node on which to create the device
4283 @type instance: L{objects.Instance}
4284 @param instance: the instance which owns the device
4285 @type device: L{objects.Disk}
4286 @param device: the device to create
4287 @type force_create: boolean
4288 @param force_create: whether to force creation of this device; this
4289 will be changed to True whenever we find a device which has the
4290 CreateOnSecondary() attribute
4291 @param info: the extra 'metadata' we should attach to the device
4292 (this will be represented as a LVM tag)
4293 @type force_open: boolean
4294 @param force_open: this parameter will be passed to the
4295 L{backend.BlockdevCreate} function where it specifies
4296 whether we run on primary or not, and it affects both
4297 the child assembly and the device's own Open() execution
4299 """
4300 if device.CreateOnSecondary():
4301 force_create = True
4303 if device.children:
4304 for child in device.children:
4305 _CreateBlockDev(lu, node, instance, child, force_create,
4306 info, force_open)
4308 if not force_create:
4309 return
4311 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
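# An illustrative top-level call for a DRBD8 device tree (as built by
# _GenerateDRBD8Branch below); assuming the LV children report
# CreateOnSecondary() as True, they are created on all nodes first, while
# the DRBD device itself is created only where forced:
#
#   _CreateBlockDev(lu, node, instance, drbd_dev, False, info, False)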
4314 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4315 """Create a single block device on a given node.
4317 This will not recurse over children of the device, so they must be
4318 created in advance.
4320 @param lu: the lu on whose behalf we execute
4321 @param node: the node on which to create the device
4322 @type instance: L{objects.Instance}
4323 @param instance: the instance which owns the device
4324 @type device: L{objects.Disk}
4325 @param device: the device to create
4326 @param info: the extra 'metadata' we should attach to the device
4327 (this will be represented as a LVM tag)
4328 @type force_open: boolean
4329 @param force_open: this parameter will be passed to the
4330 L{backend.BlockdevCreate} function where it specifies
4331 whether we run on primary or not, and it affects both
4332 the child assembly and the device's own Open() execution
4334 """
4335 lu.cfg.SetDiskID(device, node)
4336 result = lu.rpc.call_blockdev_create(node, device, device.size,
4337 instance.name, force_open, info)
4338 result.Raise("Can't create block device %s on"
4339 " node %s for instance %s" % (device, node, instance.name))
4340 if device.physical_id is None:
4341 device.physical_id = result.payload
4344 def _GenerateUniqueNames(lu, exts):
4345 """Generate a suitable LV name.
4347 This will generate a logical volume name for the given instance.
4352 new_id = lu.cfg.GenerateUniqueID()
4353 results.append("%s%s" % (new_id, val))
4357 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4358 p_minor, s_minor):
4359 """Generate a drbd8 device complete with its children.
4361 """
4362 port = lu.cfg.AllocatePort()
4363 vgname = lu.cfg.GetVGName()
4364 shared_secret = lu.cfg.GenerateDRBDSecret()
4365 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4366 logical_id=(vgname, names[0]))
4367 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4368 logical_id=(vgname, names[1]))
4369 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4370 logical_id=(primary, secondary, port,
4371 p_minor, s_minor,
4372 shared_secret),
4373 children=[dev_data, dev_meta],
4374 iv_name=iv_name)
4375 return drbd_dev
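# The resulting device tree for one disk, sizes in MB (the 128 MB metadata
# volume is hardcoded above):
#
#   drbd8 (size), logical_id = (primary, secondary, port, minors, secret)
#     +- lv data (size)  -> names[0]
#     +- lv meta (128)   -> names[1]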
4378 def _GenerateDiskTemplate(lu, template_name,
4379 instance_name, primary_node,
4380 secondary_nodes, disk_info,
4381 file_storage_dir, file_driver,
4382 base_index):
4383 """Generate the entire disk layout for a given template type.
4385 """
4386 #TODO: compute space requirements
4388 vgname = lu.cfg.GetVGName()
4389 disk_count = len(disk_info)
4390 disks = []
4391 if template_name == constants.DT_DISKLESS:
4392 pass
4393 elif template_name == constants.DT_PLAIN:
4394 if len(secondary_nodes) != 0:
4395 raise errors.ProgrammerError("Wrong template configuration")
4397 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4398 for i in range(disk_count)])
4399 for idx, disk in enumerate(disk_info):
4400 disk_index = idx + base_index
4401 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4402 logical_id=(vgname, names[idx]),
4403 iv_name="disk/%d" % disk_index,
4404 mode=disk["mode"])
4405 disks.append(disk_dev)
4406 elif template_name == constants.DT_DRBD8:
4407 if len(secondary_nodes) != 1:
4408 raise errors.ProgrammerError("Wrong template configuration")
4409 remote_node = secondary_nodes[0]
4410 minors = lu.cfg.AllocateDRBDMinor(
4411 [primary_node, remote_node] * len(disk_info), instance_name)
4413 names = []
4414 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4415 for i in range(disk_count)]):
4416 names.append(lv_prefix + "_data")
4417 names.append(lv_prefix + "_meta")
4418 for idx, disk in enumerate(disk_info):
4419 disk_index = idx + base_index
4420 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4421 disk["size"], names[idx*2:idx*2+2],
4422 "disk/%d" % disk_index,
4423 minors[idx*2], minors[idx*2+1])
4424 disk_dev.mode = disk["mode"]
4425 disks.append(disk_dev)
4426 elif template_name == constants.DT_FILE:
4427 if len(secondary_nodes) != 0:
4428 raise errors.ProgrammerError("Wrong template configuration")
4430 for idx, disk in enumerate(disk_info):
4431 disk_index = idx + base_index
4432 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4433 iv_name="disk/%d" % disk_index,
4434 logical_id=(file_driver,
4435 "%s/disk%d" % (file_storage_dir,
4438 disks.append(disk_dev)
4440 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
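# An illustrative call for a single-disk DRBD8 layout (all values are
# hypothetical):
#
#   disks = _GenerateDiskTemplate(lu, constants.DT_DRBD8, "inst1", "node1",
#                                 ["node2"], [{"size": 1024, "mode": "rw"}],
#                                 None, None, 0)
#
# which yields one DRBD8 device named "disk/0" with two freshly allocated
# minors and a data/meta LV pair per node.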
4444 def _GetInstanceInfoText(instance):
4445 """Compute that text that should be added to the disk's metadata.
4448 return "originstname+%s" % instance.name
4451 def _CreateDisks(lu, instance):
4452 """Create all disks for an instance.
4454 This abstracts away some work from AddInstance.
4456 @type lu: L{LogicalUnit}
4457 @param lu: the logical unit on whose behalf we execute
4458 @type instance: L{objects.Instance}
4459 @param instance: the instance whose disks we should create
4461 @return: the success of the creation
4463 """
4464 info = _GetInstanceInfoText(instance)
4465 pnode = instance.primary_node
4467 if instance.disk_template == constants.DT_FILE:
4468 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4469 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4471 result.Raise("Failed to create directory '%s' on"
4472 " node %s" % (file_storage_dir, pnode))
4474 # Note: this needs to be kept in sync with adding of disks in
4475 # LUSetInstanceParams
4476 for device in instance.disks:
4477 logging.info("Creating volume %s for instance %s",
4478 device.iv_name, instance.name)
4480 for node in instance.all_nodes:
4481 f_create = node == pnode
4482 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4485 def _RemoveDisks(lu, instance):
4486 """Remove all disks for an instance.
4488 This abstracts away some work from `AddInstance()` and
4489 `RemoveInstance()`. Note that in case some of the devices couldn't
4490 be removed, the removal will continue with the other ones (compare
4491 with `_CreateDisks()`).
4493 @type lu: L{LogicalUnit}
4494 @param lu: the logical unit on whose behalf we execute
4495 @type instance: L{objects.Instance}
4496 @param instance: the instance whose disks we should remove
4498 @return: the success of the removal
4500 """
4501 logging.info("Removing block devices for instance %s", instance.name)
4503 all_result = True
4504 for device in instance.disks:
4505 for node, disk in device.ComputeNodeTree(instance.primary_node):
4506 lu.cfg.SetDiskID(disk, node)
4507 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4508 if msg:
4509 lu.LogWarning("Could not remove block device %s on node %s,"
4510 " continuing anyway: %s", device.iv_name, node, msg)
4511 all_result = False
4513 if instance.disk_template == constants.DT_FILE:
4514 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4515 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4516 file_storage_dir)
4517 msg = result.fail_msg
4518 if msg:
4519 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4520 file_storage_dir, instance.primary_node, msg)
4521 all_result = False
4523 return all_result
4526 def _ComputeDiskSize(disk_template, disks):
4527 """Compute disk size requirements in the volume group
4530 # Required free disk space as a function of disk and swap space
4532 constants.DT_DISKLESS: None,
4533 constants.DT_PLAIN: sum(d["size"] for d in disks),
4534 # 128 MB are added for drbd metadata for each disk
4535 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4536 constants.DT_FILE: None,
4537 }
4539 if disk_template not in req_size_dict:
4540 raise errors.ProgrammerError("Disk template '%s' size requirement"
4541 " is unknown" % disk_template)
4543 return req_size_dict[disk_template]
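# A worked example (sizes in MB): for disks of 1024 and 512 MB, DT_PLAIN
# needs 1536 in the volume group, DT_DRBD8 needs 1536 + 2 * 128 = 1792
# (one metadata volume per disk), while DT_DISKLESS and DT_FILE need no
# volume group space at all (None).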
4546 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4547 """Hypervisor parameter validation.
4549 This function abstracts the hypervisor parameter validation to be
4550 used in both instance create and instance modify.
4552 @type lu: L{LogicalUnit}
4553 @param lu: the logical unit for which we check
4554 @type nodenames: list
4555 @param nodenames: the list of nodes on which we should check
4556 @type hvname: string
4557 @param hvname: the name of the hypervisor we should use
4558 @type hvparams: dict
4559 @param hvparams: the parameters which we need to check
4560 @raise errors.OpPrereqError: if the parameters are not valid
4562 """
4563 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4564 hvname,
4565 hvparams)
4566 for node in nodenames:
4567 info = hvinfo[node]
4568 if info.offline:
4569 continue
4570 info.Raise("Hypervisor parameter validation failed on node %s" % node)
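# A minimal usage sketch, mirroring the call made from instance creation
# below (the hypervisor name and parameter key are illustrative only):
#
#   _CheckHVParams(self, [pnode.name], constants.HT_XEN_PVM,
#                  {constants.HV_KERNEL_PATH: "/boot/vmlinuz-2.6-xenU"})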
4573 class LUCreateInstance(LogicalUnit):
4574 """Create an instance.
4577 HPATH = "instance-add"
4578 HTYPE = constants.HTYPE_INSTANCE
4579 _OP_REQP = ["instance_name", "disks", "disk_template",
4580 "mode", "start",
4581 "wait_for_sync", "ip_check", "nics",
4582 "hvparams", "beparams"]
4583 REQ_BGL = False
4585 def _ExpandNode(self, node):
4586 """Expands and checks one node name.
4589 node_full = self.cfg.ExpandNodeName(node)
4590 if node_full is None:
4591 raise errors.OpPrereqError("Unknown node %s" % node)
4594 def ExpandNames(self):
4595 """ExpandNames for CreateInstance.
4597 Figure out the right locks for instance creation.
4599 """
4600 self.needed_locks = {}
4602 # set optional parameters to none if they don't exist
4603 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4604 if not hasattr(self.op, attr):
4605 setattr(self.op, attr, None)
4607 # cheap checks, mostly valid constants given
4609 # verify creation mode
4610 if self.op.mode not in (constants.INSTANCE_CREATE,
4611 constants.INSTANCE_IMPORT):
4612 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4613 self.op.mode)
4615 # disk template and mirror node verification
4616 if self.op.disk_template not in constants.DISK_TEMPLATES:
4617 raise errors.OpPrereqError("Invalid disk template name")
4619 if self.op.hypervisor is None:
4620 self.op.hypervisor = self.cfg.GetHypervisorType()
4622 cluster = self.cfg.GetClusterInfo()
4623 enabled_hvs = cluster.enabled_hypervisors
4624 if self.op.hypervisor not in enabled_hvs:
4625 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4626 " cluster (%s)" % (self.op.hypervisor,
4627 ",".join(enabled_hvs)))
4629 # check hypervisor parameter syntax (locally)
4630 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4631 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4632 self.op.hvparams)
4633 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4634 hv_type.CheckParameterSyntax(filled_hvp)
4635 self.hv_full = filled_hvp
4637 # fill and remember the beparams dict
4638 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4639 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4640 self.op.beparams)
4642 #### instance parameters check
4644 # instance name verification
4645 hostname1 = utils.HostInfo(self.op.instance_name)
4646 self.op.instance_name = instance_name = hostname1.name
4648 # this is just a preventive check, but someone might still add this
4649 # instance in the meantime, and creation will fail at lock-add time
4650 if instance_name in self.cfg.GetInstanceList():
4651 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4652 instance_name)
4654 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4656 # NIC buildup
4657 self.nics = []
4658 for idx, nic in enumerate(self.op.nics):
4659 nic_mode_req = nic.get("mode", None)
4660 nic_mode = nic_mode_req
4661 if nic_mode is None:
4662 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4664 # in routed mode, for the first nic, the default ip is 'auto'
4665 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4666 default_ip_mode = constants.VALUE_AUTO
4668 default_ip_mode = constants.VALUE_NONE
4670 # ip validity checks
4671 ip = nic.get("ip", default_ip_mode)
4672 if ip is None or ip.lower() == constants.VALUE_NONE:
4673 nic_ip = None
4674 elif ip.lower() == constants.VALUE_AUTO:
4675 nic_ip = hostname1.ip
4676 else:
4677 if not utils.IsValidIP(ip):
4678 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4679 " like a valid IP" % ip)
4680 nic_ip = ip
4682 # TODO: check the ip for uniqueness !!
4683 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4684 raise errors.OpPrereqError("Routed nic mode requires an ip address")
4686 # MAC address verification
4687 mac = nic.get("mac", constants.VALUE_AUTO)
4688 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4689 if not utils.IsValidMac(mac.lower()):
4690 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4691 mac)
4692 # bridge verification
4693 bridge = nic.get("bridge", None)
4694 link = nic.get("link", None)
4695 if bridge and link:
4696 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
4697 " at the same time")
4698 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4699 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4700 elif bridge:
4701 link = bridge
4703 nicparams = {}
4704 if nic_mode_req:
4705 nicparams[constants.NIC_MODE] = nic_mode_req
4706 if link:
4707 nicparams[constants.NIC_LINK] = link
4709 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4710 nicparams)
4711 objects.NIC.CheckParameterSyntax(check_params)
4712 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4714 # disk checks/pre-build
4715 self.disks = []
4716 for disk in self.op.disks:
4717 mode = disk.get("mode", constants.DISK_RDWR)
4718 if mode not in constants.DISK_ACCESS_SET:
4719 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4720 mode)
4721 size = disk.get("size", None)
4722 if size is None:
4723 raise errors.OpPrereqError("Missing disk size")
4724 try:
4725 size = int(size)
4726 except ValueError:
4727 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4728 self.disks.append({"size": size, "mode": mode})
4730 # used in CheckPrereq for ip ping check
4731 self.check_ip = hostname1.ip
4733 # file storage checks
4734 if (self.op.file_driver and
4735 not self.op.file_driver in constants.FILE_DRIVER):
4736 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4737 self.op.file_driver)
4739 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4740 raise errors.OpPrereqError("File storage directory path must be relative")
4742 ### Node/iallocator related checks
4743 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4744 raise errors.OpPrereqError("One and only one of iallocator and primary"
4745 " node must be given")
4747 if self.op.iallocator:
4748 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4750 self.op.pnode = self._ExpandNode(self.op.pnode)
4751 nodelist = [self.op.pnode]
4752 if self.op.snode is not None:
4753 self.op.snode = self._ExpandNode(self.op.snode)
4754 nodelist.append(self.op.snode)
4755 self.needed_locks[locking.LEVEL_NODE] = nodelist
4757 # in case of import lock the source node too
4758 if self.op.mode == constants.INSTANCE_IMPORT:
4759 src_node = getattr(self.op, "src_node", None)
4760 src_path = getattr(self.op, "src_path", None)
4762 if src_path is None:
4763 self.op.src_path = src_path = self.op.instance_name
4765 if src_node is None:
4766 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4767 self.op.src_node = None
4768 if os.path.isabs(src_path):
4769 raise errors.OpPrereqError("Importing an instance from an absolute"
4770 " path requires a source node option.")
4772 self.op.src_node = src_node = self._ExpandNode(src_node)
4773 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4774 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4775 if not os.path.isabs(src_path):
4776 self.op.src_path = src_path = \
4777 os.path.join(constants.EXPORT_DIR, src_path)
4779 else: # INSTANCE_CREATE
4780 if getattr(self.op, "os_type", None) is None:
4781 raise errors.OpPrereqError("No guest OS specified")
4783 def _RunAllocator(self):
4784 """Run the allocator based on input opcode.
4787 nics = [n.ToDict() for n in self.nics]
4788 ial = IAllocator(self.cfg, self.rpc,
4789 mode=constants.IALLOCATOR_MODE_ALLOC,
4790 name=self.op.instance_name,
4791 disk_template=self.op.disk_template,
4794 vcpus=self.be_full[constants.BE_VCPUS],
4795 mem_size=self.be_full[constants.BE_MEMORY],
4798 hypervisor=self.op.hypervisor,
4801 ial.Run(self.op.iallocator)
4804 raise errors.OpPrereqError("Can't compute nodes using"
4805 " iallocator '%s': %s" % (self.op.iallocator,
4807 if len(ial.nodes) != ial.required_nodes:
4808 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4809 " of nodes (%s), required %s" %
4810 (self.op.iallocator, len(ial.nodes),
4811 ial.required_nodes))
4812 self.op.pnode = ial.nodes[0]
4813 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4814 self.op.instance_name, self.op.iallocator,
4815 ", ".join(ial.nodes))
4816 if ial.required_nodes == 2:
4817 self.op.snode = ial.nodes[1]
4819 def BuildHooksEnv(self):
4820 """Build hooks env.
4822 This runs on master, primary and secondary nodes of the instance.
4824 """
4825 env = {
4826 "ADD_MODE": self.op.mode,
4827 }
4828 if self.op.mode == constants.INSTANCE_IMPORT:
4829 env["SRC_NODE"] = self.op.src_node
4830 env["SRC_PATH"] = self.op.src_path
4831 env["SRC_IMAGES"] = self.src_images
4833 env.update(_BuildInstanceHookEnv(
4834 name=self.op.instance_name,
4835 primary_node=self.op.pnode,
4836 secondary_nodes=self.secondaries,
4837 status=self.op.start,
4838 os_type=self.op.os_type,
4839 memory=self.be_full[constants.BE_MEMORY],
4840 vcpus=self.be_full[constants.BE_VCPUS],
4841 nics=_NICListToTuple(self, self.nics),
4842 disk_template=self.op.disk_template,
4843 disks=[(d["size"], d["mode"]) for d in self.disks],
4844 bep=self.be_full,
4845 hvp=self.hv_full,
4846 hypervisor_name=self.op.hypervisor,
4847 ))
4849 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4850 self.secondaries)
4851 return env, nl, nl
4854 def CheckPrereq(self):
4855 """Check prerequisites.
4858 if (not self.cfg.GetVGName() and
4859 self.op.disk_template not in constants.DTS_NOT_LVM):
4860 raise errors.OpPrereqError("Cluster does not support lvm-based"
4861 " instances")
4863 if self.op.mode == constants.INSTANCE_IMPORT:
4864 src_node = self.op.src_node
4865 src_path = self.op.src_path
4867 if src_node is None:
4868 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4869 exp_list = self.rpc.call_export_list(locked_nodes)
4870 found = False
4871 for node in exp_list:
4872 if exp_list[node].fail_msg:
4873 continue
4874 if src_path in exp_list[node].payload:
4875 found = True
4876 self.op.src_node = src_node = node
4877 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4878 src_path)
4879 break
4880 if not found:
4881 raise errors.OpPrereqError("No export found for relative path %s" %
4882 src_path)
4884 _CheckNodeOnline(self, src_node)
4885 result = self.rpc.call_export_info(src_node, src_path)
4886 result.Raise("No export or invalid export found in dir %s" % src_path)
4888 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4889 if not export_info.has_section(constants.INISECT_EXP):
4890 raise errors.ProgrammerError("Corrupted export config")
4892 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4893 if (int(ei_version) != constants.EXPORT_VERSION):
4894 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4895 (ei_version, constants.EXPORT_VERSION))
4897 # Check that the new instance doesn't have less disks than the export
4898 instance_disks = len(self.disks)
4899 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4900 if instance_disks < export_disks:
4901 raise errors.OpPrereqError("Not enough disks to import."
4902 " (instance: %d, export: %d)" %
4903 (instance_disks, export_disks))
4905 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4906 disk_images = []
4907 for idx in range(export_disks):
4908 option = 'disk%d_dump' % idx
4909 if export_info.has_option(constants.INISECT_INS, option):
4910 # FIXME: are the old os-es, disk sizes, etc. useful?
4911 export_name = export_info.get(constants.INISECT_INS, option)
4912 image = os.path.join(src_path, export_name)
4913 disk_images.append(image)
4915 disk_images.append(False)
4917 self.src_images = disk_images
4919 old_name = export_info.get(constants.INISECT_INS, 'name')
4920 # FIXME: int() here could throw a ValueError on broken exports
4921 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4922 if self.op.instance_name == old_name:
4923 for idx, nic in enumerate(self.nics):
4924 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4925 nic_mac_ini = 'nic%d_mac' % idx
4926 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4928 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4929 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4930 if self.op.start and not self.op.ip_check:
4931 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4932 " adding an instance in start mode")
4934 if self.op.ip_check:
4935 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4936 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4937 (self.check_ip, self.op.instance_name))
4939 #### mac address generation
4940 # By generating here the mac address both the allocator and the hooks get
4941 # the real final mac address rather than the 'auto' or 'generate' value.
4942 # There is a race condition between the generation and the instance object
4943 # creation, which means that we know the mac is valid now, but we're not
4944 # sure it will be when we actually add the instance. If things go bad
4945 # adding the instance will abort because of a duplicate mac, and the
4946 # creation job will fail.
4947 for nic in self.nics:
4948 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4949 nic.mac = self.cfg.GenerateMAC()
4953 if self.op.iallocator is not None:
4954 self._RunAllocator()
4956 #### node related checks
4958 # check primary node
4959 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4960 assert self.pnode is not None, \
4961 "Cannot retrieve locked node %s" % self.op.pnode
4962 if pnode.offline:
4963 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4964 pnode.name)
4965 if pnode.drained:
4966 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4967 pnode.name)
4969 self.secondaries = []
4971 # mirror node verification
4972 if self.op.disk_template in constants.DTS_NET_MIRROR:
4973 if self.op.snode is None:
4974 raise errors.OpPrereqError("The networked disk templates need"
4975 " a mirror node")
4976 if self.op.snode == pnode.name:
4977 raise errors.OpPrereqError("The secondary node cannot be"
4978 " the primary node.")
4979 _CheckNodeOnline(self, self.op.snode)
4980 _CheckNodeNotDrained(self, self.op.snode)
4981 self.secondaries.append(self.op.snode)
4983 nodenames = [pnode.name] + self.secondaries
4985 req_size = _ComputeDiskSize(self.op.disk_template,
4986 self.disks)
4988 # Check lv size requirements
4989 if req_size is not None:
4990 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4991 self.op.hypervisor)
4992 for node in nodenames:
4993 info = nodeinfo[node]
4994 info.Raise("Cannot get current information from node %s" % node)
4995 info = info.payload
4996 vg_free = info.get('vg_free', None)
4997 if not isinstance(vg_free, int):
4998 raise errors.OpPrereqError("Can't compute free disk space on"
4999 " node %s" % node)
5000 if req_size > vg_free:
5001 raise errors.OpPrereqError("Not enough disk space on target node %s."
5002 " %d MB available, %d MB required" %
5003 (node, vg_free, req_size))
5005 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
5008 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
5009 result.Raise("OS '%s' not in supported os list for primary node %s" %
5010 (self.op.os_type, pnode.name), prereq=True)
5012 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
5014 # memory check on primary node
5015 if self.op.start:
5016 _CheckNodeFreeMemory(self, self.pnode.name,
5017 "creating instance %s" % self.op.instance_name,
5018 self.be_full[constants.BE_MEMORY],
5019 self.op.hypervisor)
5021 self.dry_run_result = list(nodenames)
5023 def Exec(self, feedback_fn):
5024 """Create and add the instance to the cluster.
5027 instance = self.op.instance_name
5028 pnode_name = self.pnode.name
5030 ht_kind = self.op.hypervisor
5031 if ht_kind in constants.HTS_REQ_PORT:
5032 network_port = self.cfg.AllocatePort()
5033 else:
5034 network_port = None
5036 ##if self.op.vnc_bind_address is None:
5037 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
5039 # this is needed because os.path.join does not accept None arguments
5040 if self.op.file_storage_dir is None:
5041 string_file_storage_dir = ""
5043 string_file_storage_dir = self.op.file_storage_dir
5045 # build the full file storage dir path
5046 file_storage_dir = os.path.normpath(os.path.join(
5047 self.cfg.GetFileStorageDir(),
5048 string_file_storage_dir, instance))
5051 disks = _GenerateDiskTemplate(self,
5052 self.op.disk_template,
5053 instance, pnode_name,
5054 self.secondaries,
5055 self.disks,
5056 file_storage_dir,
5057 self.op.file_driver,
5058 0)
5060 iobj = objects.Instance(name=instance, os=self.op.os_type,
5061 primary_node=pnode_name,
5062 nics=self.nics, disks=disks,
5063 disk_template=self.op.disk_template,
5064 admin_up=False,
5065 network_port=network_port,
5066 beparams=self.op.beparams,
5067 hvparams=self.op.hvparams,
5068 hypervisor=self.op.hypervisor,
5069 )
5071 feedback_fn("* creating instance disks...")
5072 try:
5073 _CreateDisks(self, iobj)
5074 except errors.OpExecError:
5075 self.LogWarning("Device creation failed, reverting...")
5076 try:
5077 _RemoveDisks(self, iobj)
5078 finally:
5079 self.cfg.ReleaseDRBDMinors(instance)
5080 raise
5082 feedback_fn("adding instance %s to cluster config" % instance)
5084 self.cfg.AddInstance(iobj)
5085 # Declare that we don't want to remove the instance lock anymore, as we've
5086 # added the instance to the config
5087 del self.remove_locks[locking.LEVEL_INSTANCE]
5088 # Unlock all the nodes
5089 if self.op.mode == constants.INSTANCE_IMPORT:
5090 nodes_keep = [self.op.src_node]
5091 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
5092 if node != self.op.src_node]
5093 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
5094 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
5096 self.context.glm.release(locking.LEVEL_NODE)
5097 del self.acquired_locks[locking.LEVEL_NODE]
5099 if self.op.wait_for_sync:
5100 disk_abort = not _WaitForSync(self, iobj)
5101 elif iobj.disk_template in constants.DTS_NET_MIRROR:
5102 # make sure the disks are not degraded (still sync-ing is ok)
5103 time.sleep(15)
5104 feedback_fn("* checking mirrors status")
5105 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5106 else:
5107 disk_abort = False
5109 if disk_abort:
5110 _RemoveDisks(self, iobj)
5111 self.cfg.RemoveInstance(iobj.name)
5112 # Make sure the instance lock gets removed
5113 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5114 raise errors.OpExecError("There are some degraded disks for"
5115 " this instance")
5117 feedback_fn("creating os for instance %s on node %s" %
5118 (instance, pnode_name))
5120 if iobj.disk_template != constants.DT_DISKLESS:
5121 if self.op.mode == constants.INSTANCE_CREATE:
5122 feedback_fn("* running the instance OS create scripts...")
5123 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
5124 result.Raise("Could not add os for instance %s"
5125 " on node %s" % (instance, pnode_name))
5127 elif self.op.mode == constants.INSTANCE_IMPORT:
5128 feedback_fn("* running the instance OS import scripts...")
5129 src_node = self.op.src_node
5130 src_images = self.src_images
5131 cluster_name = self.cfg.GetClusterName()
5132 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5133 src_node, src_images,
5134 cluster_name)
5135 msg = import_result.fail_msg
5136 if msg:
5137 self.LogWarning("Error while importing the disk images for instance"
5138 " %s on node %s: %s" % (instance, pnode_name, msg))
5139 else:
5140 # also checked in the prereq part
5141 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5142 % self.op.mode)
5144 if self.op.start:
5145 iobj.admin_up = True
5146 self.cfg.Update(iobj)
5147 logging.info("Starting instance %s on node %s", instance, pnode_name)
5148 feedback_fn("* starting instance...")
5149 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5150 result.Raise("Could not start instance")
5152 return list(iobj.all_nodes)
5155 class LUConnectConsole(NoHooksLU):
5156 """Connect to an instance's console.
5158 This is somewhat special in that it returns the command line that
5159 you need to run on the master node in order to connect to the
5160 console.
5162 """
5163 _OP_REQP = ["instance_name"]
5166 def ExpandNames(self):
5167 self._ExpandAndLockInstance()
5169 def CheckPrereq(self):
5170 """Check prerequisites.
5172 This checks that the instance is in the cluster.
5174 """
5175 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5176 assert self.instance is not None, \
5177 "Cannot retrieve locked instance %s" % self.op.instance_name
5178 _CheckNodeOnline(self, self.instance.primary_node)
5180 def Exec(self, feedback_fn):
5181 """Connect to the console of an instance
5184 instance = self.instance
5185 node = instance.primary_node
5187 node_insts = self.rpc.call_instance_list([node],
5188 [instance.hypervisor])[node]
5189 node_insts.Raise("Can't get node information from %s" % node)
5191 if instance.name not in node_insts.payload:
5192 raise errors.OpExecError("Instance %s is not running." % instance.name)
5194 logging.debug("Connecting to console of %s on %s", instance.name, node)
5196 hyper = hypervisor.GetHypervisor(instance.hypervisor)
5197 cluster = self.cfg.GetClusterInfo()
5198 # beparams and hvparams are passed separately, to avoid editing the
5199 # instance and then saving the defaults in the instance itself.
5200 hvparams = cluster.FillHV(instance)
5201 beparams = cluster.FillBE(instance)
5202 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5205 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
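# The return value is a full ssh command line built on the master node;
# for a Xen instance it would resemble (hypothetical output):
#
#   ssh -t root@node1 'xm console inst1.example.com'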
5208 class LUReplaceDisks(LogicalUnit):
5209 """Replace the disks of an instance.
5212 HPATH = "mirrors-replace"
5213 HTYPE = constants.HTYPE_INSTANCE
5214 _OP_REQP = ["instance_name", "mode", "disks"]
5217 def CheckArguments(self):
5218 if not hasattr(self.op, "remote_node"):
5219 self.op.remote_node = None
5220 if not hasattr(self.op, "iallocator"):
5221 self.op.iallocator = None
5223 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
5224 self.op.iallocator)
5226 def ExpandNames(self):
5227 self._ExpandAndLockInstance()
5229 if self.op.iallocator is not None:
5230 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5232 elif self.op.remote_node is not None:
5233 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5234 if remote_node is None:
5235 raise errors.OpPrereqError("Node '%s' not known" %
5236 self.op.remote_node)
5238 self.op.remote_node = remote_node
5240 # Warning: do not remove the locking of the new secondary here
5241 # unless DRBD8.AddChildren is changed to work in parallel;
5242 # currently it doesn't since parallel invocations of
5243 # FindUnusedMinor will conflict
5244 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5245 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5248 self.needed_locks[locking.LEVEL_NODE] = []
5249 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5251 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
5252 self.op.iallocator, self.op.remote_node,
5253 self.op.disks)
5255 self.tasklets = [self.replacer]
5257 def DeclareLocks(self, level):
5258 # If we're not already locking all nodes in the set we have to declare the
5259 # instance's primary/secondary nodes.
5260 if (level == locking.LEVEL_NODE and
5261 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5262 self._LockInstancesNodes()
5264 def BuildHooksEnv(self):
5265 """Build hooks env.
5267 This runs on the master, the primary and all the secondaries.
5269 """
5270 instance = self.replacer.instance
5271 env = {
5272 "MODE": self.op.mode,
5273 "NEW_SECONDARY": self.op.remote_node,
5274 "OLD_SECONDARY": instance.secondary_nodes[0],
5275 }
5276 env.update(_BuildInstanceHookEnvByObject(self, instance))
5277 nl = [
5278 self.cfg.GetMasterNode(),
5279 instance.primary_node,
5280 ]
5281 if self.op.remote_node is not None:
5282 nl.append(self.op.remote_node)
5283 return env, nl, nl
5286 class LUEvacuateNode(LogicalUnit):
5287 """Relocate the secondary instances from a node.
5290 HPATH = "node-evacuate"
5291 HTYPE = constants.HTYPE_NODE
5292 _OP_REQP = ["node_name"]
5295 def CheckArguments(self):
5296 if not hasattr(self.op, "remote_node"):
5297 self.op.remote_node = None
5298 if not hasattr(self.op, "iallocator"):
5299 self.op.iallocator = None
5301 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
5302 self.op.remote_node,
5303 self.op.iallocator)
5305 def ExpandNames(self):
5306 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
5307 if self.op.node_name is None:
5308 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
5310 self.needed_locks = {}
5312 # Declare node locks
5313 if self.op.iallocator is not None:
5314 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5316 elif self.op.remote_node is not None:
5317 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5318 if remote_node is None:
5319 raise errors.OpPrereqError("Node '%s' not known" %
5320 self.op.remote_node)
5322 self.op.remote_node = remote_node
5324 # Warning: do not remove the locking of the new secondary here
5325 # unless DRBD8.AddChildren is changed to work in parallel;
5326 # currently it doesn't since parallel invocations of
5327 # FindUnusedMinor will conflict
5328 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5329 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5331 else:
5332 raise errors.OpPrereqError("Invalid parameters")
5334 # Create tasklets for replacing disks for all secondary instances on this
5335 # node
5336 names = []
5337 tasklets = []
5339 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
5340 logging.debug("Replacing disks for instance %s", inst.name)
5341 names.append(inst.name)
5343 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
5344 self.op.iallocator, self.op.remote_node, [])
5345 tasklets.append(replacer)
5347 self.tasklets = tasklets
5348 self.instance_names = names
5350 # Declare instance locks
5351 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
5353 def DeclareLocks(self, level):
5354 # If we're not already locking all nodes in the set we have to declare the
5355 # instance's primary/secondary nodes.
5356 if (level == locking.LEVEL_NODE and
5357 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5358 self._LockInstancesNodes()
5360 def BuildHooksEnv(self):
5361 """Build hooks env.
5363 This runs on the master, the primary and all the secondaries.
5365 """
5366 env = {
5367 "NODE_NAME": self.op.node_name,
5368 }
5370 nl = [self.cfg.GetMasterNode()]
5372 if self.op.remote_node is not None:
5373 env["NEW_SECONDARY"] = self.op.remote_node
5374 nl.append(self.op.remote_node)
5376 return (env, nl, nl)
5379 class TLReplaceDisks(Tasklet):
5380 """Replaces disks for an instance.
5382 Note: Locking is not within the scope of this class.
5384 """
5385 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
5386 disks):
5387 """Initializes this class.
5389 """
5390 Tasklet.__init__(self, lu)
5392 # Parameters
5393 self.instance_name = instance_name
5394 self.mode = mode
5395 self.iallocator_name = iallocator_name
5396 self.remote_node = remote_node
5397 self.disks = disks
5399 # Runtime data
5400 self.instance = None
5401 self.new_node = None
5402 self.target_node = None
5403 self.other_node = None
5404 self.remote_node_info = None
5405 self.node_secondary_ip = None
5407 @staticmethod
5408 def CheckArguments(mode, remote_node, iallocator):
5409 """Helper function for users of this class.
5411 """
5412 # check for valid parameter combination
5413 cnt = [remote_node, iallocator].count(None)
5414 if mode == constants.REPLACE_DISK_CHG:
5415 if cnt == 2:
5416 raise errors.OpPrereqError("When changing the secondary either an"
5417 " iallocator script must be used or the"
5418 " new node given")
5419 elif cnt == 0:
5420 raise errors.OpPrereqError("Give either the iallocator or the new"
5421 " secondary, not both")
5422 else: # not replacing the secondary
5423 if cnt != 2:
5424 raise errors.OpPrereqError("The iallocator and new node options can"
5425 " be used only when changing the"
5426 " secondary node")
5428 @staticmethod
5429 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
5430 """Compute a new secondary node using an IAllocator.
5433 ial = IAllocator(lu.cfg, lu.rpc,
5434 mode=constants.IALLOCATOR_MODE_RELOC,
5435 name=instance_name,
5436 relocate_from=relocate_from)
5438 ial.Run(iallocator_name)
5440 if not ial.success:
5441 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
5442 " %s" % (iallocator_name, ial.info))
5444 if len(ial.nodes) != ial.required_nodes:
5445 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5446 " of nodes (%s), required %s" %
5447 (iallocator_name, len(ial.nodes), ial.required_nodes))
5449 remote_node_name = ial.nodes[0]
5451 lu.LogInfo("Selected new secondary for instance '%s': %s",
5452 instance_name, remote_node_name)
5454 return remote_node_name
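# A sketch of the relocation request issued from CheckPrereq below, where
# the current secondary is passed as the relocation source (the allocator
# script name is hypothetical):
#
#   new_secondary = self._RunAllocator(self.lu, "my-allocator",
#                                      self.instance.name, secondary_node)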
5456 def CheckPrereq(self):
5457 """Check prerequisites.
5459 This checks that the instance is in the cluster.
5461 """
5462 self.instance = self.cfg.GetInstanceInfo(self.instance_name)
5463 assert self.instance is not None, \
5464 "Cannot retrieve locked instance %s" % self.instance_name
5466 if self.instance.disk_template != constants.DT_DRBD8:
5467 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5468 " instances")
5470 if len(self.instance.secondary_nodes) != 1:
5471 raise errors.OpPrereqError("The instance has a strange layout,"
5472 " expected one secondary but found %d" %
5473 len(self.instance.secondary_nodes))
5475 secondary_node = self.instance.secondary_nodes[0]
5477 if self.iallocator_name is None:
5478 remote_node = self.remote_node
5480 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
5481 self.instance.name, secondary_node)
5483 if remote_node is not None:
5484 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5485 assert self.remote_node_info is not None, \
5486 "Cannot retrieve locked node %s" % remote_node
5488 self.remote_node_info = None
5490 if remote_node == self.instance.primary_node:
5491 raise errors.OpPrereqError("The specified node is the primary node of"
5492 " the instance.")
5494 if remote_node == secondary_node:
5495 raise errors.OpPrereqError("The specified node is already the"
5496 " secondary node of the instance.")
5498 if self.mode == constants.REPLACE_DISK_PRI:
5499 self.target_node = self.instance.primary_node
5500 self.other_node = secondary_node
5501 check_nodes = [self.target_node, self.other_node]
5503 elif self.mode == constants.REPLACE_DISK_SEC:
5504 self.target_node = secondary_node
5505 self.other_node = self.instance.primary_node
5506 check_nodes = [self.target_node, self.other_node]
5508 elif self.mode == constants.REPLACE_DISK_CHG:
5509 self.new_node = remote_node
5510 self.other_node = self.instance.primary_node
5511 self.target_node = secondary_node
5512 check_nodes = [self.new_node, self.other_node]
5514 _CheckNodeNotDrained(self.lu, remote_node)
5516 else:
5517 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
5518 self.mode)
5520 for node in check_nodes:
5521 _CheckNodeOnline(self.lu, node)
5523 # If not specified all disks should be replaced
5524 if not self.disks:
5525 self.disks = range(len(self.instance.disks))
5527 # Check whether disks are valid
5528 for disk_idx in self.disks:
5529 self.instance.FindDisk(disk_idx)
5531 # Get secondary node IP addresses
5533 node_2nd_ip = {}
5534 for node_name in [self.target_node, self.other_node, self.new_node]:
5535 if node_name is not None:
5536 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
5538 self.node_secondary_ip = node_2nd_ip
5540 def Exec(self, feedback_fn):
5541 """Execute disk replacement.
5543 This dispatches the disk replacement to the appropriate handler.
5545 """
5546 feedback_fn("Replacing disks for %s" % self.instance.name)
5548 activate_disks = (not self.instance.admin_up)
5550 # Activate the instance disks if we're replacing them on a down instance
5551 if activate_disks:
5552 _StartInstanceDisks(self.lu, self.instance, True)
5554 try:
5555 if self.mode == constants.REPLACE_DISK_CHG:
5556 return self._ExecDrbd8Secondary()
5557 else:
5558 return self._ExecDrbd8DiskOnly()
5560 finally:
5561 # Deactivate the instance disks if we're replacing them on a down instance
5562 if activate_disks:
5563 _SafeShutdownInstanceDisks(self.lu, self.instance)
5565 def _CheckVolumeGroup(self, nodes):
5566 self.lu.LogInfo("Checking volume groups")
5568 vgname = self.cfg.GetVGName()
5570 # Make sure volume group exists on all involved nodes
5571 results = self.rpc.call_vg_list(nodes)
5572 if not results:
5573 raise errors.OpExecError("Can't list volume groups on the nodes")
5575 for node in nodes:
5576 res = results[node]
5577 res.Raise("Error checking node %s" % node)
5578 if vgname not in res.payload:
5579 raise errors.OpExecError("Volume group '%s' not found on node %s" %
5580 (vgname, node))
5582 def _CheckDisksExistence(self, nodes):
5583 # Check disk existence
5584 for idx, dev in enumerate(self.instance.disks):
5585 if idx not in self.disks:
5586 continue
5588 for node in nodes:
5589 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
5590 self.cfg.SetDiskID(dev, node)
5592 result = self.rpc.call_blockdev_find(node, dev)
5594 msg = result.fail_msg
5595 if msg or not result.payload:
5596 if not msg:
5597 msg = "disk not found"
5598 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5599 (idx, node, msg))
5601 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
5602 for idx, dev in enumerate(self.instance.disks):
5603 if idx not in self.disks:
5604 continue
5606 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
5607 (idx, node_name))
5609 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
5610 ldisk=ldisk):
5611 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
5612 " replace disks for instance %s" %
5613 (node_name, self.instance.name))
5615 def _CreateNewStorage(self, node_name):
5616 vgname = self.cfg.GetVGName()
5617 iv_names = {}
5619 for idx, dev in enumerate(self.instance.disks):
5620 if idx not in self.disks:
5621 continue
5623 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
5625 self.cfg.SetDiskID(dev, node_name)
5627 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
5628 names = _GenerateUniqueNames(self.lu, lv_names)
5630 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
5631 logical_id=(vgname, names[0]))
5632 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5633 logical_id=(vgname, names[1]))
5635 new_lvs = [lv_data, lv_meta]
5636 old_lvs = dev.children
5637 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5639 # we pass force_create=True to force the LVM creation
5640 for new_lv in new_lvs:
5641 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
5642 _GetInstanceInfoText(self.instance), False)
5644 return iv_names
5646 def _CheckDevices(self, node_name, iv_names):
5647 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5648 self.cfg.SetDiskID(dev, node_name)
5650 result = self.rpc.call_blockdev_find(node_name, dev)
5652 msg = result.fail_msg
5653 if msg or not result.payload:
5654 if not msg:
5655 msg = "disk not found"
5656 raise errors.OpExecError("Can't find DRBD device %s: %s" %
5657 (name, msg))
5659 if result.payload[5]:
5660 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5662 def _RemoveOldStorage(self, node_name, iv_names):
5663 for name, (dev, old_lvs, _) in iv_names.iteritems():
5664 self.lu.LogInfo("Remove logical volumes for %s" % name)
5666 for lv in old_lvs:
5667 self.cfg.SetDiskID(lv, node_name)
5669 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
5670 if msg:
5671 self.lu.LogWarning("Can't remove old LV: %s" % msg,
5672 hint="remove unused LVs manually")
5674 def _ExecDrbd8DiskOnly(self):
5675 """Replace a disk on the primary or secondary for DRBD 8.
5677 The algorithm for replace is quite complicated:
5679 1. for each disk to be replaced:
5681 1. create new LVs on the target node with unique names
5682 1. detach old LVs from the drbd device
5683 1. rename old LVs to name_replaced.<time_t>
5684 1. rename new LVs to old LVs
5685 1. attach the new LVs (with the old names now) to the drbd device
5687 1. wait for sync across all devices
5689 1. for each modified disk:
5691 1. remove old LVs (which have the name name_replaced.<time_t>)
5693 Failures are not very well handled.
5695 """
5696 steps_total = 6
5698 # Step: check device activation
5699 self.lu.LogStep(1, steps_total, "Check device existence")
5700 self._CheckDisksExistence([self.other_node, self.target_node])
5701 self._CheckVolumeGroup([self.target_node, self.other_node])
5703 # Step: check other node consistency
5704 self.lu.LogStep(2, steps_total, "Check peer consistency")
5705 self._CheckDisksConsistency(self.other_node,
5706 self.other_node == self.instance.primary_node,
5707 False)
5709 # Step: create new storage
5710 self.lu.LogStep(3, steps_total, "Allocate new storage")
5711 iv_names = self._CreateNewStorage(self.target_node)
5713 # Step: for each lv, detach+rename*2+attach
5714 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
5715 for dev, old_lvs, new_lvs in iv_names.itervalues():
5716 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
5718 result = self.rpc.call_blockdev_removechildren(self.target_node, dev, old_lvs)
5719 result.Raise("Can't detach drbd from local storage on node"
5720 " %s for device %s" % (self.target_node, dev.iv_name))
5722 #cfg.Update(instance)
5724 # ok, we created the new LVs, so now we know we have the needed
5725 # storage; as such, we proceed on the target node to rename
5726 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5727 # using the assumption that logical_id == physical_id (which in
5728 # turn is the unique_id on that node)
5730 # FIXME(iustin): use a better name for the replaced LVs
5731 temp_suffix = int(time.time())
5732 ren_fn = lambda d, suff: (d.physical_id[0],
5733 d.physical_id[1] + "_replaced-%s" % suff)
5735 # Build the rename list based on what LVs exist on the node
5736 rename_old_to_new = []
5737 for to_ren in old_lvs:
5738 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
5739 if not result.fail_msg and result.payload:
5740 # device exists
5741 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
5743 self.lu.LogInfo("Renaming the old LVs on the target node")
5744 result = self.rpc.call_blockdev_rename(self.target_node, rename_old_to_new)
5745 result.Raise("Can't rename old LVs on node %s" % self.target_node)
5747 # Now we rename the new LVs to the old LVs
5748 self.lu.LogInfo("Renaming the new LVs on the target node")
5749 rename_new_to_old = [(new, old.physical_id)
5750 for old, new in zip(old_lvs, new_lvs)]
5751 result = self.rpc.call_blockdev_rename(self.target_node, rename_new_to_old)
5752 result.Raise("Can't rename new LVs on node %s" % self.target_node)
5754 for old, new in zip(old_lvs, new_lvs):
5755 new.logical_id = old.logical_id
5756 self.cfg.SetDiskID(new, self.target_node)
5758 for disk in old_lvs:
5759 disk.logical_id = ren_fn(disk, temp_suffix)
5760 self.cfg.SetDiskID(disk, self.target_node)
5762 # Now that the new lvs have the old name, we can add them to the device
5763 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
5764 result = self.rpc.call_blockdev_addchildren(self.target_node, dev, new_lvs)
5765 msg = result.fail_msg
5766 if msg:
5767 for new_lv in new_lvs:
5768 msg2 = self.rpc.call_blockdev_remove(self.target_node, new_lv).fail_msg
5769 if msg2:
5770 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
5771 hint=("cleanup manually the unused logical"
5772 " volumes"))
5773 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5775 dev.children = new_lvs
5777 self.cfg.Update(self.instance)

    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(5, steps_total, "Sync devices")
    _WaitForSync(self.lu, self.instance, unlock=True)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    self.lu.LogStep(6, steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node, iv_names)

  def _ExecDrbd8Secondary(self):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
                        _GetInstanceInfoText(self.instance), False)

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node
                                         for dev in self.instance.disks],
                                        self.instance.name)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the later activation in step 4
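      # dev.logical_id for a DRBD8 device is the 6-tuple
      # (node_a, node_b, port, minor_on_node_a, minor_on_node_b, secret);
      # which minor belongs to the primary depends on the node order, so
      # it is picked below by comparing against the primary node.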
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size)
      try:
        _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
                              _GetInstanceInfoText(self.instance), False)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.name)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
      self.cfg.SetDiskID(dev, self.target_node)
      msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
                                               self.node_secondary_ip,
                                               self.instance.disks
                                               )[self.instance.primary_node]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      self.cfg.SetDiskID(dev, self.instance.primary_node)

    self.cfg.Update(self.instance)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node],
                                           self.node_secondary_ip,
                                           self.instance.disks,
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           to_node, msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(5, steps_total, "Sync devices")
    _WaitForSync(self.lu, self.instance, unlock=True)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    self.lu.LogStep(6, steps_total, "Removing old storage")
    self._RemoveOldStorage(self.target_node, iv_names)


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      _CheckNodeOnline(self, node)

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    self.disk = instance.FindDisk(self.op.disk)

    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo[node]
      info.Raise("Cannot get current information from node %s" % node)
      vg_free = info.payload.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > vg_free:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, vg_free, self.op.amount))
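
    # Note (illustrative numbers): growing a disk by 2048 MiB requires at
    # least 2048 MiB of vg_free on every node holding the disk; a node
    # reporting only 1024 MiB makes the check above fail.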

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      result.Raise("Grow request failed to node %s" % node)
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
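
  # Note on _ComputeDiskStatus below: for a DRBD8 disk the returned dict is
  # nested, with the DRBD device at the top and its backing LVs under
  # "children"; pstatus/sstatus come from blockdev_find on the primary and
  # secondary nodes respectively, and are None in static mode.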

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
      if dev_pstatus.offline:
        dev_pstatus = None
      else:
        dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
        dev_pstatus = dev_pstatus.payload
    else:
      dev_pstatus = None

    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
      if dev_sstatus.offline:
        dev_sstatus = None
      else:
        dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
        dev_sstatus = dev_sstatus.payload
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      "mode": dev.mode,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        remote_info.Raise("Error checking node %s" % instance.primary_node)
        remote_info = remote_info.payload
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.admin_up:
        config_state = "up"
      else:
        config_state = "down"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        # this happens to be the same format used for hooks
        "nics": _NICListToTuple(self, instance.nics),
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
    if not hasattr(self.op, 'beparams'):
      self.op.beparams = {}
    if not hasattr(self.op, 'hvparams'):
      self.op.hvparams = {}
    self.op.force = getattr(self.op, "force", False)
    if not (self.op.nics or self.op.disks or
            self.op.hvparams or self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")

    # Disk validation
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
        if not isinstance(disk_dict, dict):
          msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
          raise errors.OpPrereqError(msg)

      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in constants.DISK_ACCESS_SET:
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing")
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err))
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk")

    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time")

    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")
        if not isinstance(nic_dict, dict):
          msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
          raise errors.OpPrereqError(msg)

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == constants.VALUE_NONE:
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)

      nic_bridge = nic_dict.get('bridge', None)
      nic_link = nic_dict.get('link', None)
      if nic_bridge and nic_link:
        raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
                                   " at the same time")
      elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
        nic_dict['bridge'] = None
      elif nic_link and nic_link.lower() == constants.VALUE_NONE:
        nic_dict['link'] = None

      if nic_op == constants.DDM_ADD:
        nic_mac = nic_dict.get('mac', None)
        if nic_mac is None:
          nic_dict['mac'] = constants.VALUE_AUTO

      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                     " modifying an existing nic")

    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.
    if self.op.nics:
      args['nics'] = []
      nic_override = dict(self.op.nics)
      c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
      for idx, nic in enumerate(self.instance.nics):
        if idx in nic_override:
          this_nic_override = nic_override[idx]
        else:
          this_nic_override = {}
        if 'ip' in this_nic_override:
          ip = this_nic_override['ip']
        else:
          ip = nic.ip
        if 'mac' in this_nic_override:
          mac = this_nic_override['mac']
        else:
          mac = nic.mac
        if idx in self.nic_pnew:
          nicparams = self.nic_pnew[idx]
        else:
          nicparams = objects.FillDict(c_nicparams, nic.nicparams)
        mode = nicparams[constants.NIC_MODE]
        link = nicparams[constants.NIC_LINK]
        args['nics'].append((ip, mac, mode, link))
      if constants.DDM_ADD in nic_override:
        ip = nic_override[constants.DDM_ADD].get('ip', None)
        mac = nic_override[constants.DDM_ADD]['mac']
        nicparams = self.nic_pnew[constants.DDM_ADD]
        mode = nicparams[constants.NIC_MODE]
        link = nicparams[constants.NIC_LINK]
        args['nics'].append((ip, mac, mode, link))
      elif constants.DDM_REMOVE in nic_override:
        del args['nics'][-1]

    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def _GetUpdatedParams(self, old_params, update_dict,
                        default_values, parameter_types):
    """Return the new params dict for the given params.

    @type old_params: dict
    @param old_params: old parameters
    @type update_dict: dict
    @param update_dict: dict containing new parameter values,
                        or constants.VALUE_DEFAULT to reset the
                        parameter to its default value
    @type default_values: dict
    @param default_values: default values for the filled parameters
    @type parameter_types: dict
    @param parameter_types: dict mapping target dict keys to types
                            in constants.ENFORCEABLE_TYPES
    @rtype: (dict, dict)
    @return: (new_parameters, filled_parameters)

    """
    params_copy = copy.deepcopy(old_params)
    for key, val in update_dict.iteritems():
      if val == constants.VALUE_DEFAULT:
        try:
          del params_copy[key]
        except KeyError:
          pass
      else:
        params_copy[key] = val
    utils.ForceDictType(params_copy, parameter_types)
    params_filled = objects.FillDict(default_values, params_copy)
    return (params_copy, params_filled)
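
  # _GetUpdatedParams example (illustrative values): old_params={"a": 1,
  # "b": 2} updated with {"b": constants.VALUE_DEFAULT, "c": 3} against
  # default_values={"a": 0, "b": 0, "c": 0} yields new_parameters
  # {"a": 1, "c": 3} and filled_parameters {"a": 1, "b": 0, "c": 3}.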

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    cluster = self.cluster = self.cfg.GetClusterInfo()
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict, hv_new = self._GetUpdatedParams(
                             instance.hvparams, self.op.hvparams,
                             cluster.hvparams[instance.hypervisor],
                             constants.HVS_PARAMETER_TYPES)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict, be_new = self._GetUpdatedParams(
                             instance.beparams, self.op.beparams,
                             cluster.beparams[constants.PP_DEFAULT],
                             constants.BES_PARAMETER_TYPES)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      pninfo = nodeinfo[pnode]
      msg = pninfo.fail_msg
      if msg:
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s: %s" %
                         (pnode, msg))
      elif not isinstance(pninfo.payload.get('memory_free', None), int):
        self.warn.append("Node data from primary node %s doesn't contain"
                         " free memory information" % pnode)
      elif instance_info.fail_msg:
        self.warn.append("Can't get instance runtime information: %s" %
                         instance_info.fail_msg)
      else:
        if instance_info.payload:
          current_mem = int(instance_info.payload['memory'])
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
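        # Illustrative numbers: raising memory from 512 to 2048 MiB with only
        # 1024 MiB free on the primary leaves 2048 - 512 - 1024 = 512 MiB
        # missing, and the prereq check below refuses the change.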
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    pninfo.payload['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.items():
          if node not in instance.secondary_nodes:
            continue
          msg = nres.fail_msg
          if msg:
            self.warn.append("Can't get info from secondary node %s: %s" %
                             (node, msg))
          elif not isinstance(nres.payload.get('memory_free', None), int):
            self.warn.append("Secondary node %s didn't return free"
                             " memory information" % node)
          elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    self.nic_pnew = {}
    self.nic_pinst = {}
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)))
        old_nic_params = instance.nics[nic_op].nicparams
        old_nic_ip = instance.nics[nic_op].ip
      else:
        old_nic_params = {}
        old_nic_ip = None

      update_params_dict = dict([(key, nic_dict[key])
                                 for key in constants.NICS_PARAMETERS
                                 if key in nic_dict])

      if 'bridge' in nic_dict:
        update_params_dict[constants.NIC_LINK] = nic_dict['bridge']

      new_nic_params, new_filled_nic_params = \
          self._GetUpdatedParams(old_nic_params, update_params_dict,
                                 cluster.nicparams[constants.PP_DEFAULT],
                                 constants.NICS_PARAMETER_TYPES)
      objects.NIC.CheckParameterSyntax(new_filled_nic_params)
      self.nic_pinst[nic_op] = new_nic_params
      self.nic_pnew[nic_op] = new_filled_nic_params
      new_nic_mode = new_filled_nic_params[constants.NIC_MODE]

      if new_nic_mode == constants.NIC_MODE_BRIDGED:
        nic_bridge = new_filled_nic_params[constants.NIC_LINK]
        msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
        if msg:
          msg = "Error checking bridges on node %s: %s" % (pnode, msg)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)
      if new_nic_mode == constants.NIC_MODE_ROUTED:
        if 'ip' in nic_dict:
          nic_ip = nic_dict['ip']
        else:
          nic_ip = old_nic_ip
        if nic_ip is None:
          raise errors.OpPrereqError('Cannot set the nic ip to None'
                                     ' on a routed nic')
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac is None:
          raise errors.OpPrereqError('Cannot set the nic mac to None')
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC()
        else:
          # or validate/reserve the current one
          if self.cfg.IsMacInUse(nic_mac):
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        msg = ins_l.fail_msg
        if msg:
          raise errors.OpPrereqError("Can't contact node %s: %s" %
                                     (pnode, msg))
        if instance.name in ins_l.payload:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.")

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)))

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    cluster = self.cluster
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
          if msg:
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
                            " continuing anyway", device_idx, node, msg)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))

    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # mac and bridge should be set, by now
        mac = nic_dict['mac']
        ip = nic_dict.get('ip', None)
        nicparams = self.nic_pinst[constants.DDM_ADD]
        new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,mode=%s,link=%s" %
                       (new_nic.mac, new_nic.ip,
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
                       )))
      else:
        for key in 'mac', 'ip':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
        if nic_op in self.nic_pnew:
          instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
        for key, val in nic_dict.iteritems():
          result.append(("nic.%s/%d" % (key, nic_op), val))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].fail_msg:
        result[node] = False
      else:
        result[node] = rpcresult[node].payload

    return result


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node

    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This means the node name is wrong, not that the node is unlocked
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      result = self.rpc.call_instance_shutdown(src_node, instance)
      result.Raise("Could not shutdown instance %s on"
                   " node %s" % (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    try:
      for idx, disk in enumerate(instance.disks):
        # result.payload will be a snapshot of an lvm leaf of the one we passed
        result = self.rpc.call_blockdev_snapshot(src_node, disk)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Could not snapshot disk/%s on node %s: %s",
                          idx, src_node, msg)
          snap_disks.append(False)
        else:
          disk_id = (vgname, result.payload)
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=disk_id, physical_id=disk_id,
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.admin_up:
        result = self.rpc.call_instance_start(src_node, instance, None, None)
        msg = result.fail_msg
        if msg:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance: %s" % msg)

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Could not export disk/%s from node %s to"
                          " node %s: %s", idx, src_node, dst_node.name, msg)
        msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
        if msg:
          self.LogWarning("Could not remove snapshot for disk/%d from node"
                          " %s: %s", idx, src_node, msg)

    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Could not finalize export for instance %s"
                      " on node %s: %s", instance.name, dst_node.name, msg)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    iname = instance.name
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].fail_msg:
          continue
        if iname in exportlist[node].payload:
          msg = self.rpc.call_export_remove(node, iname).fail_msg
          if msg:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s: %s", iname, node, msg)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node in exportlist:
      msg = exportlist[node].fail_msg
      if msg:
        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
        continue
      if instance_name in exportlist[node].payload:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        msg = result.fail_msg
        if msg:
          logging.error("Could not remove export for instance %s"
                        " on node %s: %s", instance_name, node, msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))

    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      for node, node_result in result.items():
        node_result.Raise("Failure during rpc call to node %s" % node)


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has three sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, rpc, mode, name, **kwargs):
    self.cfg = cfg
    self.rpc = rpc
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
                                        hypervisor_name)
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not ninfo.offline:
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = nresult.payload
        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          if not isinstance(remote_info[attr], int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, remote_info[attr]))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = objects.FillDict(
          cluster_info.nicparams[constants.PP_DEFAULT],
          nic.nicparams)
        nic_dict = {"mac": nic.mac,
                    "ip": nic.ip,
                    "mode": filled_params[constants.NIC_MODE],
                    "link": filled_params[constants.NIC_LINK],
                    }
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _AllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _IAllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.rpc.call_iallocator_runner

    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
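
    # A well-formed reply is expected to look like (illustrative values):
    #   {"success": true, "info": "allocation successful",
    #    "nodes": ["node1.example.com", "node2.example.com"]}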
    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result