# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
import re
import time
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False
  REQ_BGL = True

  def __init__(self, processor, op, context, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.sstore = sstore
    self.context = context
    self.needed_locks = None
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

105 """Returns the SshRunner object
109 self.__ssh = ssh.SshRunner(self.sstore)
112 ssh = property(fget=__GetSSH)
  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use None as a value
        (this reflects what LockSet does, and will be replaced before
        CheckPrereq with the full list of nodes that have been locked)

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: None,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have the 'GANETI_' prefix, as this is
    handled in the hooks runner. Also note that additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

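  # Illustrative sketch (not from the original source): a typical
  # BuildHooksEnv return value. The node names and instance attribute
  # below are hypothetical.
  #
  #   def BuildHooksEnv(self):
  #     env = {"INSTANCE_NAME": self.op.instance_name}
  #     pre_nodes = [self.sstore.GetMasterNode()]
  #     post_nodes = pre_nodes + [self.instance.primary_node]
  #     return env, pre_nodes, post_nodes
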
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check whether we've actually been called with the instance locks
    # held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.needed_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      wanted_nodes.extend(instance.secondary_nodes)
    self.needed_locks[locking.LEVEL_NODE] = wanted_nodes

    del self.recalculate_locks[locking.LEVEL_NODE]

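
# Illustrative sketch (not from the original source): how a concurrent LU
# combines _ExpandAndLockInstance and _LockInstancesNodes. The class name
# LUExampleInstanceOp is hypothetical; the pattern mirrors the instance LUs
# defined later in this module.
#
#   class LUExampleInstanceOp(NoHooksLU):
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self._ExpandAndLockInstance()
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()
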
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if nodes is not None and not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)
  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)

def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if instances is not None and not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)

def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: Fields selected by the user

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))

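
# Usage sketch (hedged; field names below are hypothetical): a query LU
# validates the user-selected output fields before running, e.g.
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree", "dfree"],
#                      selected=self.op.output_fields)
#
# which raises OpPrereqError if output_fields contains anything outside the
# union of the two sets.
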
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings

  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env

def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)

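
# Example (hedged; "inst" is a hypothetical objects.Instance): the resulting
# environment contains INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_NIC_COUNT
# and one INSTANCE_NIC<n>_* triple per NIC; override values win over the
# computed ones.
#
#   env = _BuildInstanceHookEnvByObject(inst,
#                                       override={"INSTANCE_STATUS": "up"})
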
def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))

class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)

class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum

    if 'filelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if (node_current not in node_instance or
          instance not in node_instance[node_current]):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if not isinstance(nodeinstance, list):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
          }
      except ValueError:
        feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn(" - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn(" - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn(" - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn(" - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result

class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result

class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")

def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV

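
# Illustrative sketch (not from the original source; the disk layout below is
# hypothetical): a DRBD device backed by two LVM children is detected as
# lvm-based through the recursion above.
#
#   lv_data = objects.Disk(dev_type=constants.LD_LV, children=[])
#   lv_meta = objects.Disk(dev_type=constants.LD_LV, children=[])
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8,
#                       children=[lv_data, lv_meta])
#   assert _RecursiveCheckIfLVMBased(drbd)
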
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")

def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded

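
# Usage sketch (hedged): callers typically poll right after assembling the
# disks and treat a degraded result as fatal; the surrounding names are
# hypothetical.
#
#   if not _WaitForSync(self.cfg, instance, self.proc):
#     raise errors.OpExecError("instance %s: disks degraded after sync" %
#                              instance.name)
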
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result

class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

    Args:
      node_list: a list with the names of all nodes
      rlist: a map with node names as keys and OS objects as values

    Returns:
      map: a map with osnames as keys and as value another map, with
           nodes as keys and list of OS objects as values
           e.g. {"debian-etch": {"node1": [<object>,...],
                                 "node2": [<object>,]}
                }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data is False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output

class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)
    # Remove the node from the Ganeti Lock Manager
    self.context.glm.remove(locking.LEVEL_NODE, node.name)

    utils.RemoveHostFromEtcHosts(node.name)

class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output

class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in lv_by_node:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                break
            else:
              inst = None
            val = inst and inst.name
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output

class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
      }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if not self.op.readd:
      logger.Info("adding node %s to cluster.conf" % node)
      self.cfg.AddNode(new_node)
      # Add the new node to the Ganeti Lock Manager
      self.context.glm.add(locking.LEVEL_NODE, node)

class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result

class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()

class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info

def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
    succeeded with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info

def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")

class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not isinstance(ins_l, list):
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)

def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored;
  otherwise they make the function return False.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result

def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))

class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")

2070 class LURebootInstance(LogicalUnit):
2071 """Reboot an instance.
2072
2073 """
2074 HPATH = "instance-reboot"
2075 HTYPE = constants.HTYPE_INSTANCE
2076 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2079 def ExpandNames(self):
2080 self._ExpandAndLockInstance()
2081 self.needed_locks[locking.LEVEL_NODE] = []
2082 self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2084 def DeclareLocks(self, level):
2085 if level == locking.LEVEL_NODE:
2086 self._LockInstancesNodes()
2088 def BuildHooksEnv(self):
2089 """Build hooks env.
2090
2091 This runs on master, primary and secondary nodes of the instance.
2092
2093 """
2094 env = {
2095 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2096 }
2097 env.update(_BuildInstanceHookEnvByObject(self.instance))
2098 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2099 list(self.instance.secondary_nodes))
2100 return env, nl, nl
2102 def CheckPrereq(self):
2103 """Check prerequisites.
2105 This checks that the instance is in the cluster.
2106
2107 """
2108 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2109 assert self.instance is not None, \
2110 "Cannot retrieve locked instance %s" % self.op.instance_name
2112 # check bridges existance
2113 _CheckInstanceBridgesExist(instance)
2115 def Exec(self, feedback_fn):
2116 """Reboot the instance.
2117
2118 """
2119 instance = self.instance
2120 ignore_secondaries = self.op.ignore_secondaries
2121 reboot_type = self.op.reboot_type
2122 extra_args = getattr(self.op, "extra_args", "")
2124 node_current = instance.primary_node
2126 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2127 constants.INSTANCE_REBOOT_HARD,
2128 constants.INSTANCE_REBOOT_FULL]:
2129 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2130 (constants.INSTANCE_REBOOT_SOFT,
2131 constants.INSTANCE_REBOOT_HARD,
2132 constants.INSTANCE_REBOOT_FULL))
2134 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2135 constants.INSTANCE_REBOOT_HARD]:
2136 if not rpc.call_instance_reboot(node_current, instance,
2137 reboot_type, extra_args):
2138 raise errors.OpExecError("Could not reboot instance")
2139 else:
2140 if not rpc.call_instance_shutdown(node_current, instance):
2141 raise errors.OpExecError("could not shutdown instance for full reboot")
2142 _ShutdownInstanceDisks(instance, self.cfg)
2143 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2144 if not rpc.call_instance_start(node_current, instance, extra_args):
2145 _ShutdownInstanceDisks(instance, self.cfg)
2146 raise errors.OpExecError("Could not start instance for full reboot")
2148 self.cfg.MarkInstanceUp(instance.name)
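# Note on the reboot types handled above: SOFT and HARD are delegated to
# the hypervisor through a single call_instance_reboot RPC, while FULL is
# emulated master-side as shutdown + disk restart + start. A sketch of a
# hypothetical client-side opcode (field values illustrative only):
#   op = opcodes.OpRebootInstance(instance_name="fake1.example.com",
#                                 reboot_type=constants.INSTANCE_REBOOT_FULL,
#                                 ignore_secondaries=False)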
2151 class LUShutdownInstance(LogicalUnit):
2152 """Shutdown an instance.
2153
2154 """
2155 HPATH = "instance-stop"
2156 HTYPE = constants.HTYPE_INSTANCE
2157 _OP_REQP = ["instance_name"]
2160 def ExpandNames(self):
2161 self._ExpandAndLockInstance()
2162 self.needed_locks[locking.LEVEL_NODE] = []
2163 self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2165 def DeclareLocks(self, level):
2166 if level == locking.LEVEL_NODE:
2167 self._LockInstancesNodes()
2169 def BuildHooksEnv(self):
2170 """Build hooks env.
2171
2172 This runs on master, primary and secondary nodes of the instance.
2173
2174 """
2175 env = _BuildInstanceHookEnvByObject(self.instance)
2176 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2177 list(self.instance.secondary_nodes))
2178 return env, nl, nl
2180 def CheckPrereq(self):
2181 """Check prerequisites.
2183 This checks that the instance is in the cluster.
2184
2185 """
2186 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2187 assert self.instance is not None, \
2188 "Cannot retrieve locked instance %s" % self.op.instance_name
2190 def Exec(self, feedback_fn):
2191 """Shutdown the instance.
2192
2193 """
2194 instance = self.instance
2195 node_current = instance.primary_node
2196 self.cfg.MarkInstanceDown(instance.name)
2197 if not rpc.call_instance_shutdown(node_current, instance):
2198 logger.Error("could not shutdown instance")
2200 _ShutdownInstanceDisks(instance, self.cfg)
2203 class LUReinstallInstance(LogicalUnit):
2204 """Reinstall an instance.
2205
2206 """
2207 HPATH = "instance-reinstall"
2208 HTYPE = constants.HTYPE_INSTANCE
2209 _OP_REQP = ["instance_name"]
2212 def ExpandNames(self):
2213 self._ExpandAndLockInstance()
2214 self.needed_locks[locking.LEVEL_NODE] = []
2215 self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2217 def DeclareLocks(self, level):
2218 if level == locking.LEVEL_NODE:
2219 self._LockInstancesNodes()
2221 def BuildHooksEnv(self):
2222 """Build hooks env.
2223
2224 This runs on master, primary and secondary nodes of the instance.
2225
2226 """
2227 env = _BuildInstanceHookEnvByObject(self.instance)
2228 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2229 list(self.instance.secondary_nodes))
2230 return env, nl, nl
2232 def CheckPrereq(self):
2233 """Check prerequisites.
2235 This checks that the instance is in the cluster and is not running.
2236
2237 """
2238 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2239 assert instance is not None, \
2240 "Cannot retrieve locked instance %s" % self.op.instance_name
2242 if instance.disk_template == constants.DT_DISKLESS:
2243 raise errors.OpPrereqError("Instance '%s' has no disks" %
2244 self.op.instance_name)
2245 if instance.status != "down":
2246 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2247 self.op.instance_name)
2248 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2249 if remote_info:
2250 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2251 (self.op.instance_name,
2252 instance.primary_node))
2254 self.op.os_type = getattr(self.op, "os_type", None)
2255 if self.op.os_type is not None:
2256 # OS verification
2257 pnode = self.cfg.GetNodeInfo(
2258 self.cfg.ExpandNodeName(instance.primary_node))
2259 if pnode is None:
2260 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2261 instance.primary_node)
2262 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2263 if not os_obj:
2264 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2265 " primary node" % self.op.os_type)
2267 self.instance = instance
2269 def Exec(self, feedback_fn):
2270 """Reinstall the instance.
2271
2272 """
2273 inst = self.instance
2275 if self.op.os_type is not None:
2276 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2277 inst.os = self.op.os_type
2278 self.cfg.AddInstance(inst)
2280 _StartInstanceDisks(self.cfg, inst, None)
2281 try:
2282 feedback_fn("Running the instance OS create scripts...")
2283 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2284 raise errors.OpExecError("Could not install OS for instance %s"
2285 " on node %s" %
2286 (inst.name, inst.primary_node))
2287 finally:
2288 _ShutdownInstanceDisks(inst, self.cfg)
2291 class LURenameInstance(LogicalUnit):
2292 """Rename an instance.
2293
2294 """
2295 HPATH = "instance-rename"
2296 HTYPE = constants.HTYPE_INSTANCE
2297 _OP_REQP = ["instance_name", "new_name"]
2299 def BuildHooksEnv(self):
2300 """Build hooks env.
2301
2302 This runs on master, primary and secondary nodes of the instance.
2303
2304 """
2305 env = _BuildInstanceHookEnvByObject(self.instance)
2306 env["INSTANCE_NEW_NAME"] = self.op.new_name
2307 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2308 list(self.instance.secondary_nodes))
2309 return env, nl, nl
2311 def CheckPrereq(self):
2312 """Check prerequisites.
2314 This checks that the instance is in the cluster and is not running.
2315
2316 """
2317 instance = self.cfg.GetInstanceInfo(
2318 self.cfg.ExpandInstanceName(self.op.instance_name))
2319 if instance is None:
2320 raise errors.OpPrereqError("Instance '%s' not known" %
2321 self.op.instance_name)
2322 if instance.status != "down":
2323 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2324 self.op.instance_name)
2325 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2326 if remote_info:
2327 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2328 (self.op.instance_name,
2329 instance.primary_node))
2330 self.instance = instance
2332 # new name verification
2333 name_info = utils.HostInfo(self.op.new_name)
2335 self.op.new_name = new_name = name_info.name
2336 instance_list = self.cfg.GetInstanceList()
2337 if new_name in instance_list:
2338 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2339 new_name)
2341 if not getattr(self.op, "ignore_ip", False):
2342 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2343 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2344 (name_info.ip, new_name))
2347 def Exec(self, feedback_fn):
2348 """Rename the instance.
2349
2350 """
2351 inst = self.instance
2352 old_name = inst.name
2354 if inst.disk_template == constants.DT_FILE:
2355 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2357 self.cfg.RenameInstance(inst.name, self.op.new_name)
2358 # Change the instance lock. This is definitely safe while we hold the BGL
2359 self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2360 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2362 # re-read the instance from the configuration after rename
2363 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2365 if inst.disk_template == constants.DT_FILE:
2366 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2367 result = rpc.call_file_storage_dir_rename(inst.primary_node,
2368 old_file_storage_dir,
2369 new_file_storage_dir)
2371 if not result:
2372 raise errors.OpExecError("Could not connect to node '%s' to rename"
2373 " directory '%s' to '%s' (but the instance"
2374 " has been renamed in Ganeti)" % (
2375 inst.primary_node, old_file_storage_dir,
2376 new_file_storage_dir))
2378 if not result[0]:
2379 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2380 " (but the instance has been renamed in"
2381 " Ganeti)" % (old_file_storage_dir,
2382 new_file_storage_dir))
2384 _StartInstanceDisks(self.cfg, inst, None)
2385 try:
2386 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2387 "sda", "sdb"):
2388 msg = ("Could not run OS rename script for instance %s on node %s"
2389 " (but the instance has been renamed in Ganeti)" %
2390 (inst.name, inst.primary_node))
2391 logger.Error(msg)
2392 finally:
2393 _ShutdownInstanceDisks(inst, self.cfg)
2396 class LURemoveInstance(LogicalUnit):
2397 """Remove an instance.
2398
2399 """
2400 HPATH = "instance-remove"
2401 HTYPE = constants.HTYPE_INSTANCE
2402 _OP_REQP = ["instance_name", "ignore_failures"]
2404 def BuildHooksEnv(self):
2405 """Build hooks env.
2406
2407 This runs on master, primary and secondary nodes of the instance.
2408
2409 """
2410 env = _BuildInstanceHookEnvByObject(self.instance)
2411 nl = [self.sstore.GetMasterNode()]
2412 return env, nl, nl
2414 def CheckPrereq(self):
2415 """Check prerequisites.
2417 This checks that the instance is in the cluster.
2418
2419 """
2420 instance = self.cfg.GetInstanceInfo(
2421 self.cfg.ExpandInstanceName(self.op.instance_name))
2422 if instance is None:
2423 raise errors.OpPrereqError("Instance '%s' not known" %
2424 self.op.instance_name)
2425 self.instance = instance
2427 def Exec(self, feedback_fn):
2428 """Remove the instance.
2431 instance = self.instance
2432 logger.Info("shutting down instance %s on node %s" %
2433 (instance.name, instance.primary_node))
2435 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2436 if self.op.ignore_failures:
2437 feedback_fn("Warning: can't shutdown instance")
2438 else:
2439 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2440 (instance.name, instance.primary_node))
2442 logger.Info("removing block devices for instance %s" % instance.name)
2444 if not _RemoveDisks(instance, self.cfg):
2445 if self.op.ignore_failures:
2446 feedback_fn("Warning: can't remove instance's disks")
2447 else:
2448 raise errors.OpExecError("Can't remove instance's disks")
2450 logger.Info("removing instance %s out of cluster config" % instance.name)
2452 self.cfg.RemoveInstance(instance.name)
2453 # Remove the new instance from the Ganeti Lock Manager
2454 self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2457 class LUQueryInstances(NoHooksLU):
2458 """Logical unit for querying instances.
2459
2460 """
2461 _OP_REQP = ["output_fields", "names"]
2463 def CheckPrereq(self):
2464 """Check prerequisites.
2466 This checks that the fields required are valid output fields.
2467
2468 """
2469 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2470 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2471 "admin_state", "admin_ram",
2472 "disk_template", "ip", "mac", "bridge",
2473 "sda_size", "sdb_size", "vcpus", "tags"],
2474 dynamic=self.dynamic_fields,
2475 selected=self.op.output_fields)
2477 self.wanted = _GetWantedInstances(self, self.op.names)
2479 def Exec(self, feedback_fn):
2480 """Computes the list of nodes and their attributes.
2481
2482 """
2483 instance_names = self.wanted
2484 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2485 in instance_names]
2487 # begin data gathering
2489 nodes = frozenset([inst.primary_node for inst in instance_list])
2490
2491 bad_nodes = []
2492 if self.dynamic_fields.intersection(self.op.output_fields):
2493 live_data = {}
2494 node_data = rpc.call_all_instances_info(nodes)
2495 for name in nodes:
2496 result = node_data[name]
2497 if result:
2498 live_data.update(result)
2499 elif result == False:
2500 bad_nodes.append(name)
2501 # else no instance is alive
2502 else:
2503 live_data = dict([(name, {}) for name in instance_names])
2504
2505 # end data gathering
2506
2507 output = []
2508 for instance in instance_list:
2509 iout = []
2510 for field in self.op.output_fields:
2511 if field == "name":
2512 val = instance.name
2513 elif field == "os":
2514 val = instance.os
2515 elif field == "pnode":
2516 val = instance.primary_node
2517 elif field == "snodes":
2518 val = list(instance.secondary_nodes)
2519 elif field == "admin_state":
2520 val = (instance.status != "down")
2521 elif field == "oper_state":
2522 if instance.primary_node in bad_nodes:
2523 val = None
2524 else:
2525 val = bool(live_data.get(instance.name))
2526 elif field == "status":
2527 if instance.primary_node in bad_nodes:
2528 val = "ERROR_nodedown"
2529 else:
2530 running = bool(live_data.get(instance.name))
2531 if running:
2532 if instance.status != "down":
2533 val = "running"
2534 else:
2535 val = "ERROR_up"
2536 else:
2537 if instance.status != "down":
2538 val = "ERROR_down"
2539 else:
2540 val = "ADMIN_down"
2541 elif field == "admin_ram":
2542 val = instance.memory
2543 elif field == "oper_ram":
2544 if instance.primary_node in bad_nodes:
2545 val = None
2546 elif instance.name in live_data:
2547 val = live_data[instance.name].get("memory", "?")
2548 else:
2549 val = "-"
2550 elif field == "disk_template":
2551 val = instance.disk_template
2552 elif field == "ip":
2553 val = instance.nics[0].ip
2554 elif field == "bridge":
2555 val = instance.nics[0].bridge
2556 elif field == "mac":
2557 val = instance.nics[0].mac
2558 elif field == "sda_size" or field == "sdb_size":
2559 disk = instance.FindDisk(field[:3])
2560 if disk is None:
2561 val = None
2562 else:
2563 val = disk.size
2564 elif field == "vcpus":
2565 val = instance.vcpus
2566 elif field == "tags":
2567 val = list(instance.GetTags())
2568 else:
2569 raise errors.ParameterError(field)
2570 iout.append(str(val))
2571 output.append(iout)
2572
2573 return output
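# Example of the result (values illustrative): with output_fields
# ["name", "status", "oper_ram"], a returned row might be
# ["fake1.example.com", "running", "512"]; "?" marks a live instance whose
# node did not report a memory figure, and "-" an instance not running.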
2576 class LUFailoverInstance(LogicalUnit):
2577 """Failover an instance.
2578
2579 """
2580 HPATH = "instance-failover"
2581 HTYPE = constants.HTYPE_INSTANCE
2582 _OP_REQP = ["instance_name", "ignore_consistency"]
2585 def ExpandNames(self):
2586 self._ExpandAndLockInstance()
2587 self.needed_locks[locking.LEVEL_NODE] = []
2588 self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2590 def DeclareLocks(self, level):
2591 if level == locking.LEVEL_NODE:
2592 self._LockInstancesNodes()
2594 def BuildHooksEnv(self):
2595 """Build hooks env.
2596
2597 This runs on master, primary and secondary nodes of the instance.
2598
2599 """
2600 env = {
2601 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2602 }
2603 env.update(_BuildInstanceHookEnvByObject(self.instance))
2604 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2605 return env, nl, nl
2607 def CheckPrereq(self):
2608 """Check prerequisites.
2610 This checks that the instance is in the cluster.
2611
2612 """
2613 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2614 assert self.instance is not None, \
2615 "Cannot retrieve locked instance %s" % self.op.instance_name
2617 if instance.disk_template not in constants.DTS_NET_MIRROR:
2618 raise errors.OpPrereqError("Instance's disk layout is not"
2619 " network mirrored, cannot failover.")
2621 secondary_nodes = instance.secondary_nodes
2622 if not secondary_nodes:
2623 raise errors.ProgrammerError("no secondary node but using "
2624 "a mirrored disk template")
2626 target_node = secondary_nodes[0]
2627 # check memory requirements on the secondary node
2628 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2629 instance.name, instance.memory)
2631 # check bridge existance
2632 brlist = [nic.bridge for nic in instance.nics]
2633 if not rpc.call_bridges_exist(target_node, brlist):
2634 raise errors.OpPrereqError("One or more target bridges %s does not"
2635 " exist on destination node '%s'" %
2636 (brlist, target_node))
2638 def Exec(self, feedback_fn):
2639 """Failover an instance.
2641 The failover is done by shutting it down on its present node and
2642 starting it on the secondary.
2643
2644 """
2645 instance = self.instance
2647 source_node = instance.primary_node
2648 target_node = instance.secondary_nodes[0]
2650 feedback_fn("* checking disk consistency between source and target")
2651 for dev in instance.disks:
2652 # for drbd, these are drbd over lvm
2653 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2654 if instance.status == "up" and not self.op.ignore_consistency:
2655 raise errors.OpExecError("Disk %s is degraded on target node,"
2656 " aborting failover." % dev.iv_name)
2658 feedback_fn("* shutting down instance on source node")
2659 logger.Info("Shutting down instance %s on node %s" %
2660 (instance.name, source_node))
2662 if not rpc.call_instance_shutdown(source_node, instance):
2663 if self.op.ignore_consistency:
2664 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2665 " anyway. Please make sure node %s is down" %
2666 (instance.name, source_node, source_node))
2667 else:
2668 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2669 (instance.name, source_node))
2671 feedback_fn("* deactivating the instance's disks on source node")
2672 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2673 raise errors.OpExecError("Can't shut down the instance's disks.")
2675 instance.primary_node = target_node
2676 # distribute new instance config to the other nodes
2677 self.cfg.Update(instance)
2679 # Only start the instance if it's marked as up
2680 if instance.status == "up":
2681 feedback_fn("* activating the instance's disks on target node")
2682 logger.Info("Starting instance %s on node %s" %
2683 (instance.name, target_node))
2685 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2686 ignore_secondaries=True)
2687 if not disks_ok:
2688 _ShutdownInstanceDisks(instance, self.cfg)
2689 raise errors.OpExecError("Can't activate the instance's disks")
2691 feedback_fn("* starting the instance on the target node")
2692 if not rpc.call_instance_start(target_node, instance, None):
2693 _ShutdownInstanceDisks(instance, self.cfg)
2694 raise errors.OpExecError("Could not start instance %s on node %s." %
2695 (instance.name, target_node))
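# End-to-end sketch (illustrative): a client would trigger the logic above
# with an opcode such as
#   op = opcodes.OpFailoverInstance(instance_name="fake1.example.com",
#                                   ignore_consistency=False)
# after which the instance runs on its former secondary node.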
2698 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2699 """Create a tree of block devices on the primary node.
2700
2701 This always creates all devices.
2702
2703 """
2704 if device.children:
2705 for child in device.children:
2706 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2707 return False
2708
2709 cfg.SetDiskID(device, node)
2710 new_id = rpc.call_blockdev_create(node, device, device.size,
2711 instance.name, True, info)
2712 if not new_id:
2713 return False
2714 if device.physical_id is None:
2715 device.physical_id = new_id
2716 return True
2719 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2720 """Create a tree of block devices on a secondary node.
2721
2722 If this device type has to be created on secondaries, create it and
2723 all its children.
2724
2725 If not, just recurse to children keeping the same 'force' value.
2726
2727 """
2728 if device.CreateOnSecondary():
2729 force = True
2730 if device.children:
2731 for child in device.children:
2732 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2733 child, force, info):
2734 return False
2735
2736 if not force:
2737 return True
2738 cfg.SetDiskID(device, node)
2739 new_id = rpc.call_blockdev_create(node, device, device.size,
2740 instance.name, False, info)
2741 if not new_id:
2742 return False
2743 if device.physical_id is None:
2744 device.physical_id = new_id
2745 return True
2748 def _GenerateUniqueNames(cfg, exts):
2749 """Generate a suitable LV name.
2751 This will generate a logical volume name for the given instance.
2752
2753 """
2754 results = []
2755 for val in exts:
2756 new_id = cfg.GenerateUniqueID()
2757 results.append("%s%s" % (new_id, val))
2758 return results
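# Result sketch (UUID illustrative): with exts=[".sda_data", ".sda_meta"]
# and cfg.GenerateUniqueID() returning
# "9e2283f2-1e48-43d2-a6a9-ab4e1d5eb6a5", the generated LV names would be
# "9e2283f2-1e48-43d2-a6a9-ab4e1d5eb6a5.sda_data" and
# "9e2283f2-1e48-43d2-a6a9-ab4e1d5eb6a5.sda_meta".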
2761 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2762 """Generate a drbd8 device complete with its children.
2763
2764 """
2765 port = cfg.AllocatePort()
2766 vgname = cfg.GetVGName()
2767 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2768 logical_id=(vgname, names[0]))
2769 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2770 logical_id=(vgname, names[1]))
2771 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2772 logical_id = (primary, secondary, port),
2773 children = [dev_data, dev_meta],
2774 iv_name=iv_name)
2775 return drbd_dev
2778 def _GenerateDiskTemplate(cfg, template_name,
2779 instance_name, primary_node,
2780 secondary_nodes, disk_sz, swap_sz,
2781 file_storage_dir, file_driver):
2782 """Generate the entire disk layout for a given template type.
2783
2784 """
2785 #TODO: compute space requirements
2787 vgname = cfg.GetVGName()
2788 if template_name == constants.DT_DISKLESS:
2789 disks = []
2790 elif template_name == constants.DT_PLAIN:
2791 if len(secondary_nodes) != 0:
2792 raise errors.ProgrammerError("Wrong template configuration")
2794 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2795 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2796 logical_id=(vgname, names[0]),
2797 iv_name="sda")
2798 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2799 logical_id=(vgname, names[1]),
2800 iv_name="sdb")
2801 disks = [sda_dev, sdb_dev]
2802 elif template_name == constants.DT_DRBD8:
2803 if len(secondary_nodes) != 1:
2804 raise errors.ProgrammerError("Wrong template configuration")
2805 remote_node = secondary_nodes[0]
2806 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2807 ".sdb_data", ".sdb_meta"])
2808 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2809 disk_sz, names[0:2], "sda")
2810 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2811 swap_sz, names[2:4], "sdb")
2812 disks = [drbd_sda_dev, drbd_sdb_dev]
2813 elif template_name == constants.DT_FILE:
2814 if len(secondary_nodes) != 0:
2815 raise errors.ProgrammerError("Wrong template configuration")
2817 file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2818 iv_name="sda", logical_id=(file_driver,
2819 "%s/sda" % file_storage_dir))
2820 file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2821 iv_name="sdb", logical_id=(file_driver,
2822 "%s/sdb" % file_storage_dir))
2823 disks = [file_sda_dev, file_sdb_dev]
2824 else:
2825 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2826 return disks
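# Worked example (hypothetical names and sizes): a call such as
#   _GenerateDiskTemplate(cfg, constants.DT_DRBD8, "fake1", "node1",
#                         ["node2"], 10240, 4096, None, None)
# yields two LD_DRBD8 disks ("sda", "sdb"), each a DRBD device backed on
# both nodes by a data LV of the requested size plus a 128 MiB metadata LV.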
2829 def _GetInstanceInfoText(instance):
2830 """Compute the text that should be added to the disk's metadata.
2831
2832 """
2833 return "originstname+%s" % instance.name
2836 def _CreateDisks(cfg, instance):
2837 """Create all disks for an instance.
2839 This abstracts away some work from AddInstance.
2840
2841 Args:
2842 instance: the instance object
2843
2844 Returns:
2845 True or False showing the success of the creation process
2846
2847 """
2848 info = _GetInstanceInfoText(instance)
2850 if instance.disk_template == constants.DT_FILE:
2851 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2852 result = rpc.call_file_storage_dir_create(instance.primary_node,
2853 file_storage_dir)
2854
2855 if not result:
2856 logger.Error("Could not connect to node '%s'" % instance.primary_node)
2857 return False
2858
2859 if not result[0]:
2860 logger.Error("failed to create directory '%s'" % file_storage_dir)
2861 return False
2863 for device in instance.disks:
2864 logger.Info("creating volume %s for instance %s" %
2865 (device.iv_name, instance.name))
2867 for secondary_node in instance.secondary_nodes:
2868 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2869 device, False, info):
2870 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2871 (device.iv_name, device, secondary_node))
2872 return False
2873 #HARDCODE
2874 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2875 instance, device, info):
2876 logger.Error("failed to create volume %s on primary!" %
2877 device.iv_name)
2878 return False
2879
2880 return True
2883 def _RemoveDisks(instance, cfg):
2884 """Remove all disks for an instance.
2886 This abstracts away some work from `AddInstance()` and
2887 `RemoveInstance()`. Note that in case some of the devices couldn't
2888 be removed, the removal will continue with the other ones (compare
2889 with `_CreateDisks()`).
2891 Args:
2892 instance: the instance object
2893
2894 Returns:
2895 True or False showing the success of the removal process
2896
2897 """
2898 logger.Info("removing block devices for instance %s" % instance.name)
2900 result = True
2901 for device in instance.disks:
2902 for node, disk in device.ComputeNodeTree(instance.primary_node):
2903 cfg.SetDiskID(disk, node)
2904 if not rpc.call_blockdev_remove(node, disk):
2905 logger.Error("could not remove block device %s on node %s,"
2906 " continuing anyway" %
2907 (device.iv_name, node))
2908 result = False
2910 if instance.disk_template == constants.DT_FILE:
2911 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2912 if not rpc.call_file_storage_dir_remove(instance.primary_node,
2913 file_storage_dir):
2914 logger.Error("could not remove directory '%s'" % file_storage_dir)
2915 result = False
2916
2917 return result
2920 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2921 """Compute disk size requirements in the volume group.
2922
2923 This is currently hard-coded for the two-drive layout.
2924
2925 """
2926 # Required free disk space as a function of disk and swap space
2927 req_size_dict = {
2928 constants.DT_DISKLESS: None,
2929 constants.DT_PLAIN: disk_size + swap_size,
2930 # 256 MB are added for drbd metadata, 128MB for each drbd device
2931 constants.DT_DRBD8: disk_size + swap_size + 256,
2932 constants.DT_FILE: None,
2933 }
2935 if disk_template not in req_size_dict:
2936 raise errors.ProgrammerError("Disk template '%s' size requirement"
2937 " is unknown" % disk_template)
2939 return req_size_dict[disk_template]
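# Worked example: for a drbd8 instance with disk_size=10240 and
# swap_size=4096 this returns 10240 + 4096 + 256 = 14592 MiB, the extra
# 256 MiB accounting for the two 128 MiB DRBD metadata volumes.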
2942 class LUCreateInstance(LogicalUnit):
2943 """Create an instance.
2944
2945 """
2946 HPATH = "instance-add"
2947 HTYPE = constants.HTYPE_INSTANCE
2948 _OP_REQP = ["instance_name", "mem_size", "disk_size",
2949 "disk_template", "swap_size", "mode", "start", "vcpus",
2950 "wait_for_sync", "ip_check", "mac"]
2952 def _RunAllocator(self):
2953 """Run the allocator based on input opcode.
2954
2955 """
2956 disks = [{"size": self.op.disk_size, "mode": "w"},
2957 {"size": self.op.swap_size, "mode": "w"}]
2958 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2959 "bridge": self.op.bridge}]
2960 ial = IAllocator(self.cfg, self.sstore,
2961 mode=constants.IALLOCATOR_MODE_ALLOC,
2962 name=self.op.instance_name,
2963 disk_template=self.op.disk_template,
2964 tags=[],
2965 os=self.op.os_type,
2966 vcpus=self.op.vcpus,
2967 mem_size=self.op.mem_size,
2968 disks=disks,
2969 nics=nics,
2970 )
2971
2972 ial.Run(self.op.iallocator)
2974 if not ial.success:
2975 raise errors.OpPrereqError("Can't compute nodes using"
2976 " iallocator '%s': %s" % (self.op.iallocator,
2977 ial.info))
2978 if len(ial.nodes) != ial.required_nodes:
2979 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2980 " of nodes (%s), required %s" %
2981 (len(ial.nodes), ial.required_nodes))
2982 self.op.pnode = ial.nodes[0]
2983 logger.ToStdout("Selected nodes for the instance: %s" %
2984 (", ".join(ial.nodes),))
2985 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2986 (self.op.instance_name, self.op.iallocator, ial.nodes))
2987 if ial.required_nodes == 2:
2988 self.op.snode = ial.nodes[1]
2990 def BuildHooksEnv(self):
2991 """Build hooks env.
2992
2993 This runs on master, primary and secondary nodes of the instance.
2994
2995 """
2996 env = {
2997 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2998 "INSTANCE_DISK_SIZE": self.op.disk_size,
2999 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3000 "INSTANCE_ADD_MODE": self.op.mode,
3001 }
3002 if self.op.mode == constants.INSTANCE_IMPORT:
3003 env["INSTANCE_SRC_NODE"] = self.op.src_node
3004 env["INSTANCE_SRC_PATH"] = self.op.src_path
3005 env["INSTANCE_SRC_IMAGE"] = self.src_image
3007 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3008 primary_node=self.op.pnode,
3009 secondary_nodes=self.secondaries,
3010 status=self.instance_status,
3011 os_type=self.op.os_type,
3012 memory=self.op.mem_size,
3013 vcpus=self.op.vcpus,
3014 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3015 ))
3016
3017 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3018 self.secondaries)
3019 return env, nl, nl
3022 def CheckPrereq(self):
3023 """Check prerequisites.
3024
3025 """
3026 # set optional parameters to none if they don't exist
3027 for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3028 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3029 "vnc_bind_address"]:
3030 if not hasattr(self.op, attr):
3031 setattr(self.op, attr, None)
3033 if self.op.mode not in (constants.INSTANCE_CREATE,
3034 constants.INSTANCE_IMPORT):
3035 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3036 self.op.mode)
3038 if (not self.cfg.GetVGName() and
3039 self.op.disk_template not in constants.DTS_NOT_LVM):
3040 raise errors.OpPrereqError("Cluster does not support lvm-based"
3041 " instances")
3043 if self.op.mode == constants.INSTANCE_IMPORT:
3044 src_node = getattr(self.op, "src_node", None)
3045 src_path = getattr(self.op, "src_path", None)
3046 if src_node is None or src_path is None:
3047 raise errors.OpPrereqError("Importing an instance requires source"
3048 " node and path options")
3049 src_node_full = self.cfg.ExpandNodeName(src_node)
3050 if src_node_full is None:
3051 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3052 self.op.src_node = src_node = src_node_full
3054 if not os.path.isabs(src_path):
3055 raise errors.OpPrereqError("The source path must be absolute")
3057 export_info = rpc.call_export_info(src_node, src_path)
3059 if not export_info:
3060 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3062 if not export_info.has_section(constants.INISECT_EXP):
3063 raise errors.ProgrammerError("Corrupted export config")
3065 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3066 if (int(ei_version) != constants.EXPORT_VERSION):
3067 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3068 (ei_version, constants.EXPORT_VERSION))
3070 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3071 raise errors.OpPrereqError("Can't import instance with more than"
3072 " one data disk")
3074 # FIXME: are the old os-es, disk sizes, etc. useful?
3075 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3076 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3077 'disk0_dump'))
3078 self.src_image = diskimage
3079 else: # INSTANCE_CREATE
3080 if getattr(self.op, "os_type", None) is None:
3081 raise errors.OpPrereqError("No guest OS specified")
3083 #### instance parameters check
3085 # disk template and mirror node verification
3086 if self.op.disk_template not in constants.DISK_TEMPLATES:
3087 raise errors.OpPrereqError("Invalid disk template name")
3089 # instance name verification
3090 hostname1 = utils.HostInfo(self.op.instance_name)
3092 self.op.instance_name = instance_name = hostname1.name
3093 instance_list = self.cfg.GetInstanceList()
3094 if instance_name in instance_list:
3095 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3096 instance_name)
3098 # ip validity checks
3099 ip = getattr(self.op, "ip", None)
3100 if ip is None or ip.lower() == "none":
3101 inst_ip = None
3102 elif ip.lower() == "auto":
3103 inst_ip = hostname1.ip
3104 else:
3105 if not utils.IsValidIP(ip):
3106 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3107 " like a valid IP" % ip)
3108 inst_ip = ip
3109 self.inst_ip = self.op.ip = inst_ip
3111 if self.op.start and not self.op.ip_check:
3112 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3113 " adding an instance in start mode")
3115 if self.op.ip_check:
3116 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3117 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3118 (hostname1.ip, instance_name))
3120 # MAC address verification
3121 if self.op.mac != "auto":
3122 if not utils.IsValidMac(self.op.mac.lower()):
3123 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3124 self.op.mac)
3126 # bridge verification
3127 bridge = getattr(self.op, "bridge", None)
3128 if bridge is None:
3129 self.op.bridge = self.cfg.GetDefBridge()
3130 else:
3131 self.op.bridge = bridge
3133 # boot order verification
3134 if self.op.hvm_boot_order is not None:
3135 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3136 raise errors.OpPrereqError("invalid boot order specified,"
3137 " must be one or more of [acdn]")
3138 # file storage checks
3139 if (self.op.file_driver and
3140 not self.op.file_driver in constants.FILE_DRIVER):
3141 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3142 self.op.file_driver)
3144 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3145 raise errors.OpPrereqError("File storage directory not a relative"
3146 " path")
3149 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3150 raise errors.OpPrereqError("One and only one of iallocator and primary"
3151 " node must be given")
3153 if self.op.iallocator is not None:
3154 self._RunAllocator()
3156 #### node related checks
3158 # check primary node
3159 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3160 if pnode is None:
3161 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3162 self.op.pnode)
3163 self.op.pnode = pnode.name
3165 self.secondaries = []
3167 # mirror node verification
3168 if self.op.disk_template in constants.DTS_NET_MIRROR:
3169 if getattr(self.op, "snode", None) is None:
3170 raise errors.OpPrereqError("The networked disk templates need"
3171 " a mirror node")
3173 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3174 if snode_name is None:
3175 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3176 self.op.snode)
3177 elif snode_name == pnode.name:
3178 raise errors.OpPrereqError("The secondary node cannot be"
3179 " the primary node.")
3180 self.secondaries.append(snode_name)
3182 req_size = _ComputeDiskSize(self.op.disk_template,
3183 self.op.disk_size, self.op.swap_size)
3185 # Check lv size requirements
3186 if req_size is not None:
3187 nodenames = [pnode.name] + self.secondaries
3188 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3189 for node in nodenames:
3190 info = nodeinfo.get(node, None)
3191 if not info:
3192 raise errors.OpPrereqError("Cannot get current information"
3193 " from node '%s'" % node)
3194 vg_free = info.get('vg_free', None)
3195 if not isinstance(vg_free, int):
3196 raise errors.OpPrereqError("Can't compute free disk space on"
3197 " node %s" % node)
3198 if req_size > info['vg_free']:
3199 raise errors.OpPrereqError("Not enough disk space on target node %s."
3200 " %d MB available, %d MB required" %
3201 (node, info['vg_free'], req_size))
3204 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3205 if not os_obj:
3206 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3207 " primary node" % self.op.os_type)
3209 if self.op.kernel_path == constants.VALUE_NONE:
3210 raise errors.OpPrereqError("Can't set instance kernel to none")
3211
3212 self.pnode = pnode
3213 # bridge check on primary node
3214 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3215 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3216 " destination node '%s'" %
3217 (self.op.bridge, pnode.name))
3219 # memory check on primary node
3221 _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3222 "creating instance %s" % self.op.instance_name,
3223 self.op.mem_size)
3225 # hvm_cdrom_image_path verification
3226 if self.op.hvm_cdrom_image_path is not None:
3227 if not os.path.isabs(self.op.hvm_cdrom_image_path):
3228 raise errors.OpPrereqError("The path to the HVM CDROM image must"
3229 " be an absolute path or None, not %s" %
3230 self.op.hvm_cdrom_image_path)
3231 if not os.path.isfile(self.op.hvm_cdrom_image_path):
3232 raise errors.OpPrereqError("The HVM CDROM image must either be a"
3233 " regular file or a symlink pointing to"
3234 " an existing regular file, not %s" %
3235 self.op.hvm_cdrom_image_path)
3237 # vnc_bind_address verification
3238 if self.op.vnc_bind_address is not None:
3239 if not utils.IsValidIP(self.op.vnc_bind_address):
3240 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3241 " like a valid IP address" %
3242 self.op.vnc_bind_address)
3244 if self.op.start:
3245 self.instance_status = 'up'
3246 else:
3247 self.instance_status = 'down'
3249 def Exec(self, feedback_fn):
3250 """Create and add the instance to the cluster.
3251
3252 """
3253 instance = self.op.instance_name
3254 pnode_name = self.pnode.name
3256 if self.op.mac == "auto":
3257 mac_address = self.cfg.GenerateMAC()
3259 mac_address = self.op.mac
3261 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3262 if self.inst_ip is not None:
3263 nic.ip = self.inst_ip
3265 ht_kind = self.sstore.GetHypervisorType()
3266 if ht_kind in constants.HTS_REQ_PORT:
3267 network_port = self.cfg.AllocatePort()
3268 else:
3269 network_port = None
3271 if self.op.vnc_bind_address is None:
3272 self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3274 # this is needed because os.path.join does not accept None arguments
3275 if self.op.file_storage_dir is None:
3276 string_file_storage_dir = ""
3277 else:
3278 string_file_storage_dir = self.op.file_storage_dir
3280 # build the full file storage dir path
3281 file_storage_dir = os.path.normpath(os.path.join(
3282 self.sstore.GetFileStorageDir(),
3283 string_file_storage_dir, instance))
3286 disks = _GenerateDiskTemplate(self.cfg,
3287 self.op.disk_template,
3288 instance, pnode_name,
3289 self.secondaries, self.op.disk_size,
3290 self.op.swap_size,
3291 file_storage_dir,
3292 self.op.file_driver)
3294 iobj = objects.Instance(name=instance, os=self.op.os_type,
3295 primary_node=pnode_name,
3296 memory=self.op.mem_size,
3297 vcpus=self.op.vcpus,
3298 nics=[nic], disks=disks,
3299 disk_template=self.op.disk_template,
3300 status=self.instance_status,
3301 network_port=network_port,
3302 kernel_path=self.op.kernel_path,
3303 initrd_path=self.op.initrd_path,
3304 hvm_boot_order=self.op.hvm_boot_order,
3305 hvm_acpi=self.op.hvm_acpi,
3306 hvm_pae=self.op.hvm_pae,
3307 hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3308 vnc_bind_address=self.op.vnc_bind_address,
3309 )
3311 feedback_fn("* creating instance disks...")
3312 if not _CreateDisks(self.cfg, iobj):
3313 _RemoveDisks(iobj, self.cfg)
3314 raise errors.OpExecError("Device creation failed, reverting...")
3316 feedback_fn("adding instance %s to cluster config" % instance)
3318 self.cfg.AddInstance(iobj)
3319 # Add the new instance to the Ganeti Lock Manager
3320 self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3322 if self.op.wait_for_sync:
3323 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3324 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3325 # make sure the disks are not degraded (still sync-ing is ok)
3326 time.sleep(15)
3327 feedback_fn("* checking mirrors status")
3328 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3329 else:
3330 disk_abort = False
3331
3332 if disk_abort:
3333 _RemoveDisks(iobj, self.cfg)
3334 self.cfg.RemoveInstance(iobj.name)
3335 # Remove the new instance from the Ganeti Lock Manager
3336 self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3337 raise errors.OpExecError("There are some degraded disks for"
3338 " this instance")
3340 feedback_fn("creating os for instance %s on node %s" %
3341 (instance, pnode_name))
3343 if iobj.disk_template != constants.DT_DISKLESS:
3344 if self.op.mode == constants.INSTANCE_CREATE:
3345 feedback_fn("* running the instance OS create scripts...")
3346 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3347 raise errors.OpExecError("could not add os for instance %s"
3348 " on node %s" %
3349 (instance, pnode_name))
3351 elif self.op.mode == constants.INSTANCE_IMPORT:
3352 feedback_fn("* running the instance OS import scripts...")
3353 src_node = self.op.src_node
3354 src_image = self.src_image
3355 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3356 src_node, src_image):
3357 raise errors.OpExecError("Could not import os for instance"
3358 " %s on node %s" %
3359 (instance, pnode_name))
3360 else:
3361 # also checked in the prereq part
3362 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3363 % self.op.mode)
3365 if self.op.start:
3366 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3367 feedback_fn("* starting instance...")
3368 if not rpc.call_instance_start(pnode_name, iobj, None):
3369 raise errors.OpExecError("Could not start instance")
3372 class LUConnectConsole(NoHooksLU):
3373 """Connect to an instance's console.
3375 This is somewhat special in that it returns the command line that
3376 you need to run on the master node in order to connect to the
3377 console.
3378
3379 """
3380 _OP_REQP = ["instance_name"]
3383 def ExpandNames(self):
3384 self._ExpandAndLockInstance()
3386 def CheckPrereq(self):
3387 """Check prerequisites.
3389 This checks that the instance is in the cluster.
3390
3391 """
3392 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3393 assert self.instance is not None, \
3394 "Cannot retrieve locked instance %s" % self.op.instance_name
3396 def Exec(self, feedback_fn):
3397 """Connect to the console of an instance.
3398
3399 """
3400 instance = self.instance
3401 node = instance.primary_node
3403 node_insts = rpc.call_instance_list([node])[node]
3404 if node_insts is False:
3405 raise errors.OpExecError("Can't connect to node %s." % node)
3407 if instance.name not in node_insts:
3408 raise errors.OpExecError("Instance %s is not running." % instance.name)
3410 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3412 hyper = hypervisor.GetHypervisor()
3413 console_cmd = hyper.GetShellCommandForConsole(instance)
3415 # build ssh cmdline
3416 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
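# The value returned above is a ready-to-run ssh command line; a sketch of
# what a caller might end up executing on the master node (host name and
# hypervisor console command purely illustrative, e.g. Xen's "xm console"):
#   ssh -t root@node1.example.com "xm console fake1.example.com"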
3419 class LUReplaceDisks(LogicalUnit):
3420 """Replace the disks of an instance.
3421
3422 """
3423 HPATH = "mirrors-replace"
3424 HTYPE = constants.HTYPE_INSTANCE
3425 _OP_REQP = ["instance_name", "mode", "disks"]
3427 def _RunAllocator(self):
3428 """Compute a new secondary node using an IAllocator.
3429
3430 """
3431 ial = IAllocator(self.cfg, self.sstore,
3432 mode=constants.IALLOCATOR_MODE_RELOC,
3433 name=self.op.instance_name,
3434 relocate_from=[self.sec_node])
3436 ial.Run(self.op.iallocator)
3438 if not ial.success:
3439 raise errors.OpPrereqError("Can't compute nodes using"
3440 " iallocator '%s': %s" % (self.op.iallocator,
3441 ial.info))
3442 if len(ial.nodes) != ial.required_nodes:
3443 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3444 " of nodes (%s), required %s" %
3445 (len(ial.nodes), ial.required_nodes))
3446 self.op.remote_node = ial.nodes[0]
3447 logger.ToStdout("Selected new secondary for the instance: %s" %
3448 self.op.remote_node)
3450 def BuildHooksEnv(self):
3451 """Build hooks env.
3452
3453 This runs on the master, the primary and all the secondaries.
3454
3455 """
3456 env = {
3457 "MODE": self.op.mode,
3458 "NEW_SECONDARY": self.op.remote_node,
3459 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3460 }
3461 env.update(_BuildInstanceHookEnvByObject(self.instance))
3462 nl = [
3463 self.sstore.GetMasterNode(),
3464 self.instance.primary_node,
3465 ]
3466 if self.op.remote_node is not None:
3467 nl.append(self.op.remote_node)
3468 return env, nl, nl
3470 def CheckPrereq(self):
3471 """Check prerequisites.
3473 This checks that the instance is in the cluster.
3474
3475 """
3476 if not hasattr(self.op, "remote_node"):
3477 self.op.remote_node = None
3479 instance = self.cfg.GetInstanceInfo(
3480 self.cfg.ExpandInstanceName(self.op.instance_name))
3481 if instance is None:
3482 raise errors.OpPrereqError("Instance '%s' not known" %
3483 self.op.instance_name)
3484 self.instance = instance
3485 self.op.instance_name = instance.name
3487 if instance.disk_template not in constants.DTS_NET_MIRROR:
3488 raise errors.OpPrereqError("Instance's disk layout is not"
3489 " network mirrored.")
3491 if len(instance.secondary_nodes) != 1:
3492 raise errors.OpPrereqError("The instance has a strange layout,"
3493 " expected one secondary but found %d" %
3494 len(instance.secondary_nodes))
3496 self.sec_node = instance.secondary_nodes[0]
3498 ia_name = getattr(self.op, "iallocator", None)
3499 if ia_name is not None:
3500 if self.op.remote_node is not None:
3501 raise errors.OpPrereqError("Give either the iallocator or the new"
3502 " secondary, not both")
3503 self.op.remote_node = self._RunAllocator()
3505 remote_node = self.op.remote_node
3506 if remote_node is not None:
3507 remote_node = self.cfg.ExpandNodeName(remote_node)
3508 if remote_node is None:
3509 raise errors.OpPrereqError("Node '%s' not known" %
3510 self.op.remote_node)
3511 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3513 self.remote_node_info = None
3514 if remote_node == instance.primary_node:
3515 raise errors.OpPrereqError("The specified node is the primary node of"
3516 " the instance.")
3517 elif remote_node == self.sec_node:
3518 if self.op.mode == constants.REPLACE_DISK_SEC:
3519 # this is for DRBD8, where we can't execute the same mode of
3520 # replacement as for drbd7 (no different port allocated)
3521 raise errors.OpPrereqError("Same secondary given, cannot execute"
3522 " replacement")
3523 if instance.disk_template == constants.DT_DRBD8:
3524 if (self.op.mode == constants.REPLACE_DISK_ALL and
3525 remote_node is not None):
3526 # switch to replace secondary mode
3527 self.op.mode = constants.REPLACE_DISK_SEC
3529 if self.op.mode == constants.REPLACE_DISK_ALL:
3530 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3531 " secondary disk replacement, not"
3532 " both at once")
3533 elif self.op.mode == constants.REPLACE_DISK_PRI:
3534 if remote_node is not None:
3535 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3536 " the secondary while doing a primary"
3537 " node disk replacement")
3538 self.tgt_node = instance.primary_node
3539 self.oth_node = instance.secondary_nodes[0]
3540 elif self.op.mode == constants.REPLACE_DISK_SEC:
3541 self.new_node = remote_node # this can be None, in which case
3542 # we don't change the secondary
3543 self.tgt_node = instance.secondary_nodes[0]
3544 self.oth_node = instance.primary_node
3545 else:
3546 raise errors.ProgrammerError("Unhandled disk replace mode")
3548 for name in self.op.disks:
3549 if instance.FindDisk(name) is None:
3550 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3551 (name, instance.name))
3552 self.op.remote_node = remote_node
3554 def _ExecD8DiskOnly(self, feedback_fn):
3555 """Replace a disk on the primary or secondary for dbrd8.
3557 The algorithm for replace is quite complicated:
3558 - for each disk to be replaced:
3559 - create new LVs on the target node with unique names
3560 - detach old LVs from the drbd device
3561 - rename old LVs to name_replaced.<time_t>
3562 - rename new LVs to old LVs
3563 - attach the new LVs (with the old names now) to the drbd device
3564 - wait for sync across all devices
3565 - for each modified disk:
3566 - remove old LVs (which have the name name_replaced.<time_t>)
3567
3568 Failures are not very well handled.
3569
3570 """
3571 steps_total = 6
3572 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3573 instance = self.instance
3574 iv_names = {}
3575 vgname = self.cfg.GetVGName()
3576 cfg = self.cfg
3578 tgt_node = self.tgt_node
3579 oth_node = self.oth_node
3581 # Step: check device activation
3582 self.proc.LogStep(1, steps_total, "check device existence")
3583 info("checking volume groups")
3584 my_vg = cfg.GetVGName()
3585 results = rpc.call_vg_list([oth_node, tgt_node])
3586 if not results:
3587 raise errors.OpExecError("Can't list volume groups on the nodes")
3588 for node in oth_node, tgt_node:
3589 res = results.get(node, False)
3590 if not res or my_vg not in res:
3591 raise errors.OpExecError("Volume group '%s' not found on %s" %
3592 (my_vg, node))
3593 for dev in instance.disks:
3594 if not dev.iv_name in self.op.disks:
3595 continue
3596 for node in tgt_node, oth_node:
3597 info("checking %s on %s" % (dev.iv_name, node))
3598 cfg.SetDiskID(dev, node)
3599 if not rpc.call_blockdev_find(node, dev):
3600 raise errors.OpExecError("Can't find device %s on node %s" %
3601 (dev.iv_name, node))
3603 # Step: check other node consistency
3604 self.proc.LogStep(2, steps_total, "check peer consistency")
3605 for dev in instance.disks:
3606 if not dev.iv_name in self.op.disks:
3607 continue
3608 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3609 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3610 oth_node==instance.primary_node):
3611 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3612 " to replace disks on this node (%s)" %
3613 (oth_node, tgt_node))
3615 # Step: create new storage
3616 self.proc.LogStep(3, steps_total, "allocate new storage")
3617 for dev in instance.disks:
3618 if not dev.iv_name in self.op.disks:
3619 continue
3620 size = dev.size
3621 cfg.SetDiskID(dev, tgt_node)
3622 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3623 names = _GenerateUniqueNames(cfg, lv_names)
3624 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3625 logical_id=(vgname, names[0]))
3626 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3627 logical_id=(vgname, names[1]))
3628 new_lvs = [lv_data, lv_meta]
3629 old_lvs = dev.children
3630 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3631 info("creating new local storage on %s for %s" %
3632 (tgt_node, dev.iv_name))
3633 # since we *always* want to create this LV, we use the
3634 # _Create...OnPrimary (which forces the creation), even if we
3635 # are talking about the secondary node
3636 for new_lv in new_lvs:
3637 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3638 _GetInstanceInfoText(instance)):
3639 raise errors.OpExecError("Failed to create new LV named '%s' on"
3640 " node '%s'" %
3641 (new_lv.logical_id[1], tgt_node))
3643 # Step: for each lv, detach+rename*2+attach
3644 self.proc.LogStep(4, steps_total, "change drbd configuration")
3645 for dev, old_lvs, new_lvs in iv_names.itervalues():
3646 info("detaching %s drbd from local storage" % dev.iv_name)
3647 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3648 raise errors.OpExecError("Can't detach drbd from local storage on node"
3649 " %s for device %s" % (tgt_node, dev.iv_name))
3651 #cfg.Update(instance)
3653 # ok, we created the new LVs, so now we know we have the needed
3654 # storage; as such, we proceed on the target node to rename
3655 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3656 # using the assumption that logical_id == physical_id (which in
3657 # turn is the unique_id on that node)
3659 # FIXME(iustin): use a better name for the replaced LVs
3660 temp_suffix = int(time.time())
3661 ren_fn = lambda d, suff: (d.physical_id[0],
3662 d.physical_id[1] + "_replaced-%s" % suff)
3663 # build the rename list based on what LVs exist on the node
3664 rlist = []
3665 for to_ren in old_lvs:
3666 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3667 if find_res is not None: # device exists
3668 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3670 info("renaming the old LVs on the target node")
3671 if not rpc.call_blockdev_rename(tgt_node, rlist):
3672 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3673 # now we rename the new LVs to the old LVs
3674 info("renaming the new LVs on the target node")
3675 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3676 if not rpc.call_blockdev_rename(tgt_node, rlist):
3677 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3679 for old, new in zip(old_lvs, new_lvs):
3680 new.logical_id = old.logical_id
3681 cfg.SetDiskID(new, tgt_node)
3683 for disk in old_lvs:
3684 disk.logical_id = ren_fn(disk, temp_suffix)
3685 cfg.SetDiskID(disk, tgt_node)
3687 # now that the new lvs have the old name, we can add them to the device
3688 info("adding new mirror component on %s" % tgt_node)
3689 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3690 for new_lv in new_lvs:
3691 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3692 warning("Can't rollback device %s", hint="manually cleanup unused"
3693 " logical volumes")
3694 raise errors.OpExecError("Can't add local storage to drbd")
3696 dev.children = new_lvs
3697 cfg.Update(instance)
3699 # Step: wait for sync
3701 # this can fail as the old devices are degraded and _WaitForSync
3702 # does a combined result over all disks, so we don't check its
3703 # return value
3704 self.proc.LogStep(5, steps_total, "sync devices")
3705 _WaitForSync(cfg, instance, self.proc, unlock=True)
3707 # so check manually all the devices
3708 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3709 cfg.SetDiskID(dev, instance.primary_node)
3710 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3711 if is_degr:
3712 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3714 # Step: remove old storage
3715 self.proc.LogStep(6, steps_total, "removing old storage")
3716 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3717 info("remove logical volumes for %s" % name)
3718 for lv in old_lvs:
3719 cfg.SetDiskID(lv, tgt_node)
3720 if not rpc.call_blockdev_remove(tgt_node, lv):
3721 warning("Can't remove old LV", hint="manually remove unused LVs")
3724 def _ExecD8Secondary(self, feedback_fn):
3725 """Replace the secondary node for drbd8.
3727 The algorithm for replace is quite complicated:
3728 - for all disks of the instance:
3729 - create new LVs on the new node with same names
3730 - shutdown the drbd device on the old secondary
3731 - disconnect the drbd network on the primary
3732 - create the drbd device on the new secondary
3733 - network attach the drbd on the primary, using an artifice:
3734 the drbd code for Attach() will connect to the network if it
3735 finds a device which is connected to the good local disks but
3736 not network enabled
3737 - wait for sync across all devices
3738 - remove all disks from the old secondary
3740 Failures are not very well handled.
3741
3742 """
3743 steps_total = 6
3744 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3745 instance = self.instance
3746 iv_names = {}
3747 vgname = self.cfg.GetVGName()
3748 cfg = self.cfg
3750 old_node = self.tgt_node
3751 new_node = self.new_node
3752 pri_node = instance.primary_node
3754 # Step: check device activation
3755 self.proc.LogStep(1, steps_total, "check device existence")
3756 info("checking volume groups")
3757 my_vg = cfg.GetVGName()
3758 results = rpc.call_vg_list([pri_node, new_node])
3759 if not results:
3760 raise errors.OpExecError("Can't list volume groups on the nodes")
3761 for node in pri_node, new_node:
3762 res = results.get(node, False)
3763 if not res or my_vg not in res:
3764 raise errors.OpExecError("Volume group '%s' not found on %s" %
3765 (my_vg, node))
3766 for dev in instance.disks:
3767 if not dev.iv_name in self.op.disks:
3768 continue
3769 info("checking %s on %s" % (dev.iv_name, pri_node))
3770 cfg.SetDiskID(dev, pri_node)
3771 if not rpc.call_blockdev_find(pri_node, dev):
3772 raise errors.OpExecError("Can't find device %s on node %s" %
3773 (dev.iv_name, pri_node))
3775 # Step: check other node consistency
3776 self.proc.LogStep(2, steps_total, "check peer consistency")
3777 for dev in instance.disks:
3778 if not dev.iv_name in self.op.disks:
3779 continue
3780 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3781 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3782 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3783 " unsafe to replace the secondary" %
3784 pri_node)
3786 # Step: create new storage
3787 self.proc.LogStep(3, steps_total, "allocate new storage")
3788 for dev in instance.disks:
3789 size = dev.size
3790 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3791 # since we *always* want to create this LV, we use the
3792 # _Create...OnPrimary (which forces the creation), even if we
3793 # are talking about the secondary node
3794 for new_lv in dev.children:
3795 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3796 _GetInstanceInfoText(instance)):
3797 raise errors.OpExecError("Failed to create new LV named '%s' on"
3798 " node '%s'" %
3799 (new_lv.logical_id[1], new_node))
3801 iv_names[dev.iv_name] = (dev, dev.children)
3803 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3804 for dev in instance.disks:
3806 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3807 # create new devices on new_node
3808 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3809 logical_id=(pri_node, new_node,
3810 dev.logical_id[2]),
3811 children=dev.children)
3812 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3813 new_drbd, False,
3814 _GetInstanceInfoText(instance)):
3815 raise errors.OpExecError("Failed to create new DRBD on"
3816 " node '%s'" % new_node)
3818 for dev in instance.disks:
3819 # we have new devices, shutdown the drbd on the old secondary
3820 info("shutting down drbd for %s on old node" % dev.iv_name)
3821 cfg.SetDiskID(dev, old_node)
3822 if not rpc.call_blockdev_shutdown(old_node, dev):
3823 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3824 hint="Please cleanup this device manually as soon as possible")
3826 info("detaching primary drbds from the network (=> standalone)")
3827 done = 0
3828 for dev in instance.disks:
3829 cfg.SetDiskID(dev, pri_node)
3830 # set the physical (unique in bdev terms) id to None, meaning
3831 # detach from network
3832 dev.physical_id = (None,) * len(dev.physical_id)
3833 # and 'find' the device, which will 'fix' it to match the
3834 # standalone state
3835 if rpc.call_blockdev_find(pri_node, dev):
3836 done += 1
3837 else:
3838 warning("Failed to detach drbd %s from network, unusual case" %
3839 dev.iv_name)
3840
3841 if not done:
3842 # no detaches succeeded (very unlikely)
3843 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3845 # if we managed to detach at least one, we update all the disks of
3846 # the instance to point to the new secondary
3847 info("updating instance configuration")
3848 for dev in instance.disks:
3849 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3850 cfg.SetDiskID(dev, pri_node)
3851 cfg.Update(instance)
3853 # and now perform the drbd attach
3854 info("attaching primary drbds to new secondary (standalone => connected)")
3856 for dev in instance.disks:
3857 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3858 # since the attach is smart, it's enough to 'find' the device,
3859 # it will automatically activate the network, if the physical_id
3860 # is correct
3861 cfg.SetDiskID(dev, pri_node)
3862 if not rpc.call_blockdev_find(pri_node, dev):
3863 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3864 "please do a gnt-instance info to see the status of disks")
3866 # this can fail as the old devices are degraded and _WaitForSync
3867 # does a combined result over all disks, so we don't check its
3868 # return value
3869 self.proc.LogStep(5, steps_total, "sync devices")
3870 _WaitForSync(cfg, instance, self.proc, unlock=True)
3872 # so check manually all the devices
3873 for name, (dev, old_lvs) in iv_names.iteritems():
3874 cfg.SetDiskID(dev, pri_node)
3875 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3876 if is_degr:
3877 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3879 self.proc.LogStep(6, steps_total, "removing old storage")
3880 for name, (dev, old_lvs) in iv_names.iteritems():
3881 info("remove logical volumes for %s" % name)
3883 cfg.SetDiskID(lv, old_node)
3884 if not rpc.call_blockdev_remove(old_node, lv):
3885 warning("Can't remove LV on old secondary",
3886 hint="Cleanup stale volumes by hand")
3888 def Exec(self, feedback_fn):
3889 """Execute disk replacement.
3891 This dispatches the disk replacement to the appropriate handler.
3893 """
3894 instance = self.instance
3896 # Activate the instance disks if we're replacing them on a down instance
3897 if instance.status == "down":
3898 op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3899 self.proc.ChainOpCode(op)
3901 if instance.disk_template == constants.DT_DRBD8:
3902 if self.op.remote_node is None:
3903 fn = self._ExecD8DiskOnly
3904 else:
3905 fn = self._ExecD8Secondary
3906 else:
3907 raise errors.ProgrammerError("Unhandled disk replacement case")
3909 ret = fn(feedback_fn)
3911 # Deactivate the instance disks if we're replacing them on a down instance
3912 if instance.status == "down":
3913 op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3914 self.proc.ChainOpCode(op)
3916 return ret
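# Example (editorial sketch, not part of the original module): driving the
# dispatch above from a client. It assumes an opcodes.OpReplaceDisks opcode
# with 'instance_name', 'remote_node' and 'disks' slots and a processor
# object exposing ExecOpCode(); hostnames are illustrative only.
def _ExampleReplaceSecondary(processor):
  """Submit a replacement that Exec() above routes to _ExecD8Secondary."""
  op = opcodes.OpReplaceDisks(instance_name="inst1.example.com",
                              remote_node="node4.example.com",
                              disks=["sda", "sdb"])
  # remote_node is not None, so a DRBD8 instance takes the
  # _ExecD8Secondary path in LUReplaceDisks.Exec
  return processor.ExecOpCode(op)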
3919 class LUGrowDisk(LogicalUnit):
3920 """Grow a disk of an instance.
3924 HTYPE = constants.HTYPE_INSTANCE
3925 _OP_REQP = ["instance_name", "disk", "amount"]
3927 def BuildHooksEnv(self):
3928 """Build hooks env.
3930 This runs on the master, the primary and all the secondaries.
3932 """
3933 env = {
3934 "DISK": self.op.disk,
3935 "AMOUNT": self.op.amount,
3936 }
3937 env.update(_BuildInstanceHookEnvByObject(self.instance))
3938 nl = [
3939 self.sstore.GetMasterNode(),
3940 self.instance.primary_node,
3941 ]
3942 return env, nl, nl
3944 def CheckPrereq(self):
3945 """Check prerequisites.
3947 This checks that the instance is in the cluster.
3949 """
3950 instance = self.cfg.GetInstanceInfo(
3951 self.cfg.ExpandInstanceName(self.op.instance_name))
3952 if instance is None:
3953 raise errors.OpPrereqError("Instance '%s' not known" %
3954 self.op.instance_name)
3955 self.instance = instance
3956 self.op.instance_name = instance.name
3958 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3959 raise errors.OpPrereqError("Instance's disk layout does not support"
3962 if instance.FindDisk(self.op.disk) is None:
3963 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3964 (self.op.disk, instance.name))
3966 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3967 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3968 for node in nodenames:
3969 info = nodeinfo.get(node, None)
3970 if not info:
3971 raise errors.OpPrereqError("Cannot get current information"
3972 " from node '%s'" % node)
3973 vg_free = info.get('vg_free', None)
3974 if not isinstance(vg_free, int):
3975 raise errors.OpPrereqError("Can't compute free disk space on"
3977 if self.op.amount > info['vg_free']:
3978 raise errors.OpPrereqError("Not enough disk space on target node %s:"
3979 " %d MiB available, %d MiB required" %
3980 (node, info['vg_free'], self.op.amount))
3982 def Exec(self, feedback_fn):
3983 """Execute disk grow.
3986 instance = self.instance
3987 disk = instance.FindDisk(self.op.disk)
3988 for node in (instance.secondary_nodes + (instance.primary_node,)):
3989 self.cfg.SetDiskID(disk, node)
3990 result = rpc.call_blockdev_grow(node, disk, self.op.amount)
3991 if not result or not isinstance(result, tuple) or len(result) != 2:
3992 raise errors.OpExecError("grow request failed to node %s" % node)
3994 raise errors.OpExecError("grow request failed to node %s: %s" %
3996 disk.RecordGrow(self.op.amount)
3997 self.cfg.Update(instance)
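# Example (editorial sketch): submitting the grow implemented above. The
# node-side contract assumed by Exec() is that call_blockdev_grow returns a
# (success, message) 2-tuple; the opcode slots shown here are assumptions
# based on _OP_REQP, and the amount is in MiB.
def _ExampleGrowDisk(processor):
  """Grow disk 'sda' of a hypothetical instance by 1 GiB."""
  op = opcodes.OpGrowDisk(instance_name="inst1.example.com",
                          disk="sda",
                          amount=1024)  # MiB
  return processor.ExecOpCode(op)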
4001 class LUQueryInstanceData(NoHooksLU):
4002 """Query runtime instance data.
4005 _OP_REQP = ["instances"]
4007 def CheckPrereq(self):
4008 """Check prerequisites.
4010 This only checks the optional instance list against the existing names.
4012 """
4013 if not isinstance(self.op.instances, list):
4014 raise errors.OpPrereqError("Invalid argument type 'instances'")
4015 if self.op.instances:
4016 self.wanted_instances = []
4017 names = self.op.instances
4018 for name in names:
4019 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4020 if instance is None:
4021 raise errors.OpPrereqError("No such instance name '%s'" % name)
4022 self.wanted_instances.append(instance)
4023 else:
4024 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4025 in self.cfg.GetInstanceList()]
4029 def _ComputeDiskStatus(self, instance, snode, dev):
4030 """Compute block device status.
4033 self.cfg.SetDiskID(dev, instance.primary_node)
4034 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4035 if dev.dev_type in constants.LDS_DRBD:
4036 # we change the snode then (otherwise we use the one passed in)
4037 if dev.logical_id[0] == instance.primary_node:
4038 snode = dev.logical_id[1]
4039 else:
4040 snode = dev.logical_id[0]
4042 if snode:
4043 self.cfg.SetDiskID(dev, snode)
4044 dev_sstatus = rpc.call_blockdev_find(snode, dev)
4045 else:
4046 dev_sstatus = None
4048 if dev.children:
4049 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4050 for child in dev.children]
4051 else:
4052 dev_children = []
4054 data = {
4055 "iv_name": dev.iv_name,
4056 "dev_type": dev.dev_type,
4057 "logical_id": dev.logical_id,
4058 "physical_id": dev.physical_id,
4059 "pstatus": dev_pstatus,
4060 "sstatus": dev_sstatus,
4061 "children": dev_children,
4066 def Exec(self, feedback_fn):
4067 """Gather and return data"""
4069 for instance in self.wanted_instances:
4070 remote_info = rpc.call_instance_info(instance.primary_node,
4072 if remote_info and "state" in remote_info:
4075 remote_state = "down"
4076 if instance.status == "down":
4077 config_state = "down"
4081 disks = [self._ComputeDiskStatus(instance, None, device)
4082 for device in instance.disks]
4084 idict = {
4085 "name": instance.name,
4086 "config_state": config_state,
4087 "run_state": remote_state,
4088 "pnode": instance.primary_node,
4089 "snodes": instance.secondary_nodes,
4090 "os": instance.os,
4091 "memory": instance.memory,
4092 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4093 "disks": disks,
4094 "vcpus": instance.vcpus,
4095 }
4097 htkind = self.sstore.GetHypervisorType()
4098 if htkind == constants.HT_XEN_PVM30:
4099 idict["kernel_path"] = instance.kernel_path
4100 idict["initrd_path"] = instance.initrd_path
4102 if htkind == constants.HT_XEN_HVM31:
4103 idict["hvm_boot_order"] = instance.hvm_boot_order
4104 idict["hvm_acpi"] = instance.hvm_acpi
4105 idict["hvm_pae"] = instance.hvm_pae
4106 idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4108 if htkind in constants.HTS_REQ_PORT:
4109 idict["vnc_bind_address"] = instance.vnc_bind_address
4110 idict["network_port"] = instance.network_port
4112 result[instance.name] = idict
4114 return result
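# Example (editorial sketch): consuming the mapping returned by
# LUQueryInstanceData.Exec above; only keys that are set unconditionally
# are used.
def _ExamplePrintInstanceData(result):
  """Print a one-line summary per instance."""
  for name, idict in result.items():
    print "%s: config=%s run=%s pnode=%s vcpus=%s" % (
      name, idict["config_state"], idict["run_state"],
      idict["pnode"], idict["vcpus"])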
4117 class LUSetInstanceParams(LogicalUnit):
4118 """Modifies an instances's parameters.
4121 HPATH = "instance-modify"
4122 HTYPE = constants.HTYPE_INSTANCE
4123 _OP_REQP = ["instance_name"]
4126 def ExpandNames(self):
4127 self._ExpandAndLockInstance()
4129 def BuildHooksEnv(self):
4130 """Build hooks env.
4132 This runs on the master, primary and secondaries.
4134 """
4135 args = dict()
4136 if self.mem:
4137 args['memory'] = self.mem
4138 if self.vcpus:
4139 args['vcpus'] = self.vcpus
4140 if self.do_ip or self.do_bridge or self.mac:
4141 if self.do_ip:
4142 ip = self.ip
4143 else:
4144 ip = self.instance.nics[0].ip
4145 if self.bridge:
4146 bridge = self.bridge
4147 else:
4148 bridge = self.instance.nics[0].bridge
4149 if self.mac:
4150 mac = self.mac
4151 else:
4152 mac = self.instance.nics[0].mac
4153 args['nics'] = [(ip, bridge, mac)]
4154 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4155 nl = [self.sstore.GetMasterNode(),
4156 self.instance.primary_node] + list(self.instance.secondary_nodes)
4157 return env, nl, nl
4159 def CheckPrereq(self):
4160 """Check prerequisites.
4162 This only checks the instance list against the existing names.
4164 """
4165 # FIXME: all the parameters could be checked before, in ExpandNames, or in
4166 # a separate CheckArguments function, if we implement one, so the operation
4167 # can be aborted without waiting for any lock, should it have an error...
4168 self.mem = getattr(self.op, "mem", None)
4169 self.vcpus = getattr(self.op, "vcpus", None)
4170 self.ip = getattr(self.op, "ip", None)
4171 self.mac = getattr(self.op, "mac", None)
4172 self.bridge = getattr(self.op, "bridge", None)
4173 self.kernel_path = getattr(self.op, "kernel_path", None)
4174 self.initrd_path = getattr(self.op, "initrd_path", None)
4175 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4176 self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4177 self.hvm_pae = getattr(self.op, "hvm_pae", None)
4178 self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4179 self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4180 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4181 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4182 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4183 self.vnc_bind_address]
4184 if all_parms.count(None) == len(all_parms):
4185 raise errors.OpPrereqError("No changes submitted")
4186 if self.mem is not None:
4187 try:
4188 self.mem = int(self.mem)
4189 except ValueError, err:
4190 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4191 if self.vcpus is not None:
4192 try:
4193 self.vcpus = int(self.vcpus)
4194 except ValueError, err:
4195 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4196 if self.ip is not None:
4197 self.do_ip = True
4198 if self.ip.lower() == "none":
4199 self.ip = None
4200 else:
4201 if not utils.IsValidIP(self.ip):
4202 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4203 else:
4204 self.do_ip = False
4205 self.do_bridge = (self.bridge is not None)
4206 if self.mac is not None:
4207 if self.cfg.IsMacInUse(self.mac):
4208 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4209 self.mac)
4210 if not utils.IsValidMac(self.mac):
4211 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4213 if self.kernel_path is not None:
4214 self.do_kernel_path = True
4215 if self.kernel_path == constants.VALUE_NONE:
4216 raise errors.OpPrereqError("Can't set instance to no kernel")
4218 if self.kernel_path != constants.VALUE_DEFAULT:
4219 if not os.path.isabs(self.kernel_path):
4220 raise errors.OpPrereqError("The kernel path must be an absolute"
4223 self.do_kernel_path = False
4225 if self.initrd_path is not None:
4226 self.do_initrd_path = True
4227 if self.initrd_path not in (constants.VALUE_NONE,
4228 constants.VALUE_DEFAULT):
4229 if not os.path.isabs(self.initrd_path):
4230 raise errors.OpPrereqError("The initrd path must be an absolute"
4233 self.do_initrd_path = False
4235 # boot order verification
4236 if self.hvm_boot_order is not None:
4237 if self.hvm_boot_order != constants.VALUE_DEFAULT:
4238 if len(self.hvm_boot_order.strip("acdn")) != 0:
4239 raise errors.OpPrereqError("invalid boot order specified,"
4240 " must be one or more of [acdn]"
4243 # hvm_cdrom_image_path verification
4244 if self.op.hvm_cdrom_image_path is not None:
4245 if not os.path.isabs(self.op.hvm_cdrom_image_path):
4246 raise errors.OpPrereqError("The path to the HVM CDROM image must"
4247 " be an absolute path or None, not %s" %
4248 self.op.hvm_cdrom_image_path)
4249 if not os.path.isfile(self.op.hvm_cdrom_image_path):
4250 raise errors.OpPrereqError("The HVM CDROM image must either be a"
4251 " regular file or a symlink pointing to"
4252 " an existing regular file, not %s" %
4253 self.op.hvm_cdrom_image_path)
4255 # vnc_bind_address verification
4256 if self.op.vnc_bind_address is not None:
4257 if not utils.IsValidIP(self.op.vnc_bind_address):
4258 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4259 " like a valid IP address" %
4260 self.op.vnc_bind_address)
4262 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4263 assert self.instance is not None, \
4264 "Cannot retrieve locked instance %s" % self.op.instance_name
4267 def Exec(self, feedback_fn):
4268 """Modifies an instance.
4270 All parameters take effect only at the next restart of the instance.
4271 """
4272 result = []
4273 instance = self.instance
4274 if self.mem:
4275 instance.memory = self.mem
4276 result.append(("mem", self.mem))
4278 instance.vcpus = self.vcpus
4279 result.append(("vcpus", self.vcpus))
4281 instance.nics[0].ip = self.ip
4282 result.append(("ip", self.ip))
4284 instance.nics[0].bridge = self.bridge
4285 result.append(("bridge", self.bridge))
4287 instance.nics[0].mac = self.mac
4288 result.append(("mac", self.mac))
4289 if self.do_kernel_path:
4290 instance.kernel_path = self.kernel_path
4291 result.append(("kernel_path", self.kernel_path))
4292 if self.do_initrd_path:
4293 instance.initrd_path = self.initrd_path
4294 result.append(("initrd_path", self.initrd_path))
4295 if self.hvm_boot_order:
4296 if self.hvm_boot_order == constants.VALUE_DEFAULT:
4297 instance.hvm_boot_order = None
4298 else:
4299 instance.hvm_boot_order = self.hvm_boot_order
4300 result.append(("hvm_boot_order", self.hvm_boot_order))
4302 instance.hvm_acpi = self.hvm_acpi
4303 result.append(("hvm_acpi", self.hvm_acpi))
4305 instance.hvm_pae = self.hvm_pae
4306 result.append(("hvm_pae", self.hvm_pae))
4307 if self.hvm_cdrom_image_path:
4308 instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4309 result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4310 if self.vnc_bind_address:
4311 instance.vnc_bind_address = self.vnc_bind_address
4312 result.append(("vnc_bind_address", self.vnc_bind_address))
4314 self.cfg.Update(instance)
4316 return result
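# Example (editorial sketch): formatting the (parameter, new_value) pairs
# returned by LUSetInstanceParams.Exec above, e.g. [("mem", 512),
# ("vcpus", 2)], for user feedback.
def _ExampleFormatModifyResult(result):
  """Render the change list as printable lines."""
  return ["%s -> %s" % (param, value) for param, value in result]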
4319 class LUQueryExports(NoHooksLU):
4320 """Query the exports list
4325 def CheckPrereq(self):
4326 """Check that the nodelist contains only existing nodes.
4329 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4331 def Exec(self, feedback_fn):
4332 """Compute the list of all the exported system images.
4334 Returns:
4335 a dictionary with the structure node->(export-list)
4336 where export-list is a list of the instances exported on
4337 that node.
4339 """
4340 return rpc.call_export_list(self.nodes)
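# Example (editorial sketch): the shape of the value returned by
# LUQueryExports.Exec above, as documented in its docstring. Hostnames are
# illustrative only.
def _ExampleExportList():
  """Return a sample node->export-list mapping."""
  return {"node1.example.com": ["inst1.example.com"],
          "node2.example.com": []}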
4343 class LUExportInstance(LogicalUnit):
4344 """Export an instance to an image in the cluster.
4347 HPATH = "instance-export"
4348 HTYPE = constants.HTYPE_INSTANCE
4349 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4351 def BuildHooksEnv(self):
4352 """Build hooks env.
4354 This will run on the master, primary node and target node.
4356 """
4357 env = {
4358 "EXPORT_NODE": self.op.target_node,
4359 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4361 env.update(_BuildInstanceHookEnvByObject(self.instance))
4362 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4363 self.op.target_node]
4364 return env, nl, nl
4366 def CheckPrereq(self):
4367 """Check prerequisites.
4369 This checks that the instance and node names are valid.
4371 """
4372 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4373 self.instance = self.cfg.GetInstanceInfo(instance_name)
4374 if self.instance is None:
4375 raise errors.OpPrereqError("Instance '%s' not found" %
4376 self.op.instance_name)
4379 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4380 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4382 if self.dst_node is None:
4383 raise errors.OpPrereqError("Destination node '%s' is unknown." %
4384 self.op.target_node)
4385 self.op.target_node = self.dst_node.name
4387 # instance disk type verification
4388 for disk in self.instance.disks:
4389 if disk.dev_type == constants.LD_FILE:
4390 raise errors.OpPrereqError("Export not supported for instances with"
4391 " file-based disks")
4393 def Exec(self, feedback_fn):
4394 """Export an instance to an image in the cluster.
4397 instance = self.instance
4398 dst_node = self.dst_node
4399 src_node = instance.primary_node
4400 if self.op.shutdown:
4401 # shut down the instance, but not the disks
4402 if not rpc.call_instance_shutdown(src_node, instance):
4403 raise errors.OpExecError("Could not shut down instance %s on node %s" %
4404 (instance.name, src_node))
4406 vgname = self.cfg.GetVGName()
4408 snap_disks = []
4410 try:
4411 for disk in instance.disks:
4412 if disk.iv_name == "sda":
4413 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4414 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4416 if not new_dev_name:
4417 logger.Error("could not snapshot block device %s on node %s" %
4418 (disk.logical_id[1], src_node))
4419 else:
4420 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4421 logical_id=(vgname, new_dev_name),
4422 physical_id=(vgname, new_dev_name),
4423 iv_name=disk.iv_name)
4424 snap_disks.append(new_dev)
4426 finally:
4427 if self.op.shutdown and instance.status == "up":
4428 if not rpc.call_instance_start(src_node, instance, None):
4429 _ShutdownInstanceDisks(instance, self.cfg)
4430 raise errors.OpExecError("Could not start instance")
4432 # TODO: check for size
4434 for dev in snap_disks:
4435 if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4436 logger.Error("could not export block device %s from node %s to node %s"
4437 % (dev.logical_id[1], src_node, dst_node.name))
4438 if not rpc.call_blockdev_remove(src_node, dev):
4439 logger.Error("could not remove snapshot block device %s from node %s" %
4440 (dev.logical_id[1], src_node))
4442 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4443 logger.Error("could not finalize export for instance %s on node %s" %
4444 (instance.name, dst_node.name))
4446 nodelist = self.cfg.GetNodeList()
4447 nodelist.remove(dst_node.name)
4449 # on one-node clusters nodelist will be empty after the removal
4450 # if we proceed the backup would be removed because OpQueryExports
4451 # substitutes an empty list with the full cluster node list.
4452 if nodelist:
4453 op = opcodes.OpQueryExports(nodes=nodelist)
4454 exportlist = self.proc.ChainOpCode(op)
4455 for node in exportlist:
4456 if instance.name in exportlist[node]:
4457 if not rpc.call_export_remove(node, instance.name):
4458 logger.Error("could not remove older export for instance %s"
4459 " on node %s" % (instance.name, node))
4462 class LURemoveExport(NoHooksLU):
4463 """Remove exports related to the named instance.
4466 _OP_REQP = ["instance_name"]
4468 def CheckPrereq(self):
4469 """Check prerequisites.
4473 def Exec(self, feedback_fn):
4474 """Remove any export.
4477 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4478 # If the instance was not found we'll try with the name that was passed in.
4479 # This will only work if it was an FQDN, though.
4480 fqdn_warn = False
4481 if not instance_name:
4482 fqdn_warn = True
4483 instance_name = self.op.instance_name
4485 op = opcodes.OpQueryExports(nodes=[])
4486 exportlist = self.proc.ChainOpCode(op)
4487 found = False
4488 for node in exportlist:
4489 if instance_name in exportlist[node]:
4490 found = True
4491 if not rpc.call_export_remove(node, instance_name):
4492 logger.Error("could not remove export for instance %s"
4493 " on node %s" % (instance_name, node))
4495 if fqdn_warn and not found:
4496 feedback_fn("Export not found. If trying to remove an export belonging"
4497 " to a deleted instance please use its Fully Qualified"
4501 class TagsLU(NoHooksLU):
4502 """Generic tags LU.
4504 This is an abstract class which is the parent of all the other tags LUs.
4506 """
4507 def CheckPrereq(self):
4508 """Check prerequisites.
4511 if self.op.kind == constants.TAG_CLUSTER:
4512 self.target = self.cfg.GetClusterInfo()
4513 elif self.op.kind == constants.TAG_NODE:
4514 name = self.cfg.ExpandNodeName(self.op.name)
4515 if name is None:
4516 raise errors.OpPrereqError("Invalid node name (%s)" %
4517 (self.op.name,))
4518 self.op.name = name
4519 self.target = self.cfg.GetNodeInfo(name)
4520 elif self.op.kind == constants.TAG_INSTANCE:
4521 name = self.cfg.ExpandInstanceName(self.op.name)
4522 if name is None:
4523 raise errors.OpPrereqError("Invalid instance name (%s)" %
4524 (self.op.name,))
4525 self.op.name = name
4526 self.target = self.cfg.GetInstanceInfo(name)
4528 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4532 class LUGetTags(TagsLU):
4533 """Returns the tags of a given object.
4536 _OP_REQP = ["kind", "name"]
4538 def Exec(self, feedback_fn):
4539 """Returns the tag list.
4542 return list(self.target.GetTags())
4545 class LUSearchTags(NoHooksLU):
4546 """Searches the tags for a given pattern.
4549 _OP_REQP = ["pattern"]
4551 def CheckPrereq(self):
4552 """Check prerequisites.
4554 This checks the pattern passed for validity by compiling it.
4556 """
4557 try:
4558 self.re = re.compile(self.op.pattern)
4559 except re.error, err:
4560 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4561 (self.op.pattern, err))
4563 def Exec(self, feedback_fn):
4564 """Returns the tag list.
4568 tgts = [("/cluster", cfg.GetClusterInfo())]
4569 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4570 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4571 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4572 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4574 for path, target in tgts:
4575 for tag in target.GetTags():
4576 if self.re.search(tag):
4577 results.append((path, tag))
4578 return results
4581 class LUAddTags(TagsLU):
4582 """Sets a tag on a given object.
4585 _OP_REQP = ["kind", "name", "tags"]
4587 def CheckPrereq(self):
4588 """Check prerequisites.
4590 This checks the type and length of the tag name and value.
4592 """
4593 TagsLU.CheckPrereq(self)
4594 for tag in self.op.tags:
4595 objects.TaggableObject.ValidateTag(tag)
4597 def Exec(self, feedback_fn):
4598 """Sets the tag.
4600 """
4601 try:
4602 for tag in self.op.tags:
4603 self.target.AddTag(tag)
4604 except errors.TagError, err:
4605 raise errors.OpExecError("Error while setting tag: %s" % str(err))
4607 self.cfg.Update(self.target)
4608 except errors.ConfigurationError:
4609 raise errors.OpRetryError("There has been a modification to the"
4610 " config file and the operation has been"
4611 " aborted. Please retry.")
4614 class LUDelTags(TagsLU):
4615 """Delete a list of tags from a given object.
4618 _OP_REQP = ["kind", "name", "tags"]
4620 def CheckPrereq(self):
4621 """Check prerequisites.
4623 This checks that we have the given tag.
4625 """
4626 TagsLU.CheckPrereq(self)
4627 for tag in self.op.tags:
4628 objects.TaggableObject.ValidateTag(tag)
4629 del_tags = frozenset(self.op.tags)
4630 cur_tags = self.target.GetTags()
4631 if not del_tags <= cur_tags:
4632 diff_tags = del_tags - cur_tags
4633 diff_names = ["'%s'" % tag for tag in diff_tags]
4635 raise errors.OpPrereqError("Tag(s) %s not found" %
4636 (",".join(diff_names)))
4638 def Exec(self, feedback_fn):
4639 """Remove the tag from the object.
4642 for tag in self.op.tags:
4643 self.target.RemoveTag(tag)
4644 try:
4645 self.cfg.Update(self.target)
4646 except errors.ConfigurationError:
4647 raise errors.OpRetryError("There has been a modification to the"
4648 " config file and the operation has been"
4649 " aborted. Please retry.")
4652 class LUTestDelay(NoHooksLU):
4653 """Sleep for a specified amount of time.
4655 This LU sleeps on the master and/or nodes for a specified amount of
4656 time.
4658 """
4659 _OP_REQP = ["duration", "on_master", "on_nodes"]
4660 REQ_BGL = False
4662 def ExpandNames(self):
4663 """Expand names and set required locks.
4665 This expands the node list, if any.
4667 """
4668 self.needed_locks = {}
4669 if self.op.on_nodes:
4670 # _GetWantedNodes can be used here, but is not always appropriate to use
4671 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
4672 # more information.
4673 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4674 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
4676 def CheckPrereq(self):
4677 """Check prerequisites.
4681 def Exec(self, feedback_fn):
4682 """Do the actual sleep.
4685 if self.op.on_master:
4686 if not utils.TestDelay(self.op.duration):
4687 raise errors.OpExecError("Error during master delay test")
4688 if self.op.on_nodes:
4689 result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4690 if not result:
4691 raise errors.OpExecError("Complete failure from rpc call")
4692 for node, node_result in result.items():
4693 if not node_result:
4694 raise errors.OpExecError("Failure during rpc call to node %s,"
4695 " result: %s" % (node, node_result))
4698 class IAllocator(object):
4699 """IAllocator framework.
4701 An IAllocator instance has four sets of attributes:
4702 - cfg/sstore that are needed to query the cluster
4703 - input data (all members of the _KEYS class attribute are required)
4704 - four buffer attributes (in|out_data|text), that represent the
4705 input (to the external script) in text and data structure format,
4706 and the output from it, again in two formats
4707 - the result variables from the script (success, info, nodes) for
4708 easy usage
4710 """
4711 _ALLO_KEYS = [
4712 "mem_size", "disks", "disk_template",
4713 "os", "tags", "nics", "vcpus",
4714 ]
4716 _RELO_KEYS = [
4717 "relocate_from",
4718 ]
4719 def __init__(self, cfg, sstore, mode, name, **kwargs):
4720 self.cfg = cfg
4721 self.sstore = sstore
4722 # init buffer variables
4723 self.in_text = self.out_text = self.in_data = self.out_data = None
4724 # init all input fields so that pylint is happy
4725 self.mode = mode
4726 self.name = name
4727 self.mem_size = self.disks = self.disk_template = None
4728 self.os = self.tags = self.nics = self.vcpus = None
4729 self.relocate_from = None
4730 # computed fields
4731 self.required_nodes = None
4732 # init result fields
4733 self.success = self.info = self.nodes = None
4734 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4735 keyset = self._ALLO_KEYS
4736 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4737 keyset = self._RELO_KEYS
4739 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4740 " IAllocator" % self.mode)
4741 for key in kwargs:
4742 if key not in keyset:
4743 raise errors.ProgrammerError("Invalid input parameter '%s' to"
4744 " IAllocator" % key)
4745 setattr(self, key, kwargs[key])
4746 for key in keyset:
4747 if key not in kwargs:
4748 raise errors.ProgrammerError("Missing input parameter '%s' to"
4749 " IAllocator" % key)
4750 self._BuildInputData()
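# Example (editorial sketch): building a relocation request with the class
# above. 'hail' stands in for whatever external allocator script is
# installed; the instance and node names are illustrative only.
def _ExampleRelocate(cfg, sstore):
  """Ask an external allocator for a new secondary node."""
  ial = IAllocator(cfg, sstore,
                   mode=constants.IALLOCATOR_MODE_RELOC,
                   name="inst1.example.com",
                   relocate_from=["node2.example.com"])
  ial.Run("hail")  # runs on the master and validates the result
  if not ial.success:
    raise errors.OpExecError("Allocator failed: %s" % ial.info)
  return ial.nodes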
4752 def _ComputeClusterData(self):
4753 """Compute the generic allocator input data.
4755 This is the data that is independent of the actual operation.
4757 """
4758 cfg = self.cfg
4759 # cluster data
4760 data = {
4761 "version": 1,
4762 "cluster_name": self.sstore.GetClusterName(),
4763 "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4764 "hypervisor_type": self.sstore.GetHypervisorType(),
4765 # we don't have job IDs
4766 }
4768 i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4770 # node data
4771 node_results = {}
4772 node_list = cfg.GetNodeList()
4773 node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4774 for nname in node_list:
4775 ninfo = cfg.GetNodeInfo(nname)
4776 if nname not in node_data or not isinstance(node_data[nname], dict):
4777 raise errors.OpExecError("Can't get data for node %s" % nname)
4778 remote_info = node_data[nname]
4779 for attr in ['memory_total', 'memory_free', 'memory_dom0',
4780 'vg_size', 'vg_free', 'cpu_total']:
4781 if attr not in remote_info:
4782 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4785 remote_info[attr] = int(remote_info[attr])
4786 except ValueError, err:
4787 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4788 " %s" % (nname, attr, str(err)))
4789 # compute memory used by primary instances
4790 i_p_mem = i_p_up_mem = 0
4791 for iinfo in i_list:
4792 if iinfo.primary_node == nname:
4793 i_p_mem += iinfo.memory
4794 if iinfo.status == "up":
4795 i_p_up_mem += iinfo.memory
4797 # compute memory used by instances
4798 pnr = {
4799 "tags": list(ninfo.GetTags()),
4800 "total_memory": remote_info['memory_total'],
4801 "reserved_memory": remote_info['memory_dom0'],
4802 "free_memory": remote_info['memory_free'],
4803 "i_pri_memory": i_p_mem,
4804 "i_pri_up_memory": i_p_up_mem,
4805 "total_disk": remote_info['vg_size'],
4806 "free_disk": remote_info['vg_free'],
4807 "primary_ip": ninfo.primary_ip,
4808 "secondary_ip": ninfo.secondary_ip,
4809 "total_cpus": remote_info['cpu_total'],
4811 node_results[nname] = pnr
4812 data["nodes"] = node_results
4816 for iinfo in i_list:
4817 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4818 for n in iinfo.nics]
4819 pir = {
4820 "tags": list(iinfo.GetTags()),
4821 "should_run": iinfo.status == "up",
4822 "vcpus": iinfo.vcpus,
4823 "memory": iinfo.memory,
4825 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4827 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4828 "disk_template": iinfo.disk_template,
4830 instance_data[iinfo.name] = pir
4832 data["instances"] = instance_data
4836 def _AddNewInstance(self):
4837 """Add new instance data to allocator structure.
4839 This in combination with _ComputeClusterData will create the
4840 correct structure needed as input for the allocator.
4842 The checks for the completeness of the opcode must have already been
4843 done.
4845 """
4846 data = self.in_data
4847 if len(self.disks) != 2:
4848 raise errors.OpExecError("Only two-disk configurations supported")
4850 disk_space = _ComputeDiskSize(self.disk_template,
4851 self.disks[0]["size"], self.disks[1]["size"])
4853 if self.disk_template in constants.DTS_NET_MIRROR:
4854 self.required_nodes = 2
4855 else:
4856 self.required_nodes = 1
4860 "disk_template": self.disk_template,
4863 "vcpus": self.vcpus,
4864 "memory": self.mem_size,
4865 "disks": self.disks,
4866 "disk_space_total": disk_space,
4868 "required_nodes": self.required_nodes,
4870 data["request"] = request
4872 def _AddRelocateInstance(self):
4873 """Add relocate instance data to allocator structure.
4875 This in combination with _ComputeClusterData will create the
4876 correct structure needed as input for the allocator.
4878 The checks for the completeness of the opcode must have already been
4879 done.
4881 """
4882 instance = self.cfg.GetInstanceInfo(self.name)
4883 if instance is None:
4884 raise errors.ProgrammerError("Unknown instance '%s' passed to"
4885 " IAllocator" % self.name)
4887 if instance.disk_template not in constants.DTS_NET_MIRROR:
4888 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
4890 if len(instance.secondary_nodes) != 1:
4891 raise errors.OpPrereqError("Instance has not exactly one secondary node")
4893 self.required_nodes = 1
4895 disk_space = _ComputeDiskSize(instance.disk_template,
4896 instance.disks[0].size,
4897 instance.disks[1].size)
4902 "disk_space_total": disk_space,
4903 "required_nodes": self.required_nodes,
4904 "relocate_from": self.relocate_from,
4906 self.in_data["request"] = request
4908 def _BuildInputData(self):
4909 """Build input data structures.
4912 self._ComputeClusterData()
4914 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4915 self._AddNewInstance()
4916 else:
4917 self._AddRelocateInstance()
4919 self.in_text = serializer.Dump(self.in_data)
4921 def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
4922 """Run an instance allocator and return the results.
4927 result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
4929 if not isinstance(result, tuple) or len(result) != 4:
4930 raise errors.OpExecError("Invalid result from master iallocator runner")
4932 rcode, stdout, stderr, fail = result
4934 if rcode == constants.IARUN_NOTFOUND:
4935 raise errors.OpExecError("Can't find allocator '%s'" % name)
4936 elif rcode == constants.IARUN_FAILURE:
4937 raise errors.OpExecError("Instance allocator call failed: %s,"
4938 " output: %s" % (fail, stdout+stderr))
4939 self.out_text = stdout
4940 if validate:
4941 self._ValidateResult()
4943 def _ValidateResult(self):
4944 """Process the allocator results.
4946 This will process and, if successful, save the result in
4947 self.out_data and the other parameters.
4949 """
4950 try:
4951 rdict = serializer.Load(self.out_text)
4952 except Exception, err:
4953 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
4955 if not isinstance(rdict, dict):
4956 raise errors.OpExecError("Can't parse iallocator results: not a dict")
4958 for key in "success", "info", "nodes":
4959 if key not in rdict:
4960 raise errors.OpExecError("Can't parse iallocator results:"
4961 " missing key '%s'" % key)
4962 setattr(self, key, rdict[key])
4964 if not isinstance(rdict["nodes"], list):
4965 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
4967 self.out_data = rdict
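# Example (editorial sketch): a minimal allocator reply that satisfies
# _ValidateResult above; node names are illustrative only.
def _ExampleAllocatorReply():
  """Serialize a well-formed result document."""
  return serializer.Dump({"success": True,
                          "info": "allocation successful",
                          "nodes": ["node3.example.com"]})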
4970 class LUTestAllocator(NoHooksLU):
4971 """Run allocator tests.
4973 This LU runs the allocator tests.
4975 """
4976 _OP_REQP = ["direction", "mode", "name"]
4978 def CheckPrereq(self):
4979 """Check prerequisites.
4981 This checks the opcode parameters depending on the direction and mode test.
4983 """
4984 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4985 for attr in ["name", "mem_size", "disks", "disk_template",
4986 "os", "tags", "nics", "vcpus"]:
4987 if not hasattr(self.op, attr):
4988 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
4990 iname = self.cfg.ExpandInstanceName(self.op.name)
4991 if iname is not None:
4992 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
4994 if not isinstance(self.op.nics, list):
4995 raise errors.OpPrereqError("Invalid parameter 'nics'")
4996 for row in self.op.nics:
4997 if (not isinstance(row, dict) or
4998 "mac" not in row or
4999 "ip" not in row or
5000 "bridge" not in row):
5001 raise errors.OpPrereqError("Invalid contents of the"
5002 " 'nics' parameter")
5003 if not isinstance(self.op.disks, list):
5004 raise errors.OpPrereqError("Invalid parameter 'disks'")
5005 if len(self.op.disks) != 2:
5006 raise errors.OpPrereqError("Only two-disk configurations supported")
5007 for row in self.op.disks:
5008 if (not isinstance(row, dict) or
5009 "size" not in row or
5010 not isinstance(row["size"], int) or
5011 "mode" not in row or
5012 row["mode"] not in ['r', 'w']):
5013 raise errors.OpPrereqError("Invalid contents of the"
5014 " 'disks' parameter")
5015 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5016 if not hasattr(self.op, "name"):
5017 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5018 fname = self.cfg.ExpandInstanceName(self.op.name)
5019 if fname is None:
5020 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5021 self.op.name)
5022 self.op.name = fname
5023 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5024 else:
5025 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5026 self.op.mode)
5028 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5029 if not hasattr(self.op, "allocator") or self.op.allocator is None:
5030 raise errors.OpPrereqError("Missing allocator name")
5031 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5032 raise errors.OpPrereqError("Wrong allocator test '%s'" %
5035 def Exec(self, feedback_fn):
5036 """Run the allocator test.
5039 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5040 ial = IAllocator(self.cfg, self.sstore,
5041 mode=self.op.mode,
5042 name=self.op.name,
5043 mem_size=self.op.mem_size,
5044 disks=self.op.disks,
5045 disk_template=self.op.disk_template,
5046 os=self.op.os,
5047 tags=self.op.tags,
5048 nics=self.op.nics,
5049 vcpus=self.op.vcpus,
5050 )
5051 else:
5052 ial = IAllocator(self.cfg, self.sstore,
5053 mode=self.op.mode,
5054 name=self.op.name,
5055 relocate_from=list(self.relocate_from),
5056 )
5058 if self.op.direction == constants.IALLOCATOR_DIR_IN:
5059 result = ial.in_text
5060 else:
5061 ial.Run(self.op.allocator, validate=False)
5062 result = ial.out_text
5063 return result