4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
47 class LogicalUnit(object):
48 """Logical Unit base class.
50 Subclasses must follow these rules:
51 - implement ExpandNames
52 - implement CheckPrereq
54 - implement BuildHooksEnv
55 - redefine HPATH and HTYPE
56 - optionally redefine their run requirements:
57 REQ_MASTER: the LU needs to run on the master node
58 REQ_WSSTORE: the LU needs a writable SimpleStore
59 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
61 Note that all commands require root permissions.
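#
# A minimal sketch of a concrete LU following the rules above (illustrative
# only; LUExampleQuery and its opcode field are hypothetical, and hooks are
# omitted by inheriting from NoHooksLU, defined further below):
#
#   class LUExampleQuery(NoHooksLU):
#     _OP_REQP = ["instance_name"]
#
#     def ExpandNames(self):
#       self._ExpandAndLockInstance()
#
#     def CheckPrereq(self):
#       self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("instance %s lives on %s" %
#                   (self.instance.name, self.instance.primary_node))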
71 def __init__(self, processor, op, context, sstore):
72 """Constructor for LogicalUnit.
74 This needs to be overridden in derived classes in order to check op
80 self.cfg = context.cfg
82 self.context = context
83 self.needed_locks = None
84 self.acquired_locks = {}
85 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 for attr_name in self._OP_REQP:
91 attr_val = getattr(op, attr_name, None)
93 raise errors.OpPrereqError("Required parameter '%s' missing" %
96 if not self.cfg.IsCluster():
97 raise errors.OpPrereqError("Cluster not initialized yet,"
98 " use 'gnt-cluster init' first.")
100 master = sstore.GetMasterNode()
101 if master != utils.HostInfo().name:
102 raise errors.OpPrereqError("Commands must be run on the master"
106 """Returns the SshRunner object
110 self.__ssh = ssh.SshRunner(self.sstore)
113 ssh = property(fget=__GetSSH)
115 def ExpandNames(self):
116 """Expand names for this LU.
118 This method is called before starting to execute the opcode, and it should
119 update all the parameters of the opcode to their canonical form (e.g. a
120 short node name must be fully expanded after this method has successfully
121 completed). This way locking, hooks, logging, etc. can work correctly.
123 LUs which implement this method must also populate the self.needed_locks
124 member, as a dict with lock levels as keys, and a list of needed lock names
126 - Use an empty dict if you don't need any lock
127 - If you don't need any lock at a particular level omit that level
128 - Don't put anything for the BGL level
129 - If you want all locks at a level use locking.ALL_SET as a value
131 If you need to share locks (rather than acquire them exclusively) at one
132 level you can modify self.share_locks, setting a true value (usually 1) for
133 that level. By default locks are not shared.
136 # Acquire all nodes and one instance
137 self.needed_locks = {
138 locking.LEVEL_NODE: locking.ALL_SET,
139 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
141 # Acquire just two nodes
142 self.needed_locks = {
143 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
146 self.needed_locks = {} # No, you can't leave it to the default value None
149 # The implementation of this method is mandatory only if the new LU is
150 # concurrent, so that old LUs don't need to be changed all at the same
153 self.needed_locks = {} # Exclusive LUs don't need locks.
155 raise NotImplementedError
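#
# A sketch of a concrete ExpandNames using shared (read-only) locks; this is
# the pattern query-type LUs such as LUDiagnoseOS (further below) follow:
#
#   def ExpandNames(self):
#     self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
#     self.share_locks[locking.LEVEL_NODE] = 1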
157 def DeclareLocks(self, level):
158 """Declare LU locking needs for a level
160 While most LUs can just declare their locking needs at ExpandNames time,
161 sometimes there's the need to calculate some locks after having acquired
162 the ones before. This function is called just before acquiring locks at a
163 particular level, but after acquiring the ones at lower levels, and permits
164 such calculations. It can be used to modify self.needed_locks, and by
165 default it does nothing.
167 This function is only called if you have something already set in
168 self.needed_locks for the level.
170 @param level: Locking level which is going to be locked
171 @type level: member of ganeti.locking.LEVELS
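#
# Typical usage sketch (mirrors LUStartupInstance further below): ExpandNames
# leaves the node level empty and flags it for recalculation, and DeclareLocks
# fills it in once the instance locks are held:
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()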
175 def CheckPrereq(self):
176 """Check prerequisites for this LU.
178 This method should check that the prerequisites for the execution
179 of this LU are fulfilled. It can do internode communication, but
180 it should be idempotent - no cluster or system changes are
183 The method should raise errors.OpPrereqError in case something is
184 not fulfilled. Its return value is ignored.
186 This method should also update all the parameters of the opcode to
187 their canonical form if it hasn't been done by ExpandNames before.
190 raise NotImplementedError
192 def Exec(self, feedback_fn):
195 This method should implement the actual work. It should raise
196 errors.OpExecError for failures that are somewhat dealt with in
200 raise NotImplementedError
202 def BuildHooksEnv(self):
203 """Build hooks environment for this LU.
205 This method should return a three-element tuple consisting of: a dict
206 containing the environment that will be used for running the
207 specific hook for this LU, a list of node names on which the hook
208 should run before the execution, and a list of node names on which
209 the hook should run after the execution.
211 The keys of the dict must not be prefixed with 'GANETI_', as this will
212 be handled by the hooks runner. Also note additional keys will be
213 added by the hooks runner. If the LU doesn't define any
214 environment, an empty dict (and not None) should be returned.
216 If there are no nodes to return, use an empty list (and not None).
218 Note that if the HPATH for a LU class is None, this function will
222 raise NotImplementedError
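#
# Illustrative sketch of the expected return value (the env keys and node
# choices here are examples only, not a required set):
#
#   def BuildHooksEnv(self):
#     env = {"OP_TARGET": self.op.instance_name}
#     nl = [self.sstore.GetMasterNode(), self.instance.primary_node]
#     return env, nl, nl   # (environment, pre-exec nodes, post-exec nodes)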
224 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
225 """Notify the LU about the results of its hooks.
227 This method is called every time a hooks phase is executed, and notifies
228 the Logical Unit about the hooks' result. The LU can then use it to alter
229 its result based on the hooks. By default the method does nothing and the
230 previous result is passed back unchanged but any LU can define it if it
231 wants to use the local cluster hook-scripts somehow.
234 phase: the hooks phase that has just been run
235 hook_results: the results of the multi-node hooks rpc call
236 feedback_fn: function to send feedback back to the caller
237 lu_result: the previous result this LU had, or None in the PRE phase.
242 def _ExpandAndLockInstance(self):
243 """Helper function to expand and lock an instance.
245 Many LUs that work on an instance take its name in self.op.instance_name
246 and need to expand it and then declare the expanded name for locking. This
247 function does it, and then updates self.op.instance_name to the expanded
248 name. It also initializes needed_locks as a dict, if this hasn't been done
252 if self.needed_locks is None:
253 self.needed_locks = {}
255 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
256 "_ExpandAndLockInstance called with instance-level locks set"
257 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
258 if expanded_name is None:
259 raise errors.OpPrereqError("Instance '%s' not known" %
260 self.op.instance_name)
261 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
262 self.op.instance_name = expanded_name
264 def _LockInstancesNodes(self, primary_only=False):
265 """Helper function to declare instances' nodes for locking.
267 This function should be called after locking one or more instances to lock
268 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
269 with all primary or secondary nodes for instances already locked and
270 present in self.needed_locks[locking.LEVEL_INSTANCE].
272 It should be called from DeclareLocks, and for safety only works if
273 self.recalculate_locks[locking.LEVEL_NODE] is set.
275 In the future it may grow parameters to lock only some instances' nodes, or
276 to lock only primary or secondary nodes, if needed.
278 It should be called from DeclareLocks in a way similar to:
280 if level == locking.LEVEL_NODE:
281 self._LockInstancesNodes()
283 @type primary_only: boolean
284 @param primary_only: only lock primary nodes of locked instances
287 assert locking.LEVEL_NODE in self.recalculate_locks, \
288 "_LockInstancesNodes helper function called with no nodes to recalculate"
290 # TODO: check if we've really been called with the instance locks held
292 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
293 # future we might want to have different behaviors depending on the value
294 # of self.recalculate_locks[locking.LEVEL_NODE]
296 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
297 instance = self.context.cfg.GetInstanceInfo(instance_name)
298 wanted_nodes.append(instance.primary_node)
300 wanted_nodes.extend(instance.secondary_nodes)
301 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
303 del self.recalculate_locks[locking.LEVEL_NODE]
306 class NoHooksLU(LogicalUnit):
307 """Simple LU which runs no hooks.
309 This LU is intended as a parent for other LogicalUnits which will
310 run no hooks, in order to reduce duplicate code.
317 def _GetWantedNodes(lu, nodes):
318 """Returns list of checked and expanded node names.
321 nodes: non-empty list of node names (strings) to be expanded
324 if not isinstance(nodes, list):
325 raise errors.OpPrereqError("Invalid argument type 'nodes'")
328 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
329 " non-empty list of nodes whose name is to be expanded.")
333 node = lu.cfg.ExpandNodeName(name)
335 raise errors.OpPrereqError("No such node name '%s'" % name)
338 return utils.NiceSort(wanted)
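# Usage sketch (hypothetical names): _GetWantedNodes(self, ["node1"]) expands
# each short name via cfg.ExpandNodeName and returns the sorted full names,
# e.g. ["node1.example.com"].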
341 def _GetWantedInstances(lu, instances):
342 """Returns list of checked and expanded instance names.
345 instances: List of instances (strings) or None for all
348 if not isinstance(instances, list):
349 raise errors.OpPrereqError("Invalid argument type 'instances'")
354 for name in instances:
355 instance = lu.cfg.ExpandInstanceName(name)
357 raise errors.OpPrereqError("No such instance name '%s'" % name)
358 wanted.append(instance)
361 wanted = lu.cfg.GetInstanceList()
362 return utils.NiceSort(wanted)
365 def _CheckOutputFields(static, dynamic, selected):
366 """Checks whether all selected fields are valid.
369 static: Static fields
370 dynamic: Dynamic fields
373 static_fields = frozenset(static)
374 dynamic_fields = frozenset(dynamic)
376 all_fields = static_fields | dynamic_fields
378 if not all_fields.issuperset(selected):
379 raise errors.OpPrereqError("Unknown output fields selected: %s"
380 % ",".join(frozenset(selected).
381 difference(all_fields)))
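# Usage sketch: with static=["name"] and dynamic=["mfree"], selecting
# ["name", "mtotal"] raises OpPrereqError naming "mtotal" as an unknown field.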
384 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
385 memory, vcpus, nics):
386 """Builds instance related env variables for hooks from single variables.
389 secondary_nodes: List of secondary nodes as strings
393 "INSTANCE_NAME": name,
394 "INSTANCE_PRIMARY": primary_node,
395 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
396 "INSTANCE_OS_TYPE": os_type,
397 "INSTANCE_STATUS": status,
398 "INSTANCE_MEMORY": memory,
399 "INSTANCE_VCPUS": vcpus,
403 nic_count = len(nics)
404 for idx, (ip, bridge, mac) in enumerate(nics):
407 env["INSTANCE_NIC%d_IP" % idx] = ip
408 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
409 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
413 env["INSTANCE_NIC_COUNT"] = nic_count
418 def _BuildInstanceHookEnvByObject(instance, override=None):
419 """Builds instance related env variables for hooks from an object.
422 instance: objects.Instance object of instance
423 override: dict of values to override
426 'name': instance.name,
427 'primary_node': instance.primary_node,
428 'secondary_nodes': instance.secondary_nodes,
429 'os_type': instance.os,
430 'status': instance.status,
431 'memory': instance.memory,
432 'vcpus': instance.vcpus,
433 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
436 args.update(override)
437 return _BuildInstanceHookEnv(**args)
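#
# Illustrative result (hypothetical instance): for an instance with one NIC
# this produces keys such as INSTANCE_NAME, INSTANCE_PRIMARY,
# INSTANCE_NIC_COUNT and INSTANCE_NIC0_IP/BRIDGE/HWADDR; callers may override
# individual values, e.g.:
#
#   env = _BuildInstanceHookEnvByObject(instance, override={"status": "up"})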
440 def _CheckInstanceBridgesExist(instance):
441 """Check that the bridges needed by an instance exist.
444 # check bridges existence
445 brlist = [nic.bridge for nic in instance.nics]
446 if not rpc.call_bridges_exist(instance.primary_node, brlist):
447 raise errors.OpPrereqError("one or more target bridges %s do not"
448 " exist on destination node '%s'" %
449 (brlist, instance.primary_node))
452 class LUDestroyCluster(NoHooksLU):
453 """Logical unit for destroying the cluster.
458 def CheckPrereq(self):
459 """Check prerequisites.
461 This checks whether the cluster is empty.
463 Any errors are signalled by raising errors.OpPrereqError.
466 master = self.sstore.GetMasterNode()
468 nodelist = self.cfg.GetNodeList()
469 if len(nodelist) != 1 or nodelist[0] != master:
470 raise errors.OpPrereqError("There are still %d node(s) in"
471 " this cluster." % (len(nodelist) - 1))
472 instancelist = self.cfg.GetInstanceList()
474 raise errors.OpPrereqError("There are still %d instance(s) in"
475 " this cluster." % len(instancelist))
477 def Exec(self, feedback_fn):
478 """Destroys the cluster.
481 master = self.sstore.GetMasterNode()
482 if not rpc.call_node_stop_master(master, False):
483 raise errors.OpExecError("Could not disable the master role")
484 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
485 utils.CreateBackup(priv_key)
486 utils.CreateBackup(pub_key)
490 class LUVerifyCluster(LogicalUnit):
491 """Verifies the cluster status.
494 HPATH = "cluster-verify"
495 HTYPE = constants.HTYPE_CLUSTER
496 _OP_REQP = ["skip_checks"]
498 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
499 remote_version, feedback_fn):
500 """Run multiple tests against a node.
503 - compares ganeti version
504 - checks vg existence and size > 20G
505 - checks config file checksum
506 - checks ssh to other nodes
509 node: name of the node to check
510 file_list: required list of files
511 local_cksum: dictionary of local files and their checksums
514 # compares ganeti version
515 local_version = constants.PROTOCOL_VERSION
516 if not remote_version:
517 feedback_fn(" - ERROR: connection to %s failed" % (node))
520 if local_version != remote_version:
521 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
522 (local_version, node, remote_version))
525 # checks vg existence and size > 20G
529 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
533 vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
534 constants.MIN_VG_SIZE)
536 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
539 # checks config file checksum
542 if 'filelist' not in node_result:
544 feedback_fn(" - ERROR: node hasn't returned file checksum data")
546 remote_cksum = node_result['filelist']
547 for file_name in file_list:
548 if file_name not in remote_cksum:
550 feedback_fn(" - ERROR: file '%s' missing" % file_name)
551 elif remote_cksum[file_name] != local_cksum[file_name]:
553 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
555 if 'nodelist' not in node_result:
557 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
559 if node_result['nodelist']:
561 for node in node_result['nodelist']:
562 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
563 (node, node_result['nodelist'][node]))
564 if 'node-net-test' not in node_result:
566 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
568 if node_result['node-net-test']:
570 nlist = utils.NiceSort(node_result['node-net-test'].keys())
572 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
573 (node, node_result['node-net-test'][node]))
575 hyp_result = node_result.get('hypervisor', None)
576 if hyp_result is not None:
577 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
580 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
581 node_instance, feedback_fn):
582 """Verify an instance.
584 This function checks to see if the required block devices are
585 available on the instance's node.
590 node_current = instanceconfig.primary_node
593 instanceconfig.MapLVsByNode(node_vol_should)
595 for node in node_vol_should:
596 for volume in node_vol_should[node]:
597 if node not in node_vol_is or volume not in node_vol_is[node]:
598 feedback_fn(" - ERROR: volume %s missing on node %s" %
602 if not instanceconfig.status == 'down':
603 if (node_current not in node_instance or
604 not instance in node_instance[node_current]):
605 feedback_fn(" - ERROR: instance %s not running on node %s" %
606 (instance, node_current))
609 for node in node_instance:
610 if (not node == node_current):
611 if instance in node_instance[node]:
612 feedback_fn(" - ERROR: instance %s should not run on node %s" %
618 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
619 """Verify if there are any unknown volumes in the cluster.
621 The .os, .swap and backup volumes are ignored. All other volumes are
627 for node in node_vol_is:
628 for volume in node_vol_is[node]:
629 if node not in node_vol_should or volume not in node_vol_should[node]:
630 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
635 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
636 """Verify the list of running instances.
638 This checks what instances are running but unknown to the cluster.
642 for node in node_instance:
643 for runninginstance in node_instance[node]:
644 if runninginstance not in instancelist:
645 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
646 (runninginstance, node))
650 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
651 """Verify N+1 Memory Resilience.
653 Check that if one single node dies we can still start all the instances it
659 for node, nodeinfo in node_info.iteritems():
660 # This code checks that every node which is now listed as secondary has
661 # enough memory to host all instances it is supposed to, should a single
662 # other node in the cluster fail.
663 # FIXME: not ready for failover to an arbitrary node
664 # FIXME: does not support file-backed instances
665 # WARNING: we currently take into account down instances as well as up
666 # ones, considering that even if they're down someone might want to start
667 # them even in the event of a node failure.
668 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
670 for instance in instances:
671 needed_mem += instance_cfg[instance].memory
672 if nodeinfo['mfree'] < needed_mem:
673 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
674 " failovers should node %s fail" % (node, prinode))
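# Worked example (hypothetical numbers): if node B reports mfree=2048 MiB and
# is secondary for two instances of 1024 and 1536 MiB whose primary is node A,
# then needed_mem is 2560 MiB > 2048 MiB and the check reports that node B
# cannot accommodate failovers should node A fail.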
678 def CheckPrereq(self):
679 """Check prerequisites.
681 Transform the list of checks we're going to skip into a set and check that
682 all its members are valid.
685 self.skip_set = frozenset(self.op.skip_checks)
686 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
687 raise errors.OpPrereqError("Invalid checks to be skipped specified")
689 def BuildHooksEnv(self):
692 Cluster-Verify hooks just run in the post phase and their failure causes
693 the output to be logged in the verify output and the verification to fail.
696 all_nodes = self.cfg.GetNodeList()
697 # TODO: populate the environment with useful information for verify hooks
699 return env, [], all_nodes
701 def Exec(self, feedback_fn):
702 """Verify integrity of cluster, performing various tests on nodes.
706 feedback_fn("* Verifying global settings")
707 for msg in self.cfg.VerifyConfig():
708 feedback_fn(" - ERROR: %s" % msg)
710 vg_name = self.cfg.GetVGName()
711 nodelist = utils.NiceSort(self.cfg.GetNodeList())
712 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
713 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
714 i_non_redundant = [] # Non redundant instances
720 # FIXME: verify OS list
722 file_names = list(self.sstore.GetFileList())
723 file_names.append(constants.SSL_CERT_FILE)
724 file_names.append(constants.CLUSTER_CONF_FILE)
725 local_checksums = utils.FingerprintFiles(file_names)
727 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
728 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
729 all_instanceinfo = rpc.call_instance_list(nodelist)
730 all_vglist = rpc.call_vg_list(nodelist)
731 node_verify_param = {
732 'filelist': file_names,
733 'nodelist': nodelist,
735 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
736 for node in nodeinfo]
738 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
739 all_rversion = rpc.call_version(nodelist)
740 all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
742 for node in nodelist:
743 feedback_fn("* Verifying node %s" % node)
744 result = self._VerifyNode(node, file_names, local_checksums,
745 all_vglist[node], all_nvinfo[node],
746 all_rversion[node], feedback_fn)
750 volumeinfo = all_volumeinfo[node]
752 if isinstance(volumeinfo, basestring):
753 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
754 (node, volumeinfo[-400:].encode('string_escape')))
756 node_volume[node] = {}
757 elif not isinstance(volumeinfo, dict):
758 feedback_fn(" - ERROR: connection to %s failed" % (node,))
762 node_volume[node] = volumeinfo
765 nodeinstance = all_instanceinfo[node]
766 if type(nodeinstance) != list:
767 feedback_fn(" - ERROR: connection to %s failed" % (node,))
771 node_instance[node] = nodeinstance
774 nodeinfo = all_ninfo[node]
775 if not isinstance(nodeinfo, dict):
776 feedback_fn(" - ERROR: connection to %s failed" % (node,))
782 "mfree": int(nodeinfo['memory_free']),
783 "dfree": int(nodeinfo['vg_free']),
786 # dictionary holding all instances this node is secondary for,
787 # grouped by their primary node. Each key is a cluster node, and each
788 # value is a list of instances which have the key as primary and the
789 # current node as secondary. This is handy to calculate N+1 memory
790 # availability if you can only failover from a primary to its
792 "sinst-by-pnode": {},
795 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
801 for instance in instancelist:
802 feedback_fn("* Verifying instance %s" % instance)
803 inst_config = self.cfg.GetInstanceInfo(instance)
804 result = self._VerifyInstance(instance, inst_config, node_volume,
805 node_instance, feedback_fn)
808 inst_config.MapLVsByNode(node_vol_should)
810 instance_cfg[instance] = inst_config
812 pnode = inst_config.primary_node
813 if pnode in node_info:
814 node_info[pnode]['pinst'].append(instance)
816 feedback_fn(" - ERROR: instance %s, connection to primary node"
817 " %s failed" % (instance, pnode))
820 # If the instance is non-redundant we cannot survive losing its primary
821 # node, so we are not N+1 compliant. On the other hand we have no disk
822 # templates with more than one secondary so that situation is not well
824 # FIXME: does not support file-backed instances
825 if len(inst_config.secondary_nodes) == 0:
826 i_non_redundant.append(instance)
827 elif len(inst_config.secondary_nodes) > 1:
828 feedback_fn(" - WARNING: multiple secondaries for instance %s"
831 for snode in inst_config.secondary_nodes:
832 if snode in node_info:
833 node_info[snode]['sinst'].append(instance)
834 if pnode not in node_info[snode]['sinst-by-pnode']:
835 node_info[snode]['sinst-by-pnode'][pnode] = []
836 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
838 feedback_fn(" - ERROR: instance %s, connection to secondary node"
839 " %s failed" % (instance, snode))
841 feedback_fn("* Verifying orphan volumes")
842 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
846 feedback_fn("* Verifying remaining instances")
847 result = self._VerifyOrphanInstances(instancelist, node_instance,
851 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
852 feedback_fn("* Verifying N+1 Memory redundancy")
853 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
856 feedback_fn("* Other Notes")
858 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
859 % len(i_non_redundant))
863 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
864 """Analyze the post-hooks' result, handle it, and send some
865 nicely-formatted feedback back to the user.
868 phase: the hooks phase that has just been run
869 hooks_results: the results of the multi-node hooks rpc call
870 feedback_fn: function to send feedback back to the caller
871 lu_result: previous Exec result
874 # We only really run POST phase hooks, and are only interested in
876 if phase == constants.HOOKS_PHASE_POST:
877 # Used to change hooks' output to proper indentation
878 indent_re = re.compile('^', re.M)
879 feedback_fn("* Hooks Results")
880 if not hooks_results:
881 feedback_fn(" - ERROR: general communication failure")
884 for node_name in hooks_results:
885 show_node_header = True
886 res = hooks_results[node_name]
887 if res is False or not isinstance(res, list):
888 feedback_fn(" Communication failure")
891 for script, hkr, output in res:
892 if hkr == constants.HKR_FAIL:
893 # The node header is only shown once, if there are
894 # failing hooks on that node
896 feedback_fn(" Node %s:" % node_name)
897 show_node_header = False
898 feedback_fn(" ERROR: Script %s failed, output:" % script)
899 output = indent_re.sub(' ', output)
900 feedback_fn("%s" % output)
906 class LUVerifyDisks(NoHooksLU):
907 """Verifies the cluster disks status.
912 def CheckPrereq(self):
913 """Check prerequisites.
915 This has no prerequisites.
920 def Exec(self, feedback_fn):
921 """Verify integrity of cluster disks.
924 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
926 vg_name = self.cfg.GetVGName()
927 nodes = utils.NiceSort(self.cfg.GetNodeList())
928 instances = [self.cfg.GetInstanceInfo(name)
929 for name in self.cfg.GetInstanceList()]
932 for inst in instances:
934 if (inst.status != "up" or
935 inst.disk_template not in constants.DTS_NET_MIRROR):
937 inst.MapLVsByNode(inst_lvs)
938 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
939 for node, vol_list in inst_lvs.iteritems():
941 nv_dict[(node, vol)] = inst
946 node_lvs = rpc.call_volume_list(nodes, vg_name)
953 if isinstance(lvs, basestring):
954 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
956 elif not isinstance(lvs, dict):
957 logger.Info("connection to node %s failed or invalid data returned" %
959 res_nodes.append(node)
962 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
963 inst = nv_dict.pop((node, lv_name), None)
964 if (not lv_online and inst is not None
965 and inst.name not in res_instances):
966 res_instances.append(inst.name)
968 # any leftover items in nv_dict are missing LVs, let's arrange the
970 for key, inst in nv_dict.iteritems():
971 if inst.name not in res_missing:
972 res_missing[inst.name] = []
973 res_missing[inst.name].append(key)
978 class LURenameCluster(LogicalUnit):
979 """Rename the cluster.
982 HPATH = "cluster-rename"
983 HTYPE = constants.HTYPE_CLUSTER
987 def BuildHooksEnv(self):
992 "OP_TARGET": self.sstore.GetClusterName(),
993 "NEW_NAME": self.op.name,
995 mn = self.sstore.GetMasterNode()
996 return env, [mn], [mn]
998 def CheckPrereq(self):
999 """Verify that the passed name is a valid one.
1002 hostname = utils.HostInfo(self.op.name)
1004 new_name = hostname.name
1005 self.ip = new_ip = hostname.ip
1006 old_name = self.sstore.GetClusterName()
1007 old_ip = self.sstore.GetMasterIP()
1008 if new_name == old_name and new_ip == old_ip:
1009 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1010 " cluster has changed")
1011 if new_ip != old_ip:
1012 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1013 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1014 " reachable on the network. Aborting." %
1017 self.op.name = new_name
1019 def Exec(self, feedback_fn):
1020 """Rename the cluster.
1023 clustername = self.op.name
1027 # shutdown the master IP
1028 master = ss.GetMasterNode()
1029 if not rpc.call_node_stop_master(master, False):
1030 raise errors.OpExecError("Could not disable the master role")
1034 ss.SetKey(ss.SS_MASTER_IP, ip)
1035 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1037 # Distribute updated ss config to all nodes
1038 myself = self.cfg.GetNodeInfo(master)
1039 dist_nodes = self.cfg.GetNodeList()
1040 if myself.name in dist_nodes:
1041 dist_nodes.remove(myself.name)
1043 logger.Debug("Copying updated ssconf data to all nodes")
1044 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1045 fname = ss.KeyToFilename(keyname)
1046 result = rpc.call_upload_file(dist_nodes, fname)
1047 for to_node in dist_nodes:
1048 if not result[to_node]:
1049 logger.Error("copy of file %s to node %s failed" %
1052 if not rpc.call_node_start_master(master, False):
1053 logger.Error("Could not re-enable the master role on the master,"
1054 " please restart manually.")
1057 def _RecursiveCheckIfLVMBased(disk):
1058 """Check if the given disk or its children are lvm-based.
1061 disk: ganeti.objects.Disk object
1064 boolean indicating whether an LD_LV dev_type was found or not
1068 for chdisk in disk.children:
1069 if _RecursiveCheckIfLVMBased(chdisk):
1071 return disk.dev_type == constants.LD_LV
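# Example: a DRBD disk whose children are LD_LV logical volumes returns True
# (the recursion finds an LVM-based child); a disk with no children and a
# non-LV dev_type returns False.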
1074 class LUSetClusterParams(LogicalUnit):
1075 """Change the parameters of the cluster.
1078 HPATH = "cluster-modify"
1079 HTYPE = constants.HTYPE_CLUSTER
1082 def BuildHooksEnv(self):
1087 "OP_TARGET": self.sstore.GetClusterName(),
1088 "NEW_VG_NAME": self.op.vg_name,
1090 mn = self.sstore.GetMasterNode()
1091 return env, [mn], [mn]
1093 def CheckPrereq(self):
1094 """Check prerequisites.
1096 This checks whether the given params don't conflict and
1097 if the given volume group is valid.
1100 if not self.op.vg_name:
1101 instances = [self.cfg.GetInstanceInfo(name)
1102 for name in self.cfg.GetInstanceList()]
1103 for inst in instances:
1104 for disk in inst.disks:
1105 if _RecursiveCheckIfLVMBased(disk):
1106 raise errors.OpPrereqError("Cannot disable lvm storage while"
1107 " lvm-based instances exist")
1109 # if vg_name is not None, check the given volume group on all nodes
1111 node_list = self.cfg.GetNodeList()
1112 vglist = rpc.call_vg_list(node_list)
1113 for node in node_list:
1114 vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1115 constants.MIN_VG_SIZE)
1117 raise errors.OpPrereqError("Error on node '%s': %s" %
1120 def Exec(self, feedback_fn):
1121 """Change the parameters of the cluster.
1124 if self.op.vg_name != self.cfg.GetVGName():
1125 self.cfg.SetVGName(self.op.vg_name)
1127 feedback_fn("Cluster LVM configuration already in desired"
1128 " state, not changing")
1131 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1132 """Sleep and poll for an instance's disk to sync.
1135 if not instance.disks:
1139 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1141 node = instance.primary_node
1143 for dev in instance.disks:
1144 cfgw.SetDiskID(dev, node)
1150 cumul_degraded = False
1151 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1153 proc.LogWarning("Can't get any data from node %s" % node)
1156 raise errors.RemoteError("Can't contact node %s for mirror data,"
1157 " aborting." % node)
1161 for i in range(len(rstats)):
1164 proc.LogWarning("Can't compute data for node %s/%s" %
1165 (node, instance.disks[i].iv_name))
1167 # we ignore the ldisk parameter
1168 perc_done, est_time, is_degraded, _ = mstat
1169 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1170 if perc_done is not None:
1172 if est_time is not None:
1173 rem_time = "%d estimated seconds remaining" % est_time
1176 rem_time = "no time estimate"
1177 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1178 (instance.disks[i].iv_name, perc_done, rem_time))
1182 time.sleep(min(60, max_time))
1185 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1186 return not cumul_degraded
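#
# Typical caller sketch (hedged; instance-creation and disk-replacement LUs
# follow this pattern):
#
#   if not _WaitForSync(self.cfg, instance, self.proc):
#     raise errors.OpExecError("disks are degraded after sync")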
1189 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1190 """Check that mirrors are not degraded.
1192 The ldisk parameter, if True, will change the test from the
1193 is_degraded attribute (which represents overall non-ok status for
1194 the device(s)) to the ldisk (representing the local storage status).
1197 cfgw.SetDiskID(dev, node)
1204 if on_primary or dev.AssembleOnSecondary():
1205 rstats = rpc.call_blockdev_find(node, dev)
1207 logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1210 result = result and (not rstats[idx])
1212 for child in dev.children:
1213 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1218 class LUDiagnoseOS(NoHooksLU):
1219 """Logical unit for OS diagnose/query.
1222 _OP_REQP = ["output_fields", "names"]
1225 def ExpandNames(self):
1227 raise errors.OpPrereqError("Selective OS query not supported")
1229 self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1230 _CheckOutputFields(static=[],
1231 dynamic=self.dynamic_fields,
1232 selected=self.op.output_fields)
1234 # Lock all nodes, in shared mode
1235 self.needed_locks = {}
1236 self.share_locks[locking.LEVEL_NODE] = 1
1237 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1239 def CheckPrereq(self):
1240 """Check prerequisites.
1245 def _DiagnoseByOS(node_list, rlist):
1246 """Remaps a per-node return list into a per-os per-node dictionary
1249 node_list: a list with the names of all nodes
1250 rlist: a map with node names as keys and OS objects as values
1253 map: a map with osnames as keys and as value another map, with
1255 keys and list of OS objects as values
1256 e.g. {"debian-etch": {"node1": [<object>,...],
1257 "node2": [<object>,]}
1262 for node_name, nr in rlist.iteritems():
1266 if os_obj.name not in all_os:
1267 # build a list of nodes for this os containing empty lists
1268 # for each node in node_list
1269 all_os[os_obj.name] = {}
1270 for nname in node_list:
1271 all_os[os_obj.name][nname] = []
1272 all_os[os_obj.name][node_name].append(os_obj)
1275 def Exec(self, feedback_fn):
1276 """Compute the list of OSes.
1279 node_list = self.acquired_locks[locking.LEVEL_NODE]
1280 node_data = rpc.call_os_diagnose(node_list)
1281 if node_data == False:
1282 raise errors.OpExecError("Can't gather the list of OSes")
1283 pol = self._DiagnoseByOS(node_list, node_data)
1285 for os_name, os_data in pol.iteritems():
1287 for field in self.op.output_fields:
1290 elif field == "valid":
1291 val = utils.all([osl and osl[0] for osl in os_data.values()])
1292 elif field == "node_status":
1294 for node_name, nos_list in os_data.iteritems():
1295 val[node_name] = [(v.status, v.path) for v in nos_list]
1297 raise errors.ParameterError(field)
1304 class LURemoveNode(LogicalUnit):
1305 """Logical unit for removing a node.
1308 HPATH = "node-remove"
1309 HTYPE = constants.HTYPE_NODE
1310 _OP_REQP = ["node_name"]
1312 def BuildHooksEnv(self):
1315 This doesn't run on the target node in the pre phase as a failed
1316 node would then be impossible to remove.
1320 "OP_TARGET": self.op.node_name,
1321 "NODE_NAME": self.op.node_name,
1323 all_nodes = self.cfg.GetNodeList()
1324 all_nodes.remove(self.op.node_name)
1325 return env, all_nodes, all_nodes
1327 def CheckPrereq(self):
1328 """Check prerequisites.
1331 - the node exists in the configuration
1332 - it does not have primary or secondary instances
1333 - it's not the master
1335 Any errors are signalled by raising errors.OpPrereqError.
1338 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1340 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1342 instance_list = self.cfg.GetInstanceList()
1344 masternode = self.sstore.GetMasterNode()
1345 if node.name == masternode:
1346 raise errors.OpPrereqError("Node is the master node,"
1347 " you need to failover first.")
1349 for instance_name in instance_list:
1350 instance = self.cfg.GetInstanceInfo(instance_name)
1351 if node.name == instance.primary_node:
1352 raise errors.OpPrereqError("Instance %s still running on the node,"
1353 " please remove first." % instance_name)
1354 if node.name in instance.secondary_nodes:
1355 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1356 " please remove first." % instance_name)
1357 self.op.node_name = node.name
1360 def Exec(self, feedback_fn):
1361 """Removes the node from the cluster.
1365 logger.Info("stopping the node daemon and removing configs from node %s" %
1368 self.context.RemoveNode(node.name)
1370 rpc.call_node_leave_cluster(node.name)
1373 class LUQueryNodes(NoHooksLU):
1374 """Logical unit for querying nodes.
1377 _OP_REQP = ["output_fields", "names"]
1380 def ExpandNames(self):
1381 self.dynamic_fields = frozenset([
1383 "mtotal", "mnode", "mfree",
1388 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1389 "pinst_list", "sinst_list",
1390 "pip", "sip", "tags"],
1391 dynamic=self.dynamic_fields,
1392 selected=self.op.output_fields)
1394 self.needed_locks = {}
1395 self.share_locks[locking.LEVEL_NODE] = 1
1396 # TODO: we could lock nodes only if the user asked for dynamic fields. For
1397 # that we need atomic ways to get info for a group of nodes from the
1399 if not self.op.names:
1400 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1402 self.needed_locks[locking.LEVEL_NODE] = \
1403 _GetWantedNodes(self, self.op.names)
1405 def CheckPrereq(self):
1406 """Check prerequisites.
1409 # This of course is valid only if we locked the nodes
1410 self.wanted = self.acquired_locks[locking.LEVEL_NODE]
1412 def Exec(self, feedback_fn):
1413 """Computes the list of nodes and their attributes.
1416 nodenames = self.wanted
1417 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1419 # begin data gathering
1421 if self.dynamic_fields.intersection(self.op.output_fields):
1423 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1424 for name in nodenames:
1425 nodeinfo = node_data.get(name, None)
1428 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1429 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1430 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1431 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1432 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1433 "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1434 "bootid": nodeinfo['bootid'],
1437 live_data[name] = {}
1439 live_data = dict.fromkeys(nodenames, {})
1441 node_to_primary = dict([(name, set()) for name in nodenames])
1442 node_to_secondary = dict([(name, set()) for name in nodenames])
1444 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1445 "sinst_cnt", "sinst_list"))
1446 if inst_fields & frozenset(self.op.output_fields):
1447 instancelist = self.cfg.GetInstanceList()
1449 for instance_name in instancelist:
1450 inst = self.cfg.GetInstanceInfo(instance_name)
1451 if inst.primary_node in node_to_primary:
1452 node_to_primary[inst.primary_node].add(inst.name)
1453 for secnode in inst.secondary_nodes:
1454 if secnode in node_to_secondary:
1455 node_to_secondary[secnode].add(inst.name)
1457 # end data gathering
1460 for node in nodelist:
1462 for field in self.op.output_fields:
1465 elif field == "pinst_list":
1466 val = list(node_to_primary[node.name])
1467 elif field == "sinst_list":
1468 val = list(node_to_secondary[node.name])
1469 elif field == "pinst_cnt":
1470 val = len(node_to_primary[node.name])
1471 elif field == "sinst_cnt":
1472 val = len(node_to_secondary[node.name])
1473 elif field == "pip":
1474 val = node.primary_ip
1475 elif field == "sip":
1476 val = node.secondary_ip
1477 elif field == "tags":
1478 val = list(node.GetTags())
1479 elif field in self.dynamic_fields:
1480 val = live_data[node.name].get(field, None)
1482 raise errors.ParameterError(field)
1483 node_output.append(val)
1484 output.append(node_output)
1489 class LUQueryNodeVolumes(NoHooksLU):
1490 """Logical unit for getting volumes on node(s).
1493 _OP_REQP = ["nodes", "output_fields"]
1496 def ExpandNames(self):
1497 _CheckOutputFields(static=["node"],
1498 dynamic=["phys", "vg", "name", "size", "instance"],
1499 selected=self.op.output_fields)
1501 self.needed_locks = {}
1502 self.share_locks[locking.LEVEL_NODE] = 1
1503 if not self.op.nodes:
1504 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1506 self.needed_locks[locking.LEVEL_NODE] = \
1507 _GetWantedNodes(self, self.op.nodes)
1509 def CheckPrereq(self):
1510 """Check prerequisites.
1512 This checks that the fields required are valid output fields.
1515 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1517 def Exec(self, feedback_fn):
1518 """Computes the list of nodes and their attributes.
1521 nodenames = self.nodes
1522 volumes = rpc.call_node_volumes(nodenames)
1524 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1525 in self.cfg.GetInstanceList()]
1527 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1530 for node in nodenames:
1531 if node not in volumes or not volumes[node]:
1534 node_vols = volumes[node][:]
1535 node_vols.sort(key=lambda vol: vol['dev'])
1537 for vol in node_vols:
1539 for field in self.op.output_fields:
1542 elif field == "phys":
1546 elif field == "name":
1548 elif field == "size":
1549 val = int(float(vol['size']))
1550 elif field == "instance":
1552 if node not in lv_by_node[inst]:
1554 if vol['name'] in lv_by_node[inst][node]:
1560 raise errors.ParameterError(field)
1561 node_output.append(str(val))
1563 output.append(node_output)
1568 class LUAddNode(LogicalUnit):
1569 """Logical unit for adding node to the cluster.
1573 HTYPE = constants.HTYPE_NODE
1574 _OP_REQP = ["node_name"]
1576 def BuildHooksEnv(self):
1579 This will run on all nodes before, and on all nodes + the new node after.
1583 "OP_TARGET": self.op.node_name,
1584 "NODE_NAME": self.op.node_name,
1585 "NODE_PIP": self.op.primary_ip,
1586 "NODE_SIP": self.op.secondary_ip,
1588 nodes_0 = self.cfg.GetNodeList()
1589 nodes_1 = nodes_0 + [self.op.node_name, ]
1590 return env, nodes_0, nodes_1
1592 def CheckPrereq(self):
1593 """Check prerequisites.
1596 - the new node is not already in the config
1598 - its parameters (single/dual homed) match the cluster
1600 Any errors are signalled by raising errors.OpPrereqError.
1603 node_name = self.op.node_name
1606 dns_data = utils.HostInfo(node_name)
1608 node = dns_data.name
1609 primary_ip = self.op.primary_ip = dns_data.ip
1610 secondary_ip = getattr(self.op, "secondary_ip", None)
1611 if secondary_ip is None:
1612 secondary_ip = primary_ip
1613 if not utils.IsValidIP(secondary_ip):
1614 raise errors.OpPrereqError("Invalid secondary IP given")
1615 self.op.secondary_ip = secondary_ip
1617 node_list = cfg.GetNodeList()
1618 if not self.op.readd and node in node_list:
1619 raise errors.OpPrereqError("Node %s is already in the configuration" %
1621 elif self.op.readd and node not in node_list:
1622 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1624 for existing_node_name in node_list:
1625 existing_node = cfg.GetNodeInfo(existing_node_name)
1627 if self.op.readd and node == existing_node_name:
1628 if (existing_node.primary_ip != primary_ip or
1629 existing_node.secondary_ip != secondary_ip):
1630 raise errors.OpPrereqError("Readded node doesn't have the same IP"
1631 " address configuration as before")
1634 if (existing_node.primary_ip == primary_ip or
1635 existing_node.secondary_ip == primary_ip or
1636 existing_node.primary_ip == secondary_ip or
1637 existing_node.secondary_ip == secondary_ip):
1638 raise errors.OpPrereqError("New node ip address(es) conflict with"
1639 " existing node %s" % existing_node.name)
1641 # check that the type of the node (single versus dual homed) is the
1642 # same as for the master
1643 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1644 master_singlehomed = myself.secondary_ip == myself.primary_ip
1645 newbie_singlehomed = secondary_ip == primary_ip
1646 if master_singlehomed != newbie_singlehomed:
1647 if master_singlehomed:
1648 raise errors.OpPrereqError("The master has no private ip but the"
1649 " new node has one")
1651 raise errors.OpPrereqError("The master has a private ip but the"
1652 " new node doesn't have one")
1654 # checks reachability
1655 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1656 raise errors.OpPrereqError("Node not reachable by ping")
1658 if not newbie_singlehomed:
1659 # check reachability from my secondary ip to newbie's secondary ip
1660 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1661 source=myself.secondary_ip):
1662 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1663 " based ping to noded port")
1665 self.new_node = objects.Node(name=node,
1666 primary_ip=primary_ip,
1667 secondary_ip=secondary_ip)
1669 def Exec(self, feedback_fn):
1670 """Adds the new node to the cluster.
1673 new_node = self.new_node
1674 node = new_node.name
1676 # check connectivity
1677 result = rpc.call_version([node])[node]
1679 if constants.PROTOCOL_VERSION == result:
1680 logger.Info("communication to node %s fine, sw version %s match" %
1683 raise errors.OpExecError("Version mismatch master version %s,"
1684 " node version %s" %
1685 (constants.PROTOCOL_VERSION, result))
1687 raise errors.OpExecError("Cannot get version from the new node")
1690 logger.Info("copy ssh key to node %s" % node)
1691 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1693 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1694 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1700 keyarray.append(f.read())
1704 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1705 keyarray[3], keyarray[4], keyarray[5])
1708 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1710 # Add node to our /etc/hosts, and add key to known_hosts
1711 utils.AddHostToEtcHosts(new_node.name)
1713 if new_node.secondary_ip != new_node.primary_ip:
1714 if not rpc.call_node_tcp_ping(new_node.name,
1715 constants.LOCALHOST_IP_ADDRESS,
1716 new_node.secondary_ip,
1717 constants.DEFAULT_NODED_PORT,
1719 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1720 " you gave (%s). Please fix and re-run this"
1721 " command." % new_node.secondary_ip)
1723 node_verify_list = [self.sstore.GetMasterNode()]
1724 node_verify_param = {
1726 # TODO: do a node-net-test as well?
1729 result = rpc.call_node_verify(node_verify_list, node_verify_param)
1730 for verifier in node_verify_list:
1731 if not result[verifier]:
1732 raise errors.OpExecError("Cannot communicate with %s's node daemon"
1733 " for remote verification" % verifier)
1734 if result[verifier]['nodelist']:
1735 for failed in result[verifier]['nodelist']:
1736 feedback_fn("ssh/hostname verification failed %s -> %s" %
1737 (verifier, result[verifier]['nodelist'][failed]))
1738 raise errors.OpExecError("ssh/hostname verification failed.")
1740 # Distribute updated /etc/hosts and known_hosts to all nodes,
1741 # including the node just added
1742 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1743 dist_nodes = self.cfg.GetNodeList()
1744 if not self.op.readd:
1745 dist_nodes.append(node)
1746 if myself.name in dist_nodes:
1747 dist_nodes.remove(myself.name)
1749 logger.Debug("Copying hosts and known_hosts to all nodes")
1750 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1751 result = rpc.call_upload_file(dist_nodes, fname)
1752 for to_node in dist_nodes:
1753 if not result[to_node]:
1754 logger.Error("copy of file %s to node %s failed" %
1757 to_copy = self.sstore.GetFileList()
1758 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1759 to_copy.append(constants.VNC_PASSWORD_FILE)
1760 for fname in to_copy:
1761 result = rpc.call_upload_file([node], fname)
1762 if not result[node]:
1763 logger.Error("could not copy file %s to node %s" % (fname, node))
1766 self.context.ReaddNode(new_node)
1768 self.context.AddNode(new_node)
1771 class LUQueryClusterInfo(NoHooksLU):
1772 """Query cluster configuration.
1779 def ExpandNames(self):
1780 self.needed_locks = {}
1782 def CheckPrereq(self):
1783 """No prerequsites needed for this LU.
1788 def Exec(self, feedback_fn):
1789 """Return cluster config.
1793 "name": self.sstore.GetClusterName(),
1794 "software_version": constants.RELEASE_VERSION,
1795 "protocol_version": constants.PROTOCOL_VERSION,
1796 "config_version": constants.CONFIG_VERSION,
1797 "os_api_version": constants.OS_API_VERSION,
1798 "export_version": constants.EXPORT_VERSION,
1799 "master": self.sstore.GetMasterNode(),
1800 "architecture": (platform.architecture()[0], platform.machine()),
1801 "hypervisor_type": self.sstore.GetHypervisorType(),
1807 class LUDumpClusterConfig(NoHooksLU):
1808 """Return a text-representation of the cluster-config.
1814 def ExpandNames(self):
1815 self.needed_locks = {}
1817 def CheckPrereq(self):
1818 """No prerequisites.
1823 def Exec(self, feedback_fn):
1824 """Dump a representation of the cluster config to the standard output.
1827 return self.cfg.DumpConfig()
1830 class LUActivateInstanceDisks(NoHooksLU):
1831 """Bring up an instance's disks.
1834 _OP_REQP = ["instance_name"]
1836 def CheckPrereq(self):
1837 """Check prerequisites.
1839 This checks that the instance is in the cluster.
1842 instance = self.cfg.GetInstanceInfo(
1843 self.cfg.ExpandInstanceName(self.op.instance_name))
1844 if instance is None:
1845 raise errors.OpPrereqError("Instance '%s' not known" %
1846 self.op.instance_name)
1847 self.instance = instance
1850 def Exec(self, feedback_fn):
1851 """Activate the disks.
1854 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1856 raise errors.OpExecError("Cannot activate block devices")
1861 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1862 """Prepare the block devices for an instance.
1864 This sets up the block devices on all nodes.
1867 instance: a ganeti.objects.Instance object
1868 ignore_secondaries: if true, errors on secondary nodes won't result
1869 in an error return from the function
1872 a (disks_ok, device_info) tuple: disks_ok is false if the operation failed,
1873 and device_info is the list of (host, instance_visible_name,
1874 node_visible_name) tuples mapping node devices to instance devices
1878 iname = instance.name
1879 # With the two passes mechanism we try to reduce the window of
1880 # opportunity for the race condition of switching DRBD to primary
1881 # before handshaking occurred, but we do not eliminate it
1883 # The proper fix would be to wait (with some limits) until the
1884 # connection has been made and drbd transitions from WFConnection
1885 # into any other network-connected state (Connected, SyncTarget,
1888 # 1st pass, assemble on all nodes in secondary mode
1889 for inst_disk in instance.disks:
1890 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1891 cfg.SetDiskID(node_disk, node)
1892 result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1894 logger.Error("could not prepare block device %s on node %s"
1895 " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1896 if not ignore_secondaries:
1899 # FIXME: race condition on drbd migration to primary
1901 # 2nd pass, do only the primary node
1902 for inst_disk in instance.disks:
1903 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1904 if node != instance.primary_node:
1906 cfg.SetDiskID(node_disk, node)
1907 result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1909 logger.Error("could not prepare block device %s on node %s"
1910 " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1912 device_info.append((instance.primary_node, inst_disk.iv_name, result))
1914 # leave the disks configured for the primary node
1915 # this is a workaround that would be fixed better by
1916 # improving the logical/physical id handling
1917 for disk in instance.disks:
1918 cfg.SetDiskID(disk, instance.primary_node)
1920 return disks_ok, device_info
1923 def _StartInstanceDisks(cfg, instance, force):
1924 """Start the disks of an instance.
1927 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1928 ignore_secondaries=force)
1930 _ShutdownInstanceDisks(instance, cfg)
1931 if force is not None and not force:
1932 logger.Error("If the message above refers to a secondary node,"
1933 " you can retry the operation using '--force'.")
1934 raise errors.OpExecError("Disk consistency error")
1937 class LUDeactivateInstanceDisks(NoHooksLU):
1938 """Shutdown an instance's disks.
1941 _OP_REQP = ["instance_name"]
1943 def CheckPrereq(self):
1944 """Check prerequisites.
1946 This checks that the instance is in the cluster.
1949 instance = self.cfg.GetInstanceInfo(
1950 self.cfg.ExpandInstanceName(self.op.instance_name))
1951 if instance is None:
1952 raise errors.OpPrereqError("Instance '%s' not known" %
1953 self.op.instance_name)
1954 self.instance = instance
1956 def Exec(self, feedback_fn):
1957 """Deactivate the disks
1960 instance = self.instance
1961 ins_l = rpc.call_instance_list([instance.primary_node])
1962 ins_l = ins_l[instance.primary_node]
1963 if not type(ins_l) is list:
1964 raise errors.OpExecError("Can't contact node '%s'" %
1965 instance.primary_node)
1967 if self.instance.name in ins_l:
1968 raise errors.OpExecError("Instance is running, can't shutdown"
1971 _ShutdownInstanceDisks(instance, self.cfg)
1974 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1975 """Shutdown block devices of an instance.
1977 This does the shutdown on all nodes of the instance.
1979 If the ignore_primary is false, errors on the primary node are
1984 for disk in instance.disks:
1985 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1986 cfg.SetDiskID(top_disk, node)
1987 if not rpc.call_blockdev_shutdown(node, top_disk):
1988 logger.Error("could not shutdown block device %s on node %s" %
1989 (disk.iv_name, node))
1990 if not ignore_primary or node != instance.primary_node:
1995 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1996 """Checks if a node has enough free memory.
1998 This function checks if a given node has the needed amount of free
1999 memory. In case the node has less memory or we cannot get the
2000 information from the node, this function raises an OpPrereqError
2004 - cfg: a ConfigWriter instance
2005 - node: the node name
2006 - reason: string to use in the error message
2007 - requested: the amount of memory in MiB
2010 nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2011 if not nodeinfo or not isinstance(nodeinfo, dict):
2012 raise errors.OpPrereqError("Could not contact node %s for resource"
2013 " information" % (node,))
2015 free_mem = nodeinfo[node].get('memory_free')
2016 if not isinstance(free_mem, int):
2017 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2018 " was '%s'" % (node, free_mem))
2019 if requested > free_mem:
2020 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2021 " needed %s MiB, available %s MiB" %
2022 (node, reason, requested, free_mem))
2025 class LUStartupInstance(LogicalUnit):
2026 """Starts an instance.
2029 HPATH = "instance-start"
2030 HTYPE = constants.HTYPE_INSTANCE
2031 _OP_REQP = ["instance_name", "force"]
2034 def ExpandNames(self):
2035 self._ExpandAndLockInstance()
2036 self.needed_locks[locking.LEVEL_NODE] = []
2037 self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2039 def DeclareLocks(self, level):
2040 if level == locking.LEVEL_NODE:
2041 self._LockInstancesNodes()
2043 def BuildHooksEnv(self):
2046 This runs on master, primary and secondary nodes of the instance.
2050 "FORCE": self.op.force,
2052 env.update(_BuildInstanceHookEnvByObject(self.instance))
2053 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2054 list(self.instance.secondary_nodes))
2057 def CheckPrereq(self):
2058 """Check prerequisites.
2060 This checks that the instance is in the cluster.
2063 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2064 assert self.instance is not None, \
2065 "Cannot retrieve locked instance %s" % self.op.instance_name
2067 # check bridges existence
2068 _CheckInstanceBridgesExist(instance)
2070 _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2071 "starting instance %s" % instance.name,
2074 def Exec(self, feedback_fn):
2075 """Start the instance.
2078 instance = self.instance
2079 force = self.op.force
2080 extra_args = getattr(self.op, "extra_args", "")
2082 self.cfg.MarkInstanceUp(instance.name)
2084 node_current = instance.primary_node
2086 _StartInstanceDisks(self.cfg, instance, force)
2088 if not rpc.call_instance_start(node_current, instance, extra_args):
2089 _ShutdownInstanceDisks(instance, self.cfg)
2090 raise errors.OpExecError("Could not start instance")
2093 class LURebootInstance(LogicalUnit):
2094 """Reboot an instance.
2097 HPATH = "instance-reboot"
2098 HTYPE = constants.HTYPE_INSTANCE
2099 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2102 def ExpandNames(self):
2103 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2104 constants.INSTANCE_REBOOT_HARD,
2105 constants.INSTANCE_REBOOT_FULL]:
2106 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2107 (constants.INSTANCE_REBOOT_SOFT,
2108 constants.INSTANCE_REBOOT_HARD,
2109 constants.INSTANCE_REBOOT_FULL))
2110 self._ExpandAndLockInstance()
2111 self.needed_locks[locking.LEVEL_NODE] = []
2112 self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2114 def DeclareLocks(self, level):
2115 if level == locking.LEVEL_NODE:
2116 # FIXME: lock only primary on (not constants.INSTANCE_REBOOT_FULL)
2117 self._LockInstancesNodes()
2119 def BuildHooksEnv(self):
2122 This runs on master, primary and secondary nodes of the instance.
2126 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2128 env.update(_BuildInstanceHookEnvByObject(self.instance))
2129 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2130 list(self.instance.secondary_nodes))
2133 def CheckPrereq(self):
2134 """Check prerequisites.
2136 This checks that the instance is in the cluster.
2139 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2140 assert self.instance is not None, \
2141 "Cannot retrieve locked instance %s" % self.op.instance_name
2143 # check bridges existence
2144 _CheckInstanceBridgesExist(instance)
2146 def Exec(self, feedback_fn):
2147 """Reboot the instance.
2150 instance = self.instance
2151 ignore_secondaries = self.op.ignore_secondaries
2152 reboot_type = self.op.reboot_type
2153 extra_args = getattr(self.op, "extra_args", "")
2155 node_current = instance.primary_node
2157 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2158 constants.INSTANCE_REBOOT_HARD]:
2159 if not rpc.call_instance_reboot(node_current, instance,
2160 reboot_type, extra_args):
2161 raise errors.OpExecError("Could not reboot instance")
2163 if not rpc.call_instance_shutdown(node_current, instance):
2164 raise errors.OpExecError("Could not shutdown instance for full reboot")
2165 _ShutdownInstanceDisks(instance, self.cfg)
2166 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2167 if not rpc.call_instance_start(node_current, instance, extra_args):
2168 _ShutdownInstanceDisks(instance, self.cfg)
2169 raise errors.OpExecError("Could not start instance for full reboot")
2171 self.cfg.MarkInstanceUp(instance.name)
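# Illustrative sketch (assumption: the matching opcode class is
# opcodes.OpRebootInstance, defined outside this excerpt): building a soft
# reboot request for LURebootInstance to process.
def _ExampleSoftRebootOp(instance_name):
  """Return a soft-reboot opcode for the given instance (illustration)."""
  return opcodes.OpRebootInstance(instance_name=instance_name,
                                  reboot_type=constants.INSTANCE_REBOOT_SOFT,
                                  ignore_secondaries=False)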
2174 class LUShutdownInstance(LogicalUnit):
2175 """Shutdown an instance.
2178 HPATH = "instance-stop"
2179 HTYPE = constants.HTYPE_INSTANCE
2180 _OP_REQP = ["instance_name"]
2183 def ExpandNames(self):
2184 self._ExpandAndLockInstance()
2185 self.needed_locks[locking.LEVEL_NODE] = []
2186 self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2188 def DeclareLocks(self, level):
2189 if level == locking.LEVEL_NODE:
2190 self._LockInstancesNodes()
2192 def BuildHooksEnv(self):
2195 This runs on master, primary and secondary nodes of the instance.
2198 env = _BuildInstanceHookEnvByObject(self.instance)
2199 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2200 list(self.instance.secondary_nodes))
2203 def CheckPrereq(self):
2204 """Check prerequisites.
2206 This checks that the instance is in the cluster.
2209 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2210 assert self.instance is not None, \
2211 "Cannot retrieve locked instance %s" % self.op.instance_name
2213 def Exec(self, feedback_fn):
2214 """Shutdown the instance.
2217 instance = self.instance
2218 node_current = instance.primary_node
2219 self.cfg.MarkInstanceDown(instance.name)
2220 if not rpc.call_instance_shutdown(node_current, instance):
2221 logger.Error("could not shutdown instance")
2223 _ShutdownInstanceDisks(instance, self.cfg)
2226 class LUReinstallInstance(LogicalUnit):
2227 """Reinstall an instance.
2230 HPATH = "instance-reinstall"
2231 HTYPE = constants.HTYPE_INSTANCE
2232 _OP_REQP = ["instance_name"]
2235 def ExpandNames(self):
2236 self._ExpandAndLockInstance()
2237 self.needed_locks[locking.LEVEL_NODE] = []
2238 self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2240 def DeclareLocks(self, level):
2241 if level == locking.LEVEL_NODE:
2242 self._LockInstancesNodes()
2244 def BuildHooksEnv(self):
2247 This runs on master, primary and secondary nodes of the instance.
2250 env = _BuildInstanceHookEnvByObject(self.instance)
2251 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2252 list(self.instance.secondary_nodes))
2255 def CheckPrereq(self):
2256 """Check prerequisites.
2258 This checks that the instance is in the cluster and is not running.
2261 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2262 assert instance is not None, \
2263 "Cannot retrieve locked instance %s" % self.op.instance_name
2265 if instance.disk_template == constants.DT_DISKLESS:
2266 raise errors.OpPrereqError("Instance '%s' has no disks" %
2267 self.op.instance_name)
2268 if instance.status != "down":
2269 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2270 self.op.instance_name)
2271 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2273 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2274 (self.op.instance_name,
2275 instance.primary_node))
2277 self.op.os_type = getattr(self.op, "os_type", None)
2278 if self.op.os_type is not None:
2280 pnode = self.cfg.GetNodeInfo(
2281 self.cfg.ExpandNodeName(instance.primary_node))
2283 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2285 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2287 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2288 " primary node" % self.op.os_type)
2290 self.instance = instance
2292 def Exec(self, feedback_fn):
2293 """Reinstall the instance.
2296 inst = self.instance
2298 if self.op.os_type is not None:
2299 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2300 inst.os = self.op.os_type
2301 self.cfg.AddInstance(inst)
2303 _StartInstanceDisks(self.cfg, inst, None)
2305 feedback_fn("Running the instance OS create scripts...")
2306 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2307 raise errors.OpExecError("Could not install OS for instance %s"
2309 (inst.name, inst.primary_node))
2311 _ShutdownInstanceDisks(inst, self.cfg)
2314 class LURenameInstance(LogicalUnit):
2315 """Rename an instance.
2318 HPATH = "instance-rename"
2319 HTYPE = constants.HTYPE_INSTANCE
2320 _OP_REQP = ["instance_name", "new_name"]
2322 def BuildHooksEnv(self):
2325 This runs on master, primary and secondary nodes of the instance.
2328 env = _BuildInstanceHookEnvByObject(self.instance)
2329 env["INSTANCE_NEW_NAME"] = self.op.new_name
2330 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2331 list(self.instance.secondary_nodes))
2334 def CheckPrereq(self):
2335 """Check prerequisites.
2337 This checks that the instance is in the cluster and is not running.
2340 instance = self.cfg.GetInstanceInfo(
2341 self.cfg.ExpandInstanceName(self.op.instance_name))
2342 if instance is None:
2343 raise errors.OpPrereqError("Instance '%s' not known" %
2344 self.op.instance_name)
2345 if instance.status != "down":
2346 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2347 self.op.instance_name)
2348 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2350 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2351 (self.op.instance_name,
2352 instance.primary_node))
2353 self.instance = instance
2355 # new name verification
2356 name_info = utils.HostInfo(self.op.new_name)
2358 self.op.new_name = new_name = name_info.name
2359 instance_list = self.cfg.GetInstanceList()
2360 if new_name in instance_list:
2361 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2364 if not getattr(self.op, "ignore_ip", False):
2365 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2366 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2367 (name_info.ip, new_name))
2370 def Exec(self, feedback_fn):
2371 """Reinstall the instance.
2374 inst = self.instance
2375 old_name = inst.name
2377 if inst.disk_template == constants.DT_FILE:
2378 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2380 self.cfg.RenameInstance(inst.name, self.op.new_name)
2381 # Change the instance lock. This is definitely safe while we hold the BGL
2382 self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2383 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2385 # re-read the instance from the configuration after rename
2386 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2388 if inst.disk_template == constants.DT_FILE:
2389 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2390 result = rpc.call_file_storage_dir_rename(inst.primary_node,
2391 old_file_storage_dir,
2392 new_file_storage_dir)
2395 raise errors.OpExecError("Could not connect to node '%s' to rename"
2396 " directory '%s' to '%s' (but the instance"
2397 " has been renamed in Ganeti)" % (
2398 inst.primary_node, old_file_storage_dir,
2399 new_file_storage_dir))
2402 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2403 " (but the instance has been renamed in"
2404 " Ganeti)" % (old_file_storage_dir,
2405 new_file_storage_dir))
2407 _StartInstanceDisks(self.cfg, inst, None)
2409 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2411 msg = ("Could not run OS rename script for instance %s on node %s"
2412 " (but the instance has been renamed in Ganeti)" %
2413 (inst.name, inst.primary_node))
2416 _ShutdownInstanceDisks(inst, self.cfg)
2419 class LURemoveInstance(LogicalUnit):
2420 """Remove an instance.
2423 HPATH = "instance-remove"
2424 HTYPE = constants.HTYPE_INSTANCE
2425 _OP_REQP = ["instance_name", "ignore_failures"]
2427 def BuildHooksEnv(self):
2430 This runs on master, primary and secondary nodes of the instance.
2433 env = _BuildInstanceHookEnvByObject(self.instance)
2434 nl = [self.sstore.GetMasterNode()]
2437 def CheckPrereq(self):
2438 """Check prerequisites.
2440 This checks that the instance is in the cluster.
2443 instance = self.cfg.GetInstanceInfo(
2444 self.cfg.ExpandInstanceName(self.op.instance_name))
2445 if instance is None:
2446 raise errors.OpPrereqError("Instance '%s' not known" %
2447 self.op.instance_name)
2448 self.instance = instance
2450 def Exec(self, feedback_fn):
2451 """Remove the instance.
2454 instance = self.instance
2455 logger.Info("shutting down instance %s on node %s" %
2456 (instance.name, instance.primary_node))
2458 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2459 if self.op.ignore_failures:
2460 feedback_fn("Warning: can't shutdown instance")
2462 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2463 (instance.name, instance.primary_node))
2465 logger.Info("removing block devices for instance %s" % instance.name)
2467 if not _RemoveDisks(instance, self.cfg):
2468 if self.op.ignore_failures:
2469 feedback_fn("Warning: can't remove instance's disks")
2471 raise errors.OpExecError("Can't remove instance's disks")
2473 logger.Info("removing instance %s from cluster config" % instance.name)
2475 self.cfg.RemoveInstance(instance.name)
2476 # Remove the instance from the Ganeti Lock Manager
2477 self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2480 class LUQueryInstances(NoHooksLU):
2481 """Logical unit for querying instances.
2484 _OP_REQP = ["output_fields", "names"]
2487 def ExpandNames(self):
2488 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2489 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2490 "admin_state", "admin_ram",
2491 "disk_template", "ip", "mac", "bridge",
2492 "sda_size", "sdb_size", "vcpus", "tags",
2494 "network_port", "kernel_path", "initrd_path",
2495 "hvm_boot_order", "hvm_acpi", "hvm_pae",
2496 "hvm_cdrom_image_path", "hvm_nic_type",
2497 "hvm_disk_type", "vnc_bind_address"],
2498 dynamic=self.dynamic_fields,
2499 selected=self.op.output_fields)
2501 self.needed_locks = {}
2502 self.share_locks[locking.LEVEL_INSTANCE] = 1
2503 self.share_locks[locking.LEVEL_NODE] = 1
2505 # TODO: we could lock instances (and nodes) only if the user asked for
2506 # dynamic fields. For that we need atomic ways to get info for a group of
2507 # instances from the config, though.
2508 if not self.op.names:
2509 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
2511 self.needed_locks[locking.LEVEL_INSTANCE] = \
2512 _GetWantedInstances(self, self.op.names)
2514 self.needed_locks[locking.LEVEL_NODE] = []
2515 self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2517 def DeclareLocks(self, level):
2518 # TODO: locking of nodes could be avoided when not querying them
2519 if level == locking.LEVEL_NODE:
2520 self._LockInstancesNodes()
2522 def CheckPrereq(self):
2523 """Check prerequisites.
2526 # This of course is valid only if we locked the instances
2527 self.wanted = self.acquired_locks[locking.LEVEL_INSTANCE]
2529 def Exec(self, feedback_fn):
2530 """Computes the list of nodes and their attributes.
2533 instance_names = self.wanted
2534 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2537 # begin data gathering
2539 nodes = frozenset([inst.primary_node for inst in instance_list])
2542 if self.dynamic_fields.intersection(self.op.output_fields):
2544 node_data = rpc.call_all_instances_info(nodes)
2546 result = node_data[name]
2548 live_data.update(result)
2549 elif result is False:
2550 bad_nodes.append(name)
2551 # else no instance is alive
2553 live_data = dict([(name, {}) for name in instance_names])
2555 # end data gathering
2558 for instance in instance_list:
2560 for field in self.op.output_fields:
2565 elif field == "pnode":
2566 val = instance.primary_node
2567 elif field == "snodes":
2568 val = list(instance.secondary_nodes)
2569 elif field == "admin_state":
2570 val = (instance.status != "down")
2571 elif field == "oper_state":
2572 if instance.primary_node in bad_nodes:
2575 val = bool(live_data.get(instance.name))
2576 elif field == "status":
2577 if instance.primary_node in bad_nodes:
2578 val = "ERROR_nodedown"
2580 running = bool(live_data.get(instance.name))
2582 if instance.status != "down":
2587 if instance.status != "down":
2591 elif field == "admin_ram":
2592 val = instance.memory
2593 elif field == "oper_ram":
2594 if instance.primary_node in bad_nodes:
2596 elif instance.name in live_data:
2597 val = live_data[instance.name].get("memory", "?")
2600 elif field == "disk_template":
2601 val = instance.disk_template
2603 val = instance.nics[0].ip
2604 elif field == "bridge":
2605 val = instance.nics[0].bridge
2606 elif field == "mac":
2607 val = instance.nics[0].mac
2608 elif field in ("sda_size", "sdb_size"):
2609 disk = instance.FindDisk(field[:3])
2614 elif field == "vcpus":
2615 val = instance.vcpus
2616 elif field == "tags":
2617 val = list(instance.GetTags())
2618 elif field in ("network_port", "kernel_path", "initrd_path",
2619 "hvm_boot_order", "hvm_acpi", "hvm_pae",
2620 "hvm_cdrom_image_path", "hvm_nic_type",
2621 "hvm_disk_type", "vnc_bind_address"):
2622 val = getattr(instance, field, None)
2625 elif field in ("hvm_nic_type", "hvm_disk_type",
2626 "kernel_path", "initrd_path"):
2631 raise errors.ParameterError(field)
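# Illustrative sketch (assumption: the matching opcode class is
# opcodes.OpQueryInstances): a query limited to static fields, which - as
# the TODO in ExpandNames notes - does not strictly need the per-node RPC
# used for the dynamic fields.
def _ExampleQueryOp():
  """Return a query opcode for name, primary node and admin state."""
  return opcodes.OpQueryInstances(output_fields=["name", "pnode",
                                                 "admin_state"],
                                  names=[])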
2638 class LUFailoverInstance(LogicalUnit):
2639 """Failover an instance.
2642 HPATH = "instance-failover"
2643 HTYPE = constants.HTYPE_INSTANCE
2644 _OP_REQP = ["instance_name", "ignore_consistency"]
2647 def ExpandNames(self):
2648 self._ExpandAndLockInstance()
2649 self.needed_locks[locking.LEVEL_NODE] = []
2650 self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2652 def DeclareLocks(self, level):
2653 if level == locking.LEVEL_NODE:
2654 self._LockInstancesNodes()
2656 def BuildHooksEnv(self):
2659 This runs on master, primary and secondary nodes of the instance.
2663 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2665 env.update(_BuildInstanceHookEnvByObject(self.instance))
2666 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2669 def CheckPrereq(self):
2670 """Check prerequisites.
2672 This checks that the instance is in the cluster.
2675 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2676 assert self.instance is not None, \
2677 "Cannot retrieve locked instance %s" % self.op.instance_name
2679 if instance.disk_template not in constants.DTS_NET_MIRROR:
2680 raise errors.OpPrereqError("Instance's disk layout is not"
2681 " network mirrored, cannot failover.")
2683 secondary_nodes = instance.secondary_nodes
2684 if not secondary_nodes:
2685 raise errors.ProgrammerError("no secondary node but using "
2686 "a mirrored disk template")
2688 target_node = secondary_nodes[0]
2689 # check memory requirements on the secondary node
2690 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2691 instance.name, instance.memory)
2693 # check bridge existence
2694 brlist = [nic.bridge for nic in instance.nics]
2695 if not rpc.call_bridges_exist(target_node, brlist):
2696 raise errors.OpPrereqError("One or more target bridges %s do not"
2697 " exist on destination node '%s'" %
2698 (brlist, target_node))
2700 def Exec(self, feedback_fn):
2701 """Failover an instance.
2703 The failover is done by shutting it down on its present node and
2704 starting it on the secondary.
2707 instance = self.instance
2709 source_node = instance.primary_node
2710 target_node = instance.secondary_nodes[0]
2712 feedback_fn("* checking disk consistency between source and target")
2713 for dev in instance.disks:
2714 # for drbd, these are drbd over lvm
2715 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2716 if instance.status == "up" and not self.op.ignore_consistency:
2717 raise errors.OpExecError("Disk %s is degraded on target node,"
2718 " aborting failover." % dev.iv_name)
2720 feedback_fn("* shutting down instance on source node")
2721 logger.Info("Shutting down instance %s on node %s" %
2722 (instance.name, source_node))
2724 if not rpc.call_instance_shutdown(source_node, instance):
2725 if self.op.ignore_consistency:
2726 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2727 " anyway. Please make sure node %s is down" %
2728 (instance.name, source_node, source_node))
2730 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2731 (instance.name, source_node))
2733 feedback_fn("* deactivating the instance's disks on source node")
2734 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2735 raise errors.OpExecError("Can't shut down the instance's disks.")
2737 instance.primary_node = target_node
2738 # distribute new instance config to the other nodes
2739 self.cfg.Update(instance)
2741 # Only start the instance if it's marked as up
2742 if instance.status == "up":
2743 feedback_fn("* activating the instance's disks on target node")
2744 logger.Info("Starting instance %s on node %s" %
2745 (instance.name, target_node))
2747 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2748 ignore_secondaries=True)
2750 _ShutdownInstanceDisks(instance, self.cfg)
2751 raise errors.OpExecError("Can't activate the instance's disks")
2753 feedback_fn("* starting the instance on the target node")
2754 if not rpc.call_instance_start(target_node, instance, None):
2755 _ShutdownInstanceDisks(instance, self.cfg)
2756 raise errors.OpExecError("Could not start instance %s on node %s." %
2757 (instance.name, target_node))
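# Illustrative sketch (assumption: the matching opcode class is
# opcodes.OpFailoverInstance): requesting a failover that refuses to
# proceed on degraded disks, the default behaviour checked in Exec above.
def _ExampleFailoverOp(instance_name):
  """Return a failover opcode for a DRBD-backed instance (illustration)."""
  return opcodes.OpFailoverInstance(instance_name=instance_name,
                                    ignore_consistency=False)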
2760 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2761 """Create a tree of block devices on the primary node.
2763 This always creates all devices.
2767 for child in device.children:
2768 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2771 cfg.SetDiskID(device, node)
2772 new_id = rpc.call_blockdev_create(node, device, device.size,
2773 instance.name, True, info)
2776 if device.physical_id is None:
2777 device.physical_id = new_id
2781 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2782 """Create a tree of block devices on a secondary node.
2784 If this device type has to be created on secondaries, create it and
2787 If not, just recurse to children keeping the same 'force' value.
2790 if device.CreateOnSecondary():
2793 for child in device.children:
2794 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2795 child, force, info):
2800 cfg.SetDiskID(device, node)
2801 new_id = rpc.call_blockdev_create(node, device, device.size,
2802 instance.name, False, info)
2805 if device.physical_id is None:
2806 device.physical_id = new_id
2810 def _GenerateUniqueNames(cfg, exts):
2811 """Generate a suitable LV name.
2813 This will generate a logical volume name for the given instance.
2818 new_id = cfg.GenerateUniqueID()
2819 results.append("%s%s" % (new_id, val))
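# Illustrative example (not part of the original module): the DRBD8
# template below requests four LV names in one call, e.g.
#   _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
#                              ".sdb_data", ".sdb_meta"])
# which returns names of the form "<unique-id>.sda_data" and so on, with a
# fresh unique ID generated for each extension.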
2823 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2824 """Generate a drbd8 device complete with its children.
2827 port = cfg.AllocatePort()
2828 vgname = cfg.GetVGName()
2829 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2830 logical_id=(vgname, names[0]))
2831 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2832 logical_id=(vgname, names[1]))
2833 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2834 logical_id = (primary, secondary, port),
2835 children = [dev_data, dev_meta],
2840 def _GenerateDiskTemplate(cfg, template_name,
2841 instance_name, primary_node,
2842 secondary_nodes, disk_sz, swap_sz,
2843 file_storage_dir, file_driver):
2844 """Generate the entire disk layout for a given template type.
2847 #TODO: compute space requirements
2849 vgname = cfg.GetVGName()
2850 if template_name == constants.DT_DISKLESS:
2852 elif template_name == constants.DT_PLAIN:
2853 if len(secondary_nodes) != 0:
2854 raise errors.ProgrammerError("Wrong template configuration")
2856 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2857 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2858 logical_id=(vgname, names[0]),
2860 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2861 logical_id=(vgname, names[1]),
2863 disks = [sda_dev, sdb_dev]
2864 elif template_name == constants.DT_DRBD8:
2865 if len(secondary_nodes) != 1:
2866 raise errors.ProgrammerError("Wrong template configuration")
2867 remote_node = secondary_nodes[0]
2868 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2869 ".sdb_data", ".sdb_meta"])
2870 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2871 disk_sz, names[0:2], "sda")
2872 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2873 swap_sz, names[2:4], "sdb")
2874 disks = [drbd_sda_dev, drbd_sdb_dev]
2875 elif template_name == constants.DT_FILE:
2876 if len(secondary_nodes) != 0:
2877 raise errors.ProgrammerError("Wrong template configuration")
2879 file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2880 iv_name="sda", logical_id=(file_driver,
2881 "%s/sda" % file_storage_dir))
2882 file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2883 iv_name="sdb", logical_id=(file_driver,
2884 "%s/sdb" % file_storage_dir))
2885 disks = [file_sda_dev, file_sdb_dev]
2887 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
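# Illustrative sketch (not part of the original module; sizes and node
# names are made up): generating the standard two-disk DRBD8 layout for a
# 10 GiB disk and 4 GiB swap.
def _ExampleDrbdLayout(cfg):
  """Return the [sda, sdb] disk list for a hypothetical DRBD8 instance."""
  return _GenerateDiskTemplate(cfg, constants.DT_DRBD8,
                               "inst1.example.com", "node1.example.com",
                               ["node2.example.com"], 10240, 4096,
                               None, None)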
2891 def _GetInstanceInfoText(instance):
2892 """Compute that text that should be added to the disk's metadata.
2895 return "originstname+%s" % instance.name
2898 def _CreateDisks(cfg, instance):
2899 """Create all disks for an instance.
2901 This abstracts away some work from AddInstance.
2904 instance: the instance object
2907 True or False showing the success of the creation process
2910 info = _GetInstanceInfoText(instance)
2912 if instance.disk_template == constants.DT_FILE:
2913 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2914 result = rpc.call_file_storage_dir_create(instance.primary_node,
2918 logger.Error("Could not connect to node '%s'" % instance.primary_node)
2922 logger.Error("failed to create directory '%s'" % file_storage_dir)
2925 for device in instance.disks:
2926 logger.Info("creating volume %s for instance %s" %
2927 (device.iv_name, instance.name))
2929 for secondary_node in instance.secondary_nodes:
2930 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2931 device, False, info):
2932 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2933 (device.iv_name, device, secondary_node))
2936 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2937 instance, device, info):
2938 logger.Error("failed to create volume %s on primary!" %
2945 def _RemoveDisks(instance, cfg):
2946 """Remove all disks for an instance.
2948 This abstracts away some work from `AddInstance()` and
2949 `RemoveInstance()`. Note that in case some of the devices couldn't
2950 be removed, the removal will continue with the other ones (compare
2951 with `_CreateDisks()`).
2954 instance: the instance object
2957 True or False showing the success of the removal process
2960 logger.Info("removing block devices for instance %s" % instance.name)
2963 for device in instance.disks:
2964 for node, disk in device.ComputeNodeTree(instance.primary_node):
2965 cfg.SetDiskID(disk, node)
2966 if not rpc.call_blockdev_remove(node, disk):
2967 logger.Error("could not remove block device %s on node %s,"
2968 " continuing anyway" %
2969 (device.iv_name, node))
2972 if instance.disk_template == constants.DT_FILE:
2973 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2974 if not rpc.call_file_storage_dir_remove(instance.primary_node,
2976 logger.Error("could not remove directory '%s'" % file_storage_dir)
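# Illustrative sketch (not part of the original module; the helper name is
# made up): the create/rollback pairing used by LUCreateInstance.Exec
# further below.
def _ExampleCreateDisksOrRollback(cfg, instance):
  """Create all disks of an instance, removing partial results on failure."""
  if not _CreateDisks(cfg, instance):
    _RemoveDisks(instance, cfg)
    raise errors.OpExecError("Device creation failed, reverting...")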
2982 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2983 """Compute disk size requirements in the volume group
2985 This is currently hard-coded for the two-drive layout.
2988 # Required free disk space as a function of disk and swap space
2990 constants.DT_DISKLESS: None,
2991 constants.DT_PLAIN: disk_size + swap_size,
2992 # 256 MB are added for drbd metadata, 128MB for each drbd device
2993 constants.DT_DRBD8: disk_size + swap_size + 256,
2994 constants.DT_FILE: None,
2997 if disk_template not in req_size_dict:
2998 raise errors.ProgrammerError("Disk template '%s' size requirement"
2999 " is unknown" % disk_template)
3001 return req_size_dict[disk_template]
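# Worked example (illustration only): for a DRBD8 instance with a 10240 MB
# disk and a 4096 MB swap, each node's volume group must have
#   10240 + 4096 + 256 = 14592 MB
# free (the extra 256 MB covering the two 128 MB DRBD metadata volumes),
# while diskless and file-based instances need no space in the volume group.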
3004 class LUCreateInstance(LogicalUnit):
3005 """Create an instance.
3008 HPATH = "instance-add"
3009 HTYPE = constants.HTYPE_INSTANCE
3010 _OP_REQP = ["instance_name", "mem_size", "disk_size",
3011 "disk_template", "swap_size", "mode", "start", "vcpus",
3012 "wait_for_sync", "ip_check", "mac"]
3014 def _RunAllocator(self):
3015 """Run the allocator based on input opcode.
3018 disks = [{"size": self.op.disk_size, "mode": "w"},
3019 {"size": self.op.swap_size, "mode": "w"}]
3020 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3021 "bridge": self.op.bridge}]
3022 ial = IAllocator(self.cfg, self.sstore,
3023 mode=constants.IALLOCATOR_MODE_ALLOC,
3024 name=self.op.instance_name,
3025 disk_template=self.op.disk_template,
3028 vcpus=self.op.vcpus,
3029 mem_size=self.op.mem_size,
3034 ial.Run(self.op.iallocator)
3037 raise errors.OpPrereqError("Can't compute nodes using"
3038 " iallocator '%s': %s" % (self.op.iallocator,
3040 if len(ial.nodes) != ial.required_nodes:
3041 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3042 " of nodes (%s), required %s" %
3043 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
3044 self.op.pnode = ial.nodes[0]
3045 logger.ToStdout("Selected nodes for the instance: %s" %
3046 (", ".join(ial.nodes),))
3047 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3048 (self.op.instance_name, self.op.iallocator, ial.nodes))
3049 if ial.required_nodes == 2:
3050 self.op.snode = ial.nodes[1]
3052 def BuildHooksEnv(self):
3055 This runs on master, primary and secondary nodes of the instance.
3059 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3060 "INSTANCE_DISK_SIZE": self.op.disk_size,
3061 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3062 "INSTANCE_ADD_MODE": self.op.mode,
3064 if self.op.mode == constants.INSTANCE_IMPORT:
3065 env["INSTANCE_SRC_NODE"] = self.op.src_node
3066 env["INSTANCE_SRC_PATH"] = self.op.src_path
3067 env["INSTANCE_SRC_IMAGE"] = self.src_image
3069 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3070 primary_node=self.op.pnode,
3071 secondary_nodes=self.secondaries,
3072 status=self.instance_status,
3073 os_type=self.op.os_type,
3074 memory=self.op.mem_size,
3075 vcpus=self.op.vcpus,
3076 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3079 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3084 def CheckPrereq(self):
3085 """Check prerequisites.
3088 # set optional parameters to none if they don't exist
3089 for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3090 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3091 "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
3092 if not hasattr(self.op, attr):
3093 setattr(self.op, attr, None)
3095 if self.op.mode not in (constants.INSTANCE_CREATE,
3096 constants.INSTANCE_IMPORT):
3097 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3100 if (not self.cfg.GetVGName() and
3101 self.op.disk_template not in constants.DTS_NOT_LVM):
3102 raise errors.OpPrereqError("Cluster does not support lvm-based"
3105 if self.op.mode == constants.INSTANCE_IMPORT:
3106 src_node = getattr(self.op, "src_node", None)
3107 src_path = getattr(self.op, "src_path", None)
3108 if src_node is None or src_path is None:
3109 raise errors.OpPrereqError("Importing an instance requires source"
3110 " node and path options")
3111 src_node_full = self.cfg.ExpandNodeName(src_node)
3112 if src_node_full is None:
3113 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3114 self.op.src_node = src_node = src_node_full
3116 if not os.path.isabs(src_path):
3117 raise errors.OpPrereqError("The source path must be absolute")
3119 export_info = rpc.call_export_info(src_node, src_path)
3122 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3124 if not export_info.has_section(constants.INISECT_EXP):
3125 raise errors.ProgrammerError("Corrupted export config")
3127 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3128 if (int(ei_version) != constants.EXPORT_VERSION):
3129 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3130 (ei_version, constants.EXPORT_VERSION))
3132 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3133 raise errors.OpPrereqError("Can't import instance with more than"
3136 # FIXME: are the old os-es, disk sizes, etc. useful?
3137 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3138 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3140 self.src_image = diskimage
3141 else: # INSTANCE_CREATE
3142 if getattr(self.op, "os_type", None) is None:
3143 raise errors.OpPrereqError("No guest OS specified")
3145 #### instance parameters check
3147 # disk template and mirror node verification
3148 if self.op.disk_template not in constants.DISK_TEMPLATES:
3149 raise errors.OpPrereqError("Invalid disk template name")
3151 # instance name verification
3152 hostname1 = utils.HostInfo(self.op.instance_name)
3154 self.op.instance_name = instance_name = hostname1.name
3155 instance_list = self.cfg.GetInstanceList()
3156 if instance_name in instance_list:
3157 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3160 # ip validity checks
3161 ip = getattr(self.op, "ip", None)
3162 if ip is None or ip.lower() == "none":
3164 elif ip.lower() == "auto":
3165 inst_ip = hostname1.ip
3167 if not utils.IsValidIP(ip):
3168 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3169 " like a valid IP" % ip)
3171 self.inst_ip = self.op.ip = inst_ip
3173 if self.op.start and not self.op.ip_check:
3174 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3175 " adding an instance in start mode")
3177 if self.op.ip_check:
3178 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3179 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3180 (hostname1.ip, instance_name))
3182 # MAC address verification
3183 if self.op.mac != "auto":
3184 if not utils.IsValidMac(self.op.mac.lower()):
3185 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3188 # bridge verification
3189 bridge = getattr(self.op, "bridge", None)
3191 self.op.bridge = self.cfg.GetDefBridge()
3193 self.op.bridge = bridge
3195 # boot order verification
3196 if self.op.hvm_boot_order is not None:
3197 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3198 raise errors.OpPrereqError("invalid boot order specified,"
3199 " must be one or more of [acdn]")
3200 # file storage checks
3201 if (self.op.file_driver and
3202 not self.op.file_driver in constants.FILE_DRIVER):
3203 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3204 self.op.file_driver)
3206 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3207 raise errors.OpPrereqError("File storage directory not a relative"
3211 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3212 raise errors.OpPrereqError("One and only one of iallocator and primary"
3213 " node must be given")
3215 if self.op.iallocator is not None:
3216 self._RunAllocator()
3218 #### node related checks
3220 # check primary node
3221 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3223 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3225 self.op.pnode = pnode.name
3227 self.secondaries = []
3229 # mirror node verification
3230 if self.op.disk_template in constants.DTS_NET_MIRROR:
3231 if getattr(self.op, "snode", None) is None:
3232 raise errors.OpPrereqError("The networked disk templates need"
3235 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3236 if snode_name is None:
3237 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3239 elif snode_name == pnode.name:
3240 raise errors.OpPrereqError("The secondary node cannot be"
3241 " the primary node.")
3242 self.secondaries.append(snode_name)
3244 req_size = _ComputeDiskSize(self.op.disk_template,
3245 self.op.disk_size, self.op.swap_size)
3247 # Check lv size requirements
3248 if req_size is not None:
3249 nodenames = [pnode.name] + self.secondaries
3250 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3251 for node in nodenames:
3252 info = nodeinfo.get(node, None)
3254 raise errors.OpPrereqError("Cannot get current information"
3255 " from node '%s'" % node)
3256 vg_free = info.get('vg_free', None)
3257 if not isinstance(vg_free, int):
3258 raise errors.OpPrereqError("Can't compute free disk space on"
3260 if req_size > info['vg_free']:
3261 raise errors.OpPrereqError("Not enough disk space on target node %s."
3262 " %d MB available, %d MB required" %
3263 (node, info['vg_free'], req_size))
3266 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3268 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3269 " primary node" % self.op.os_type)
3271 if self.op.kernel_path == constants.VALUE_NONE:
3272 raise errors.OpPrereqError("Can't set instance kernel to none")
3275 # bridge check on primary node
3276 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3277 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3278 " destination node '%s'" %
3279 (self.op.bridge, pnode.name))
3281 # memory check on primary node
3283 _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3284 "creating instance %s" % self.op.instance_name,
3287 # hvm_cdrom_image_path verification
3288 if self.op.hvm_cdrom_image_path is not None:
3289 if not os.path.isabs(self.op.hvm_cdrom_image_path):
3290 raise errors.OpPrereqError("The path to the HVM CDROM image must"
3291 " be an absolute path or None, not %s" %
3292 self.op.hvm_cdrom_image_path)
3293 if not os.path.isfile(self.op.hvm_cdrom_image_path):
3294 raise errors.OpPrereqError("The HVM CDROM image must either be a"
3295 " regular file or a symlink pointing to"
3296 " an existing regular file, not %s" %
3297 self.op.hvm_cdrom_image_path)
3299 # vnc_bind_address verification
3300 if self.op.vnc_bind_address is not None:
3301 if not utils.IsValidIP(self.op.vnc_bind_address):
3302 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3303 " like a valid IP address" %
3304 self.op.vnc_bind_address)
3306 # Xen HVM device type checks
3307 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3308 if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3309 raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3310 " hypervisor" % self.op.hvm_nic_type)
3311 if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3312 raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3313 " hypervisor" % self.op.hvm_disk_type)
3316 self.instance_status = 'up'
3318 self.instance_status = 'down'
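# Illustrative sketch (assumption: the matching opcode class is
# opcodes.OpCreateInstance; all values are made up): a minimal plain-LVM
# creation request covering the parameters listed in _OP_REQP above plus
# the usual optional ones checked in CheckPrereq.
def _ExampleCreateOp():
  """Return an instance creation opcode (illustration only)."""
  return opcodes.OpCreateInstance(instance_name="inst1.example.com",
                                  mem_size=512, disk_size=10240,
                                  swap_size=4096, vcpus=1,
                                  disk_template=constants.DT_PLAIN,
                                  mode=constants.INSTANCE_CREATE,
                                  os_type="debian-etch",
                                  pnode="node1.example.com",
                                  start=True, ip_check=True,
                                  wait_for_sync=True, mac="auto")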
3320 def Exec(self, feedback_fn):
3321 """Create and add the instance to the cluster.
3324 instance = self.op.instance_name
3325 pnode_name = self.pnode.name
3327 if self.op.mac == "auto":
3328 mac_address = self.cfg.GenerateMAC()
3330 mac_address = self.op.mac
3332 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3333 if self.inst_ip is not None:
3334 nic.ip = self.inst_ip
3336 ht_kind = self.sstore.GetHypervisorType()
3337 if ht_kind in constants.HTS_REQ_PORT:
3338 network_port = self.cfg.AllocatePort()
3342 if self.op.vnc_bind_address is None:
3343 self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3345 # this is needed because os.path.join does not accept None arguments
3346 if self.op.file_storage_dir is None:
3347 string_file_storage_dir = ""
3349 string_file_storage_dir = self.op.file_storage_dir
3351 # build the full file storage dir path
3352 file_storage_dir = os.path.normpath(os.path.join(
3353 self.sstore.GetFileStorageDir(),
3354 string_file_storage_dir, instance))
3357 disks = _GenerateDiskTemplate(self.cfg,
3358 self.op.disk_template,
3359 instance, pnode_name,
3360 self.secondaries, self.op.disk_size,
3363 self.op.file_driver)
3365 iobj = objects.Instance(name=instance, os=self.op.os_type,
3366 primary_node=pnode_name,
3367 memory=self.op.mem_size,
3368 vcpus=self.op.vcpus,
3369 nics=[nic], disks=disks,
3370 disk_template=self.op.disk_template,
3371 status=self.instance_status,
3372 network_port=network_port,
3373 kernel_path=self.op.kernel_path,
3374 initrd_path=self.op.initrd_path,
3375 hvm_boot_order=self.op.hvm_boot_order,
3376 hvm_acpi=self.op.hvm_acpi,
3377 hvm_pae=self.op.hvm_pae,
3378 hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3379 vnc_bind_address=self.op.vnc_bind_address,
3380 hvm_nic_type=self.op.hvm_nic_type,
3381 hvm_disk_type=self.op.hvm_disk_type,
3384 feedback_fn("* creating instance disks...")
3385 if not _CreateDisks(self.cfg, iobj):
3386 _RemoveDisks(iobj, self.cfg)
3387 raise errors.OpExecError("Device creation failed, reverting...")
3389 feedback_fn("adding instance %s to cluster config" % instance)
3391 self.cfg.AddInstance(iobj)
3392 # Add the new instance to the Ganeti Lock Manager
3393 self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3395 if self.op.wait_for_sync:
3396 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3397 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3398 # make sure the disks are not degraded (still sync-ing is ok)
3400 feedback_fn("* checking mirrors status")
3401 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3406 _RemoveDisks(iobj, self.cfg)
3407 self.cfg.RemoveInstance(iobj.name)
3408 # Remove the new instance from the Ganeti Lock Manager
3409 self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3410 raise errors.OpExecError("There are some degraded disks for"
3413 feedback_fn("creating os for instance %s on node %s" %
3414 (instance, pnode_name))
3416 if iobj.disk_template != constants.DT_DISKLESS:
3417 if self.op.mode == constants.INSTANCE_CREATE:
3418 feedback_fn("* running the instance OS create scripts...")
3419 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3420 raise errors.OpExecError("could not add os for instance %s"
3422 (instance, pnode_name))
3424 elif self.op.mode == constants.INSTANCE_IMPORT:
3425 feedback_fn("* running the instance OS import scripts...")
3426 src_node = self.op.src_node
3427 src_image = self.src_image
3428 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3429 src_node, src_image):
3430 raise errors.OpExecError("Could not import os for instance"
3432 (instance, pnode_name))
3434 # also checked in the prereq part
3435 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3439 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3440 feedback_fn("* starting instance...")
3441 if not rpc.call_instance_start(pnode_name, iobj, None):
3442 raise errors.OpExecError("Could not start instance")
3445 class LUConnectConsole(NoHooksLU):
3446 """Connect to an instance's console.
3448 This is somewhat special in that it returns the command line that
3449 you need to run on the master node in order to connect to the console.
3453 _OP_REQP = ["instance_name"]
3456 def ExpandNames(self):
3457 self._ExpandAndLockInstance()
3459 def CheckPrereq(self):
3460 """Check prerequisites.
3462 This checks that the instance is in the cluster.
3465 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3466 assert self.instance is not None, \
3467 "Cannot retrieve locked instance %s" % self.op.instance_name
3469 def Exec(self, feedback_fn):
3470 """Connect to the console of an instance
3473 instance = self.instance
3474 node = instance.primary_node
3476 node_insts = rpc.call_instance_list([node])[node]
3477 if node_insts is False:
3478 raise errors.OpExecError("Can't connect to node %s." % node)
3480 if instance.name not in node_insts:
3481 raise errors.OpExecError("Instance %s is not running." % instance.name)
3483 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3485 hyper = hypervisor.GetHypervisor()
3486 console_cmd = hyper.GetShellCommandForConsole(instance)
3489 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
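# Illustrative note (not part of the original module): this LU only builds
# the command line; the client side (e.g. gnt-instance console, outside
# this excerpt) is expected to execute the returned command on the master
# node.  A hypothetical request would look like
#   opcodes.OpConnectConsole(instance_name="inst1.example.com")
# assuming that opcode class exists with this name.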
3492 class LUReplaceDisks(LogicalUnit):
3493 """Replace the disks of an instance.
3496 HPATH = "mirrors-replace"
3497 HTYPE = constants.HTYPE_INSTANCE
3498 _OP_REQP = ["instance_name", "mode", "disks"]
3500 def _RunAllocator(self):
3501 """Compute a new secondary node using an IAllocator.
3504 ial = IAllocator(self.cfg, self.sstore,
3505 mode=constants.IALLOCATOR_MODE_RELOC,
3506 name=self.op.instance_name,
3507 relocate_from=[self.sec_node])
3509 ial.Run(self.op.iallocator)
3512 raise errors.OpPrereqError("Can't compute nodes using"
3513 " iallocator '%s': %s" % (self.op.iallocator,
3515 if len(ial.nodes) != ial.required_nodes:
3516 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3517 " of nodes (%s), required %s" %
3518 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
3519 self.op.remote_node = ial.nodes[0]
3520 logger.ToStdout("Selected new secondary for the instance: %s" %
3521 self.op.remote_node)
3523 def BuildHooksEnv(self):
3526 This runs on the master, the primary and all the secondaries.
3530 "MODE": self.op.mode,
3531 "NEW_SECONDARY": self.op.remote_node,
3532 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3534 env.update(_BuildInstanceHookEnvByObject(self.instance))
3536 self.sstore.GetMasterNode(),
3537 self.instance.primary_node,
3539 if self.op.remote_node is not None:
3540 nl.append(self.op.remote_node)
3543 def CheckPrereq(self):
3544 """Check prerequisites.
3546 This checks that the instance is in the cluster.
3549 if not hasattr(self.op, "remote_node"):
3550 self.op.remote_node = None
3552 instance = self.cfg.GetInstanceInfo(
3553 self.cfg.ExpandInstanceName(self.op.instance_name))
3554 if instance is None:
3555 raise errors.OpPrereqError("Instance '%s' not known" %
3556 self.op.instance_name)
3557 self.instance = instance
3558 self.op.instance_name = instance.name
3560 if instance.disk_template not in constants.DTS_NET_MIRROR:
3561 raise errors.OpPrereqError("Instance's disk layout is not"
3562 " network mirrored.")
3564 if len(instance.secondary_nodes) != 1:
3565 raise errors.OpPrereqError("The instance has a strange layout,"
3566 " expected one secondary but found %d" %
3567 len(instance.secondary_nodes))
3569 self.sec_node = instance.secondary_nodes[0]
3571 ia_name = getattr(self.op, "iallocator", None)
3572 if ia_name is not None:
3573 if self.op.remote_node is not None:
3574 raise errors.OpPrereqError("Give either the iallocator or the new"
3575 " secondary, not both")
3576 self.op.remote_node = self._RunAllocator()
3578 remote_node = self.op.remote_node
3579 if remote_node is not None:
3580 remote_node = self.cfg.ExpandNodeName(remote_node)
3581 if remote_node is None:
3582 raise errors.OpPrereqError("Node '%s' not known" %
3583 self.op.remote_node)
3584 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3586 self.remote_node_info = None
3587 if remote_node == instance.primary_node:
3588 raise errors.OpPrereqError("The specified node is the primary node of"
3590 elif remote_node == self.sec_node:
3591 if self.op.mode == constants.REPLACE_DISK_SEC:
3592 # this is for DRBD8, where we can't execute the same mode of
3593 # replacement as for drbd7 (no different port allocated)
3594 raise errors.OpPrereqError("Same secondary given, cannot execute"
3596 if instance.disk_template == constants.DT_DRBD8:
3597 if (self.op.mode == constants.REPLACE_DISK_ALL and
3598 remote_node is not None):
3599 # switch to replace secondary mode
3600 self.op.mode = constants.REPLACE_DISK_SEC
3602 if self.op.mode == constants.REPLACE_DISK_ALL:
3603 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3604 " secondary disk replacement, not"
3606 elif self.op.mode == constants.REPLACE_DISK_PRI:
3607 if remote_node is not None:
3608 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3609 " the secondary while doing a primary"
3610 " node disk replacement")
3611 self.tgt_node = instance.primary_node
3612 self.oth_node = instance.secondary_nodes[0]
3613 elif self.op.mode == constants.REPLACE_DISK_SEC:
3614 self.new_node = remote_node # this can be None, in which case
3615 # we don't change the secondary
3616 self.tgt_node = instance.secondary_nodes[0]
3617 self.oth_node = instance.primary_node
3619 raise errors.ProgrammerError("Unhandled disk replace mode")
3621 for name in self.op.disks:
3622 if instance.FindDisk(name) is None:
3623 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3624 (name, instance.name))
3625 self.op.remote_node = remote_node
3627 def _ExecD8DiskOnly(self, feedback_fn):
3628 """Replace a disk on the primary or secondary for dbrd8.
3630 The algorithm for replace is quite complicated:
3631 - for each disk to be replaced:
3632 - create new LVs on the target node with unique names
3633 - detach old LVs from the drbd device
3634 - rename old LVs to name_replaced.<time_t>
3635 - rename new LVs to old LVs
3636 - attach the new LVs (with the old names now) to the drbd device
3637 - wait for sync across all devices
3638 - for each modified disk:
3639 - remove old LVs (which have the name name_replaced.<time_t>)
3641 Failures are not very well handled.
3645 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3646 instance = self.instance
3648 vgname = self.cfg.GetVGName()
3651 tgt_node = self.tgt_node
3652 oth_node = self.oth_node
3654 # Step: check device activation
3655 self.proc.LogStep(1, steps_total, "check device existence")
3656 info("checking volume groups")
3657 my_vg = cfg.GetVGName()
3658 results = rpc.call_vg_list([oth_node, tgt_node])
3660 raise errors.OpExecError("Can't list volume groups on the nodes")
3661 for node in oth_node, tgt_node:
3662 res = results.get(node, False)
3663 if not res or my_vg not in res:
3664 raise errors.OpExecError("Volume group '%s' not found on %s" %
3666 for dev in instance.disks:
3667 if not dev.iv_name in self.op.disks:
3669 for node in tgt_node, oth_node:
3670 info("checking %s on %s" % (dev.iv_name, node))
3671 cfg.SetDiskID(dev, node)
3672 if not rpc.call_blockdev_find(node, dev):
3673 raise errors.OpExecError("Can't find device %s on node %s" %
3674 (dev.iv_name, node))
3676 # Step: check other node consistency
3677 self.proc.LogStep(2, steps_total, "check peer consistency")
3678 for dev in instance.disks:
3679 if not dev.iv_name in self.op.disks:
3681 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3682 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3683 oth_node==instance.primary_node):
3684 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3685 " to replace disks on this node (%s)" %
3686 (oth_node, tgt_node))
3688 # Step: create new storage
3689 self.proc.LogStep(3, steps_total, "allocate new storage")
3690 for dev in instance.disks:
3691 if not dev.iv_name in self.op.disks:
3694 cfg.SetDiskID(dev, tgt_node)
3695 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3696 names = _GenerateUniqueNames(cfg, lv_names)
3697 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3698 logical_id=(vgname, names[0]))
3699 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3700 logical_id=(vgname, names[1]))
3701 new_lvs = [lv_data, lv_meta]
3702 old_lvs = dev.children
3703 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3704 info("creating new local storage on %s for %s" %
3705 (tgt_node, dev.iv_name))
3706 # since we *always* want to create this LV, we use the
3707 # _Create...OnPrimary (which forces the creation), even if we
3708 # are talking about the secondary node
3709 for new_lv in new_lvs:
3710 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3711 _GetInstanceInfoText(instance)):
3712 raise errors.OpExecError("Failed to create new LV named '%s' on"
3714 (new_lv.logical_id[1], tgt_node))
3716 # Step: for each lv, detach+rename*2+attach
3717 self.proc.LogStep(4, steps_total, "change drbd configuration")
3718 for dev, old_lvs, new_lvs in iv_names.itervalues():
3719 info("detaching %s drbd from local storage" % dev.iv_name)
3720 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3721 raise errors.OpExecError("Can't detach drbd from local storage on node"
3722 " %s for device %s" % (tgt_node, dev.iv_name))
3724 #cfg.Update(instance)
3726 # ok, we created the new LVs, so now we know we have the needed
3727 # storage; as such, we proceed on the target node to rename
3728 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3729 # using the assumption that logical_id == physical_id (which in
3730 # turn is the unique_id on that node)
3732 # FIXME(iustin): use a better name for the replaced LVs
3733 temp_suffix = int(time.time())
3734 ren_fn = lambda d, suff: (d.physical_id[0],
3735 d.physical_id[1] + "_replaced-%s" % suff)
3736 # build the rename list based on what LVs exist on the node
3738 for to_ren in old_lvs:
3739 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3740 if find_res is not None: # device exists
3741 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3743 info("renaming the old LVs on the target node")
3744 if not rpc.call_blockdev_rename(tgt_node, rlist):
3745 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3746 # now we rename the new LVs to the old LVs
3747 info("renaming the new LVs on the target node")
3748 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3749 if not rpc.call_blockdev_rename(tgt_node, rlist):
3750 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3752 for old, new in zip(old_lvs, new_lvs):
3753 new.logical_id = old.logical_id
3754 cfg.SetDiskID(new, tgt_node)
3756 for disk in old_lvs:
3757 disk.logical_id = ren_fn(disk, temp_suffix)
3758 cfg.SetDiskID(disk, tgt_node)
3760 # now that the new lvs have the old name, we can add them to the device
3761 info("adding new mirror component on %s" % tgt_node)
3762 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3763 for new_lv in new_lvs:
3764 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3765 warning("Can't rollback device %s", hint="manually cleanup unused"
3767 raise errors.OpExecError("Can't add local storage to drbd")
3769 dev.children = new_lvs
3770 cfg.Update(instance)
3772 # Step: wait for sync
3774 # this can fail as the old devices are degraded and _WaitForSync
3775 # does a combined result over all disks, so we don't check its return value
3777 self.proc.LogStep(5, steps_total, "sync devices")
3778 _WaitForSync(cfg, instance, self.proc, unlock=True)
3780 # so check manually all the devices
3781 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3782 cfg.SetDiskID(dev, instance.primary_node)
3783 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3785 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3787 # Step: remove old storage
3788 self.proc.LogStep(6, steps_total, "removing old storage")
3789 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3790 info("remove logical volumes for %s" % name)
3792 cfg.SetDiskID(lv, tgt_node)
3793 if not rpc.call_blockdev_remove(tgt_node, lv):
3794 warning("Can't remove old LV", hint="manually remove unused LVs")
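# Illustrative sketch (assumption: the matching opcode class is
# opcodes.OpReplaceDisks): replacing the local storage under sda on the
# primary node, the path handled by _ExecD8DiskOnly above.
def _ExampleReplacePrimaryDisksOp(instance_name):
  """Return a primary-disk replacement opcode (illustration only)."""
  return opcodes.OpReplaceDisks(instance_name=instance_name,
                                mode=constants.REPLACE_DISK_PRI,
                                disks=["sda"])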
3797 def _ExecD8Secondary(self, feedback_fn):
3798 """Replace the secondary node for drbd8.
3800 The algorithm for replace is quite complicated:
3801 - for all disks of the instance:
3802 - create new LVs on the new node with same names
3803 - shutdown the drbd device on the old secondary
3804 - disconnect the drbd network on the primary
3805 - create the drbd device on the new secondary
3806 - network attach the drbd on the primary, using an artifice:
3807 the drbd code for Attach() will connect to the network if it
3808 finds a device which is connected to the good local disks but not network enabled
3810 - wait for sync across all devices
3811 - remove all disks from the old secondary
3813 Failures are not very well handled.
3817 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3818 instance = self.instance
3820 vgname = self.cfg.GetVGName()
3823 old_node = self.tgt_node
3824 new_node = self.new_node
3825 pri_node = instance.primary_node
3827 # Step: check device activation
3828 self.proc.LogStep(1, steps_total, "check device existence")
3829 info("checking volume groups")
3830 my_vg = cfg.GetVGName()
3831 results = rpc.call_vg_list([pri_node, new_node])
3833 raise errors.OpExecError("Can't list volume groups on the nodes")
3834 for node in pri_node, new_node:
3835 res = results.get(node, False)
3836 if not res or my_vg not in res:
3837 raise errors.OpExecError("Volume group '%s' not found on %s" %
3839 for dev in instance.disks:
3840 if not dev.iv_name in self.op.disks:
3842 info("checking %s on %s" % (dev.iv_name, pri_node))
3843 cfg.SetDiskID(dev, pri_node)
3844 if not rpc.call_blockdev_find(pri_node, dev):
3845 raise errors.OpExecError("Can't find device %s on node %s" %
3846 (dev.iv_name, pri_node))
3848 # Step: check other node consistency
3849 self.proc.LogStep(2, steps_total, "check peer consistency")
3850 for dev in instance.disks:
3851 if not dev.iv_name in self.op.disks:
3853 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3854 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3855 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3856 " unsafe to replace the secondary" %
3859 # Step: create new storage
3860 self.proc.LogStep(3, steps_total, "allocate new storage")
3861 for dev in instance.disks:
3863 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3864 # since we *always* want to create this LV, we use the
3865 # _Create...OnPrimary (which forces the creation), even if we
3866 # are talking about the secondary node
3867 for new_lv in dev.children:
3868 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3869 _GetInstanceInfoText(instance)):
3870 raise errors.OpExecError("Failed to create new LV named '%s' on"
3872 (new_lv.logical_id[1], new_node))
3874 iv_names[dev.iv_name] = (dev, dev.children)
3876 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3877 for dev in instance.disks:
3879 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3880 # create new devices on new_node
3881 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3882 logical_id=(pri_node, new_node,
3884 children=dev.children)
3885 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3887 _GetInstanceInfoText(instance)):
3888 raise errors.OpExecError("Failed to create new DRBD on"
3889 " node '%s'" % new_node)
3891 for dev in instance.disks:
3892 # we have new devices, shutdown the drbd on the old secondary
3893 info("shutting down drbd for %s on old node" % dev.iv_name)
3894 cfg.SetDiskID(dev, old_node)
3895 if not rpc.call_blockdev_shutdown(old_node, dev):
3896 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3897 hint="Please cleanup this device manually as soon as possible")
    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3918 # if we managed to detach at least one, we update all the disks of
3919 # the instance to point to the new secondary
3920 info("updating instance configuration")
3921 for dev in instance.disks:
3922 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3923 cfg.SetDiskID(dev, pri_node)
3924 cfg.Update(instance)
3926 # and now perform the drbd attach
3927 info("attaching primary drbds to new secondary (standalone => connected)")
3929 for dev in instance.disks:
3930 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3931 # since the attach is smart, it's enough to 'find' the device,
3932 # it will automatically activate the network, if the physical_id
3934 cfg.SetDiskID(dev, pri_node)
3935 if not rpc.call_blockdev_find(pri_node, dev):
3936 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3937 "please do a gnt-instance info to see the status of disks")
3939 # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
3942 self.proc.LogStep(5, steps_total, "sync devices")
3943 _WaitForSync(cfg, instance, self.proc, unlock=True)
3945 # so check manually all the devices
3946 for name, (dev, old_lvs) in iv_names.iteritems():
3947 cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3952 self.proc.LogStep(6, steps_total, "removing old storage")
3953 for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
3957 if not rpc.call_blockdev_remove(old_node, lv):
3958 warning("Can't remove LV on old secondary",
3959 hint="Cleanup stale volumes by hand")
3961 def Exec(self, feedback_fn):
3962 """Execute disk replacement.
3964 This dispatches the disk replacement to the appropriate handler.
3967 instance = self.instance
3969 # Activate the instance disks if we're replacing them on a down instance
3970 if instance.status == "down":
3971 op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3972 self.proc.ChainOpCode(op)
3974 if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
3982 ret = fn(feedback_fn)
3984 # Deactivate the instance disks if we're replacing them on a down instance
3985 if instance.status == "down":
3986 op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3987 self.proc.ChainOpCode(op)
3992 class LUGrowDisk(LogicalUnit):
3993 """Grow a disk of an instance.
3997 HTYPE = constants.HTYPE_INSTANCE
3998 _OP_REQP = ["instance_name", "disk", "amount"]
4000 def BuildHooksEnv(self):
4003 This runs on the master, the primary and all the secondaries.
4007 "DISK": self.op.disk,
4008 "AMOUNT": self.op.amount,
4010 env.update(_BuildInstanceHookEnvByObject(self.instance))
4012 self.sstore.GetMasterNode(),
4013 self.instance.primary_node,
4017 def CheckPrereq(self):
4018 """Check prerequisites.
4020 This checks that the instance is in the cluster.
4023 instance = self.cfg.GetInstanceInfo(
4024 self.cfg.ExpandInstanceName(self.op.instance_name))
4025 if instance is None:
4026 raise errors.OpPrereqError("Instance '%s' not known" %
4027 self.op.instance_name)
4028 self.instance = instance
4029 self.op.instance_name = instance.name
4031 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")
4035 if instance.FindDisk(self.op.disk) is None:
4036 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4037 (self.op.disk, instance.name))
4039 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4040 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4041 for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
4045 " from node '%s'" % node)
4046 vg_free = info.get('vg_free', None)
4047 if not isinstance(vg_free, int):
4048 raise errors.OpPrereqError("Can't compute free disk space on"
4050 if self.op.amount > info['vg_free']:
4051 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4052 " %d MiB available, %d MiB required" %
4053 (node, info['vg_free'], self.op.amount))
4055 def Exec(self, feedback_fn):
4056 """Execute disk grow.
4059 instance = self.instance
4060 disk = instance.FindDisk(self.op.disk)
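    # The grow is requested on every node that holds this disk (all the
    # secondaries plus the primary); only when every call succeeds is the
    # new size recorded in the configuration below.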
4061 for node in (instance.secondary_nodes + (instance.primary_node,)):
4062 self.cfg.SetDiskID(disk, node)
4063 result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4064 if not result or not isinstance(result, tuple) or len(result) != 2:
4065 raise errors.OpExecError("grow request failed to node %s" % node)
4067 raise errors.OpExecError("grow request failed to node %s: %s" %
4069 disk.RecordGrow(self.op.amount)
4070 self.cfg.Update(instance)
4074 class LUQueryInstanceData(NoHooksLU):
4075 """Query runtime instance data.
4078 _OP_REQP = ["instances"]
4080 def CheckPrereq(self):
4081 """Check prerequisites.
4083 This only checks the optional instance list against the existing names.
4086 if not isinstance(self.op.instances, list):
4087 raise errors.OpPrereqError("Invalid argument type 'instances'")
4088 if self.op.instances:
4089 self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4093 if instance is None:
4094 raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4098 in self.cfg.GetInstanceList()]
4102 def _ComputeDiskStatus(self, instance, snode, dev):
4103 """Compute block device status.
4106 self.cfg.SetDiskID(dev, instance.primary_node)
4107 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4108 if dev.dev_type in constants.LDS_DRBD:
4109 # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]
4116 self.cfg.SetDiskID(dev, snode)
4117 dev_sstatus = rpc.call_blockdev_find(snode, dev)
4122 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4123 for child in dev.children]
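    # The status is computed recursively, so the children (e.g. the LVs
    # backing a DRBD device) show up nested under their parent device.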
    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data
4139 def Exec(self, feedback_fn):
4140 """Gather and return data"""
4142 for instance in self.wanted_instances:
4143 remote_info = rpc.call_instance_info(instance.primary_node,
4145 if remote_info and "state" in remote_info:
4148 remote_state = "down"
4149 if instance.status == "down":
4150 config_state = "down"
4154 disks = [self._ComputeDiskStatus(instance, None, device)
4155 for device in instance.disks]
4158 "name": instance.name,
4159 "config_state": config_state,
4160 "run_state": remote_state,
4161 "pnode": instance.primary_node,
4162 "snodes": instance.secondary_nodes,
4164 "memory": instance.memory,
4165 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4167 "vcpus": instance.vcpus,
4170 htkind = self.sstore.GetHypervisorType()
4171 if htkind == constants.HT_XEN_PVM30:
4172 idict["kernel_path"] = instance.kernel_path
4173 idict["initrd_path"] = instance.initrd_path
4175 if htkind == constants.HT_XEN_HVM31:
4176 idict["hvm_boot_order"] = instance.hvm_boot_order
4177 idict["hvm_acpi"] = instance.hvm_acpi
4178 idict["hvm_pae"] = instance.hvm_pae
4179 idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4180 idict["hvm_nic_type"] = instance.hvm_nic_type
4181 idict["hvm_disk_type"] = instance.hvm_disk_type
4183 if htkind in constants.HTS_REQ_PORT:
4184 if instance.vnc_bind_address is None:
        vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
      else:
        vnc_bind_address = instance.vnc_bind_address
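      # How the console endpoint is reported depends on the bind address:
      # bound on all interfaces -> primary_node:port, bound on localhost ->
      # port together with the node it is local to, otherwise address:port.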
4188 if instance.network_port is None:
4189 vnc_console_port = None
4190 elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4191 vnc_console_port = "%s:%s" % (instance.primary_node,
4192 instance.network_port)
4193 elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4194 vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
4195 instance.network_port,
4196 instance.primary_node)
4198 vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4199 instance.network_port)
4200 idict["vnc_console_port"] = vnc_console_port
4201 idict["vnc_bind_address"] = vnc_bind_address
4202 idict["network_port"] = instance.network_port
4204 result[instance.name] = idict
4209 class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.
4213 HPATH = "instance-modify"
4214 HTYPE = constants.HTYPE_INSTANCE
4215 _OP_REQP = ["instance_name"]
4218 def ExpandNames(self):
4219 self._ExpandAndLockInstance()
4221 def BuildHooksEnv(self):
4224 This runs on the master, primary and secondaries.
4229 args['memory'] = self.mem
4231 args['vcpus'] = self.vcpus
4232 if self.do_ip or self.do_bridge or self.mac:
4236 ip = self.instance.nics[0].ip
4238 bridge = self.bridge
4240 bridge = self.instance.nics[0].bridge
4244 mac = self.instance.nics[0].mac
4245 args['nics'] = [(ip, bridge, mac)]
4246 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4247 nl = [self.sstore.GetMasterNode(),
4248 self.instance.primary_node] + list(self.instance.secondary_nodes)
4251 def CheckPrereq(self):
4252 """Check prerequisites.
4254 This only checks the instance list against the existing names.
4257 # FIXME: all the parameters could be checked before, in ExpandNames, or in
4258 # a separate CheckArguments function, if we implement one, so the operation
4259 # can be aborted without waiting for any lock, should it have an error...
4260 self.mem = getattr(self.op, "mem", None)
4261 self.vcpus = getattr(self.op, "vcpus", None)
4262 self.ip = getattr(self.op, "ip", None)
4263 self.mac = getattr(self.op, "mac", None)
4264 self.bridge = getattr(self.op, "bridge", None)
4265 self.kernel_path = getattr(self.op, "kernel_path", None)
4266 self.initrd_path = getattr(self.op, "initrd_path", None)
4267 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4268 self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4269 self.hvm_pae = getattr(self.op, "hvm_pae", None)
4270 self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
4271 self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
4272 self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4273 self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4274 self.force = getattr(self.op, "force", None)
4275 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4276 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4277 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4278 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
4279 if all_parms.count(None) == len(all_parms):
4280 raise errors.OpPrereqError("No changes submitted")
4281 if self.mem is not None:
      try:
        self.mem = int(self.mem)
4284 except ValueError, err:
4285 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4286 if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
4289 except ValueError, err:
4290 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4291 if self.ip is not None:
4293 if self.ip.lower() == "none":
4296 if not utils.IsValidIP(self.ip):
4297 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4300 self.do_bridge = (self.bridge is not None)
4301 if self.mac is not None:
4302 if self.cfg.IsMacInUse(self.mac):
4303 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4305 if not utils.IsValidMac(self.mac):
4306 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4308 if self.kernel_path is not None:
4309 self.do_kernel_path = True
4310 if self.kernel_path == constants.VALUE_NONE:
4311 raise errors.OpPrereqError("Can't set instance to no kernel")
4313 if self.kernel_path != constants.VALUE_DEFAULT:
4314 if not os.path.isabs(self.kernel_path):
4315 raise errors.OpPrereqError("The kernel path must be an absolute"
4318 self.do_kernel_path = False
4320 if self.initrd_path is not None:
4321 self.do_initrd_path = True
4322 if self.initrd_path not in (constants.VALUE_NONE,
4323 constants.VALUE_DEFAULT):
4324 if not os.path.isabs(self.initrd_path):
4325 raise errors.OpPrereqError("The initrd path must be an absolute"
4328 self.do_initrd_path = False
4330 # boot order verification
4331 if self.hvm_boot_order is not None:
4332 if self.hvm_boot_order != constants.VALUE_DEFAULT:
4333 if len(self.hvm_boot_order.strip("acdn")) != 0:
4334 raise errors.OpPrereqError("invalid boot order specified,"
4335 " must be one or more of [acdn]"
4338 # hvm_cdrom_image_path verification
4339 if self.op.hvm_cdrom_image_path is not None:
4340 if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4341 self.op.hvm_cdrom_image_path.lower() == "none"):
4342 raise errors.OpPrereqError("The path to the HVM CDROM image must"
4343 " be an absolute path or None, not %s" %
4344 self.op.hvm_cdrom_image_path)
4345 if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4346 self.op.hvm_cdrom_image_path.lower() == "none"):
4347 raise errors.OpPrereqError("The HVM CDROM image must either be a"
4348 " regular file or a symlink pointing to"
4349 " an existing regular file, not %s" %
4350 self.op.hvm_cdrom_image_path)
4352 # vnc_bind_address verification
4353 if self.op.vnc_bind_address is not None:
4354 if not utils.IsValidIP(self.op.vnc_bind_address):
4355 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4356 " like a valid IP address" %
4357 self.op.vnc_bind_address)
4359 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4360 assert self.instance is not None, \
4361 "Cannot retrieve locked instance %s" % self.op.instance_name
4363 if self.mem is not None and not self.force:
      pnode = self.instance.primary_node
      nodelist = [pnode]
      nodelist.extend(instance.secondary_nodes)
4367 instance_info = rpc.call_instance_info(pnode, instance.name)
4368 nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
4370 if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4371 # Assume the primary node is unreachable and go ahead
4372 self.warn.append("Can't get info from primary node %s" % pnode)
4375 current_mem = instance_info['memory']
4377 # Assume instance not running
4378 # (there is a slight race condition here, but it's not very probable,
4379 # and we have no other way to check)
4381 miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
4383 raise errors.OpPrereqError("This change will prevent the instance"
4384 " from starting, due to %d MB of memory"
4385 " missing on its primary node" % miss_mem)
4387 for node in instance.secondary_nodes:
4388 if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4389 self.warn.append("Can't get info from secondary node %s" % node)
4390 elif self.mem > nodeinfo[node]['memory_free']:
4391 self.warn.append("Not enough memory to failover instance to secondary"
4394 # Xen HVM device type checks
4395 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
4396 if self.op.hvm_nic_type is not None:
4397 if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
4398 raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
4399 " HVM hypervisor" % self.op.hvm_nic_type)
4400 if self.op.hvm_disk_type is not None:
4401 if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
4402 raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
4403 " HVM hypervisor" % self.op.hvm_disk_type)
4407 def Exec(self, feedback_fn):
4408 """Modifies an instance.
4410 All parameters take effect only at the next restart of the instance.
4412 # Process here the warnings from CheckPrereq, as we don't have a
4413 # feedback_fn there.
4414 for warn in self.warn:
4415 feedback_fn("WARNING: %s" % warn)
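    # 'result' collects (parameter, new value) pairs describing every change
    # that is actually applied, so they can be reported back to the caller.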
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.do_bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
4434 if self.do_kernel_path:
4435 instance.kernel_path = self.kernel_path
4436 result.append(("kernel_path", self.kernel_path))
4437 if self.do_initrd_path:
4438 instance.initrd_path = self.initrd_path
4439 result.append(("initrd_path", self.initrd_path))
4440 if self.hvm_boot_order:
4441 if self.hvm_boot_order == constants.VALUE_DEFAULT:
4442 instance.hvm_boot_order = None
4444 instance.hvm_boot_order = self.hvm_boot_order
4445 result.append(("hvm_boot_order", self.hvm_boot_order))
4446 if self.hvm_acpi is not None:
4447 instance.hvm_acpi = self.hvm_acpi
4448 result.append(("hvm_acpi", self.hvm_acpi))
4449 if self.hvm_pae is not None:
4450 instance.hvm_pae = self.hvm_pae
4451 result.append(("hvm_pae", self.hvm_pae))
4452 if self.hvm_nic_type is not None:
4453 instance.hvm_nic_type = self.hvm_nic_type
4454 result.append(("hvm_nic_type", self.hvm_nic_type))
4455 if self.hvm_disk_type is not None:
4456 instance.hvm_disk_type = self.hvm_disk_type
4457 result.append(("hvm_disk_type", self.hvm_disk_type))
4458 if self.hvm_cdrom_image_path:
4459 if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4460 instance.hvm_cdrom_image_path = None
4462 instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4463 result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4464 if self.vnc_bind_address:
4465 instance.vnc_bind_address = self.vnc_bind_address
4466 result.append(("vnc_bind_address", self.vnc_bind_address))
4468 self.cfg.Update(instance)
4473 class LUQueryExports(NoHooksLU):
4474 """Query the exports list
4477 _OP_REQP = ['nodes']
4480 def ExpandNames(self):
4481 self.needed_locks = {}
4482 self.share_locks[locking.LEVEL_NODE] = 1
4483 if not self.op.nodes:
4484 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4486 self.needed_locks[locking.LEVEL_NODE] = \
4487 _GetWantedNodes(self, self.op.nodes)
4489 def CheckPrereq(self):
4490 """Check prerequisites.
4493 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4495 def Exec(self, feedback_fn):
4496 """Compute the list of all the exported system images.
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
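    # Illustrative shape of the returned value (hypothetical names):
    #   {"node1.example.com": ["instance1.example.com"],
    #    "node2.example.com": []}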
4504 return rpc.call_export_list(self.nodes)
4507 class LUExportInstance(LogicalUnit):
4508 """Export an instance to an image in the cluster.
4511 HPATH = "instance-export"
4512 HTYPE = constants.HTYPE_INSTANCE
4513 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4515 def BuildHooksEnv(self):
4518 This will run on the master, primary node and target node.
4522 "EXPORT_NODE": self.op.target_node,
4523 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4525 env.update(_BuildInstanceHookEnvByObject(self.instance))
4526 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4527 self.op.target_node]
4530 def CheckPrereq(self):
4531 """Check prerequisites.
4533 This checks that the instance and node names are valid.
4536 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4537 self.instance = self.cfg.GetInstanceInfo(instance_name)
4538 if self.instance is None:
4539 raise errors.OpPrereqError("Instance '%s' not found" %
4540 self.op.instance_name)
4543 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4544 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4546 if self.dst_node is None:
4547 raise errors.OpPrereqError("Destination node '%s' is unknown." %
4548 self.op.target_node)
4549 self.op.target_node = self.dst_node.name
4551 # instance disk type verification
4552 for disk in self.instance.disks:
4553 if disk.dev_type == constants.LD_FILE:
4554 raise errors.OpPrereqError("Export not supported for instances with"
4555 " file-based disks")
4557 def Exec(self, feedback_fn):
4558 """Export an instance to an image in the cluster.
4561 instance = self.instance
4562 dst_node = self.dst_node
4563 src_node = instance.primary_node
4564 if self.op.shutdown:
4565 # shutdown the instance, but not the disks
4566 if not rpc.call_instance_shutdown(src_node, instance):
4567 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4568 (instance.name, src_node))
    vgname = self.cfg.GetVGName()

    snap_disks = []
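    # The export works roughly as follows: snapshot the instance's 'sda'
    # volume, restart the instance if we shut it down, copy the snapshot to
    # the target node, finalize the export there and drop the snapshot, then
    # clean up any older exports of this instance on the other nodes.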
4575 for disk in instance.disks:
4576 if disk.iv_name == "sda":
4577 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4578 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4580 if not new_dev_name:
4581 logger.Error("could not snapshot block device %s on node %s" %
4582 (disk.logical_id[1], src_node))
4584 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4585 logical_id=(vgname, new_dev_name),
4586 physical_id=(vgname, new_dev_name),
4587 iv_name=disk.iv_name)
4588 snap_disks.append(new_dev)
4591 if self.op.shutdown and instance.status == "up":
4592 if not rpc.call_instance_start(src_node, instance, None):
4593 _ShutdownInstanceDisks(instance, self.cfg)
4594 raise errors.OpExecError("Could not start instance")
4596 # TODO: check for size
4598 for dev in snap_disks:
4599 if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4600 logger.Error("could not export block device %s from node %s to node %s"
4601 % (dev.logical_id[1], src_node, dst_node.name))
4602 if not rpc.call_blockdev_remove(src_node, dev):
4603 logger.Error("could not remove snapshot block device %s from node %s" %
4604 (dev.logical_id[1], src_node))
4606 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4607 logger.Error("could not finalize export for instance %s on node %s" %
4608 (instance.name, dst_node.name))
4610 nodelist = self.cfg.GetNodeList()
4611 nodelist.remove(dst_node.name)
4613 # on one-node clusters nodelist will be empty after the removal
4614 # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = rpc.call_export_list(nodelist)
4618 for node in exportlist:
4619 if instance.name in exportlist[node]:
4620 if not rpc.call_export_remove(node, instance.name):
4621 logger.Error("could not remove older export for instance %s"
4622 " on node %s" % (instance.name, node))
4625 class LURemoveExport(NoHooksLU):
4626 """Remove exports related to the named instance.
4629 _OP_REQP = ["instance_name"]
4631 def CheckPrereq(self):
4632 """Check prerequisites.
4636 def Exec(self, feedback_fn):
4637 """Remove any export.
4640 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4641 # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name
4648 exportlist = rpc.call_export_list(self.cfg.GetNodeList())
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
4653 if not rpc.call_export_remove(node, instance_name):
4654 logger.Error("could not remove export for instance %s"
4655 " on node %s" % (instance_name, node))
4657 if fqdn_warn and not found:
4658 feedback_fn("Export not found. If trying to remove an export belonging"
                " to a deleted instance please use its Fully Qualified"
                " Domain Name.")
4663 class TagsLU(NoHooksLU):
4666 This is an abstract class which is the parent of all the other tags LUs.
4669 def CheckPrereq(self):
4670 """Check prerequisites.
4673 if self.op.kind == constants.TAG_CLUSTER:
4674 self.target = self.cfg.GetClusterInfo()
4675 elif self.op.kind == constants.TAG_NODE:
4676 name = self.cfg.ExpandNodeName(self.op.name)
4678 raise errors.OpPrereqError("Invalid node name (%s)" %
4681 self.target = self.cfg.GetNodeInfo(name)
4682 elif self.op.kind == constants.TAG_INSTANCE:
4683 name = self.cfg.ExpandInstanceName(self.op.name)
4685 raise errors.OpPrereqError("Invalid instance name (%s)" %
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))
4694 class LUGetTags(TagsLU):
4695 """Returns the tags of a given object.
4698 _OP_REQP = ["kind", "name"]
4700 def Exec(self, feedback_fn):
4701 """Returns the tag list.
4704 return list(self.target.GetTags())
4707 class LUSearchTags(NoHooksLU):
4708 """Searches the tags for a given pattern.
4711 _OP_REQP = ["pattern"]
4713 def CheckPrereq(self):
4714 """Check prerequisites.
4716 This checks the pattern passed for validity by compiling it.
    try:
      self.re = re.compile(self.op.pattern)
4721 except re.error, err:
4722 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4723 (self.op.pattern, err))
4725 def Exec(self, feedback_fn):
4726 """Returns the tag list.
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
4731 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4732 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4733 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4734 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
4737 for tag in target.GetTags():
4738 if self.re.search(tag):
          results.append((path, tag))
    return results
4743 class LUAddTags(TagsLU):
4744 """Sets a tag on a given object.
4747 _OP_REQP = ["kind", "name", "tags"]
4749 def CheckPrereq(self):
4750 """Check prerequisites.
4752 This checks the type and length of the tag name and value.
4755 TagsLU.CheckPrereq(self)
4756 for tag in self.op.tags:
4757 objects.TaggableObject.ValidateTag(tag)
4759 def Exec(self, feedback_fn):
4764 for tag in self.op.tags:
4765 self.target.AddTag(tag)
4766 except errors.TagError, err:
4767 raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
4770 except errors.ConfigurationError:
4771 raise errors.OpRetryError("There has been a modification to the"
4772 " config file and the operation has been"
4773 " aborted. Please retry.")
4776 class LUDelTags(TagsLU):
4777 """Delete a list of tags from a given object.
4780 _OP_REQP = ["kind", "name", "tags"]
4782 def CheckPrereq(self):
4783 """Check prerequisites.
4785 This checks that we have the given tag.
4788 TagsLU.CheckPrereq(self)
4789 for tag in self.op.tags:
4790 objects.TaggableObject.ValidateTag(tag)
4791 del_tags = frozenset(self.op.tags)
4792 cur_tags = self.target.GetTags()
4793 if not del_tags <= cur_tags:
4794 diff_tags = del_tags - cur_tags
4795 diff_names = ["'%s'" % tag for tag in diff_tags]
4797 raise errors.OpPrereqError("Tag(s) %s not found" %
4798 (",".join(diff_names)))
4800 def Exec(self, feedback_fn):
4801 """Remove the tag from the object.
4804 for tag in self.op.tags:
4805 self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
4808 except errors.ConfigurationError:
4809 raise errors.OpRetryError("There has been a modification to the"
4810 " config file and the operation has been"
4811 " aborted. Please retry.")
4814 class LUTestDelay(NoHooksLU):
4815 """Sleep for a specified amount of time.
  This LU sleeps on the master and/or nodes for a specified amount of
  time.
4821 _OP_REQP = ["duration", "on_master", "on_nodes"]
4824 def ExpandNames(self):
4825 """Expand names and set required locks.
4827 This expands the node list, if any.
4830 self.needed_locks = {}
4831 if self.op.on_nodes:
4832 # _GetWantedNodes can be used here, but is not always appropriate to use
4833 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
4835 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4836 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
4838 def CheckPrereq(self):
4839 """Check prerequisites.
4843 def Exec(self, feedback_fn):
4844 """Do the actual sleep.
4847 if self.op.on_master:
4848 if not utils.TestDelay(self.op.duration):
4849 raise errors.OpExecError("Error during master delay test")
4850 if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
4857 " result: %s" % (node, node_result))
4860 class IAllocator(object):
4861 """IAllocator framework.
  An IAllocator instance has four sets of attributes:
4864 - cfg/sstore that are needed to query the cluster
4865 - input data (all members of the _KEYS class attribute are required)
4866 - four buffer attributes (in|out_data|text), that represent the
4867 input (to the external script) in text and data structure format,
4868 and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]
4881 def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
4884 # init buffer variables
4885 self.in_text = self.out_text = self.in_data = self.out_data = None
4886 # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
4890 self.os = self.tags = self.nics = self.vcpus = None
4891 self.relocate_from = None
4893 self.required_nodes = None
4894 # init result fields
4895 self.success = self.info = self.nodes = None
4896 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4897 keyset = self._ALLO_KEYS
4898 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4899 keyset = self._RELO_KEYS
4901 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4902 " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
4905 raise errors.ProgrammerError("Invalid input parameter '%s' to"
4906 " IAllocator" % key)
4907 setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
4910 raise errors.ProgrammerError("Missing input parameter '%s' to"
4911 " IAllocator" % key)
4912 self._BuildInputData()
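  # Hypothetical usage sketch from an LU (names are illustrative only):
  #   ial = IAllocator(self.cfg, self.sstore, constants.IALLOCATOR_MODE_RELOC,
  #                    "instance1.example.com", relocate_from=["node3"])
  #   ial.Run("my-allocator")
  #   if not ial.success:
  #     raise errors.OpExecError("Allocator failed: %s" % ial.info)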
4914 def _ComputeClusterData(self):
4915 """Compute the generic allocator input data.
4917 This is the data that is independent of the actual operation.
4924 "cluster_name": self.sstore.GetClusterName(),
4925 "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4926 "hypervisor_type": self.sstore.GetHypervisorType(),
4927 # we don't have job IDs
4930 i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4934 node_list = cfg.GetNodeList()
4935 node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4936 for nname in node_list:
4937 ninfo = cfg.GetNodeInfo(nname)
4938 if nname not in node_data or not isinstance(node_data[nname], dict):
4939 raise errors.OpExecError("Can't get data for node %s" % nname)
4940 remote_info = node_data[nname]
4941 for attr in ['memory_total', 'memory_free', 'memory_dom0',
4942 'vg_size', 'vg_free', 'cpu_total']:
4943 if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
4948 except ValueError, err:
4949 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4950 " %s" % (nname, attr, str(err)))
4951 # compute memory used by primary instances
4952 i_p_mem = i_p_up_mem = 0
4953 for iinfo in i_list:
4954 if iinfo.primary_node == nname:
4955 i_p_mem += iinfo.memory
4956 if iinfo.status == "up":
4957 i_p_up_mem += iinfo.memory
4959 # compute memory used by instances
4961 "tags": list(ninfo.GetTags()),
4962 "total_memory": remote_info['memory_total'],
4963 "reserved_memory": remote_info['memory_dom0'],
4964 "free_memory": remote_info['memory_free'],
4965 "i_pri_memory": i_p_mem,
4966 "i_pri_up_memory": i_p_up_mem,
4967 "total_disk": remote_info['vg_size'],
4968 "free_disk": remote_info['vg_free'],
4969 "primary_ip": ninfo.primary_ip,
4970 "secondary_ip": ninfo.secondary_ip,
4971 "total_cpus": remote_info['cpu_total'],
4973 node_results[nname] = pnr
4974 data["nodes"] = node_results
4978 for iinfo in i_list:
4979 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4980 for n in iinfo.nics]
4982 "tags": list(iinfo.GetTags()),
4983 "should_run": iinfo.status == "up",
4984 "vcpus": iinfo.vcpus,
4985 "memory": iinfo.memory,
4987 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4989 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4990 "disk_template": iinfo.disk_template,
4992 instance_data[iinfo.name] = pir
    data["instances"] = instance_data

    self.in_data = data
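    # self.in_data now holds the generic cluster description, roughly:
    #   {"cluster_name": ..., "cluster_tags": [...], "hypervisor_type": ...,
    #    "nodes": {node_name: {...}}, "instances": {instance_name: {...}}}
    # the mode-specific "request" entry is filled in by the _Add*() methods.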
4998 def _AddNewInstance(self):
4999 """Add new instance data to allocator structure.
    This in combination with _ComputeClusterData will create the
5002 correct structure needed as input for the allocator.
    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
5010 raise errors.OpExecError("Only two-disk configurations supported")
5012 disk_space = _ComputeDiskSize(self.disk_template,
5013 self.disks[0]["size"], self.disks[1]["size"])
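    # disk_space_total is the space the instance will need on each node,
    # derived from the disk template and the two disk sizes by
    # _ComputeDiskSize (which may add template-specific overhead).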
5015 if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
5022 "disk_template": self.disk_template,
5025 "vcpus": self.vcpus,
5026 "memory": self.mem_size,
5027 "disks": self.disks,
5028 "disk_space_total": disk_space,
5030 "required_nodes": self.required_nodes,
5032 data["request"] = request
5034 def _AddRelocateInstance(self):
5035 """Add relocate instance data to allocator structure.
    This in combination with _ComputeClusterData will create the
5038 correct structure needed as input for the allocator.
    The checks for the completeness of the opcode must have already been
    done.

    """
5044 instance = self.cfg.GetInstanceInfo(self.name)
5045 if instance is None:
5046 raise errors.ProgrammerError("Unknown instance '%s' passed to"
5047 " IAllocator" % self.name)
5049 if instance.disk_template not in constants.DTS_NET_MIRROR:
5050 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5052 if len(instance.secondary_nodes) != 1:
5053 raise errors.OpPrereqError("Instance has not exactly one secondary node")
5055 self.required_nodes = 1
5057 disk_space = _ComputeDiskSize(instance.disk_template,
5058 instance.disks[0].size,
5059 instance.disks[1].size)
    request = {
      "disk_space_total": disk_space,
5065 "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request
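    # For a relocation request the allocator is asked for 'required_nodes'
    # (here 1) replacement node(s); 'relocate_from' names the node(s) the
    # instance is being moved away from.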
5070 def _BuildInputData(self):
5071 """Build input data structures.
5074 self._ComputeClusterData()
5076 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()
5081 self.in_text = serializer.Dump(self.in_data)
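    # in_text is the serialized form that is actually shipped to the node
    # running the allocator script (see Run() below).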
5083 def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5084 """Run an instance allocator and return the results.
5089 result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5091 if not isinstance(result, tuple) or len(result) != 4:
5092 raise errors.OpExecError("Invalid result from master iallocator runner")
5094 rcode, stdout, stderr, fail = result
5096 if rcode == constants.IARUN_NOTFOUND:
5097 raise errors.OpExecError("Can't find allocator '%s'" % name)
5098 elif rcode == constants.IARUN_FAILURE:
5099 raise errors.OpExecError("Instance allocator call failed: %s,"
5100 " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()
5105 def _ValidateResult(self):
5106 """Process the allocator results.
5108 This will process and if successful save the result in
5109 self.out_data and the other parameters.
    try:
      rdict = serializer.Load(self.out_text)
5114 except Exception, err:
5115 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5117 if not isinstance(rdict, dict):
5118 raise errors.OpExecError("Can't parse iallocator results: not a dict")
5120 for key in "success", "info", "nodes":
5121 if key not in rdict:
5122 raise errors.OpExecError("Can't parse iallocator results:"
5123 " missing key '%s'" % key)
5124 setattr(self, key, rdict[key])
5126 if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " not a list")

    self.out_data = rdict
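    # A valid allocator reply therefore looks roughly like:
    #   {"success": <bool>, "info": <human readable string>,
    #    "nodes": [<node name>, ...]}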
5132 class LUTestAllocator(NoHooksLU):
5133 """Run allocator tests.
5135 This LU runs the allocator tests
5138 _OP_REQP = ["direction", "mode", "name"]
5140 def CheckPrereq(self):
5141 """Check prerequisites.
    This checks the opcode parameters depending on the direction and mode
    of the test.
5146 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5147 for attr in ["name", "mem_size", "disks", "disk_template",
5148 "os", "tags", "nics", "vcpus"]:
5149 if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
5152 iname = self.cfg.ExpandInstanceName(self.op.name)
5153 if iname is not None:
5154 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5156 if not isinstance(self.op.nics, list):
5157 raise errors.OpPrereqError("Invalid parameter 'nics'")
5158 for row in self.op.nics:
5159 if (not isinstance(row, dict) or
5162 "bridge" not in row):
5163 raise errors.OpPrereqError("Invalid contents of the"
5164 " 'nics' parameter")
5165 if not isinstance(self.op.disks, list):
5166 raise errors.OpPrereqError("Invalid parameter 'disks'")
5167 if len(self.op.disks) != 2:
5168 raise errors.OpPrereqError("Only two-disk configurations supported")
5169 for row in self.op.disks:
5170 if (not isinstance(row, dict) or
5171 "size" not in row or
5172 not isinstance(row["size"], int) or
5173 "mode" not in row or
5174 row["mode"] not in ['r', 'w']):
5175 raise errors.OpPrereqError("Invalid contents of the"
5176 " 'disks' parameter")
5177 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5178 if not hasattr(self.op, "name"):
5179 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5180 fname = self.cfg.ExpandInstanceName(self.op.name)
5182 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5184 self.op.name = fname
5185 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)
5190 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5191 if not hasattr(self.op, "allocator") or self.op.allocator is None:
5192 raise errors.OpPrereqError("Missing allocator name")
5193 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5194 raise errors.OpPrereqError("Wrong allocator test '%s'" %
5197 def Exec(self, feedback_fn):
5198 """Run the allocator test.
5201 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5202 ial = IAllocator(self.cfg, self.sstore,
5205 mem_size=self.op.mem_size,
5206 disks=self.op.disks,
5207 disk_template=self.op.disk_template,
5211 vcpus=self.op.vcpus,
5214 ial = IAllocator(self.cfg, self.sstore,
5217 relocate_from=list(self.relocate_from),
5220 if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result