4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
# stdlib imports used later in this module (re, time, platform)
import re
import time
import platform

34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
47 class LogicalUnit(object):
48 """Logical Unit base class.
50 Subclasses must follow these rules:
51 - implement ExpandNames
52 - implement CheckPrereq
54 - implement BuildHooksEnv
55 - redefine HPATH and HTYPE
56 - optionally redefine their run requirements:
57 REQ_MASTER: the LU needs to run on the master node
58 REQ_WSSTORE: the LU needs a writable SimpleStore
59 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
61 Note that all commands require root permissions.
71 def __init__(self, processor, op, context, sstore):
72 """Constructor for LogicalUnit.
74 This needs to be overridden in derived classes in order to check op
80 self.cfg = context.cfg
82 self.context = context
83 self.needed_locks = None
84 self.acquired_locks = {}
85 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 for attr_name in self._OP_REQP:
91 attr_val = getattr(op, attr_name, None)
93 raise errors.OpPrereqError("Required parameter '%s' missing" %
96 if not self.cfg.IsCluster():
97 raise errors.OpPrereqError("Cluster not initialized yet,"
98 " use 'gnt-cluster init' first.")
100 master = sstore.GetMasterNode()
101 if master != utils.HostInfo().name:
102 raise errors.OpPrereqError("Commands must be run on the master"
106 """Returns the SshRunner object
110 self.__ssh = ssh.SshRunner(self.sstore)
113 ssh = property(fget=__GetSSH)
115 def ExpandNames(self):
116 """Expand names for this LU.
118 This method is called before starting to execute the opcode, and it should
119 update all the parameters of the opcode to their canonical form (e.g. a
120 short node name must be fully expanded after this method has successfully
121 completed). This way locking, hooks, logging, etc. can work correctly.
123 LUs which implement this method must also populate the self.needed_locks
124 member, as a dict with lock levels as keys, and a list of needed lock names
126 - Use an empty dict if you don't need any lock
127 - If you don't need any lock at a particular level omit that level
128 - Don't put anything for the BGL level
129 - If you want all locks at a level use locking.ALL_SET as a value
131 If you need to share locks (rather than acquire them exclusively) at one
132 level you can modify self.share_locks, setting a true value (usually 1) for
133 that level. By default locks are not shared.
136 # Acquire all nodes and one instance
137 self.needed_locks = {
138 locking.LEVEL_NODE: locking.ALL_SET,
139 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
141 # Acquire just two nodes
142 self.needed_locks = {
143 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
146 self.needed_locks = {} # No, you can't leave it to the default value None
149 # The implementation of this method is mandatory only if the new LU is
150 # concurrent, so that old LUs don't need to be changed all at the same
153 self.needed_locks = {} # Exclusive LUs don't need locks.
155 raise NotImplementedError
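  # A minimal sketch (hypothetical LU, names illustrative) of a concurrency-aware
  # ExpandNames combining the patterns documented above: one instance is locked
  # exclusively while node locks are taken in shared mode.
  #
  #   def ExpandNames(self):
  #     self.needed_locks = {
  #       locking.LEVEL_INSTANCE: ['instance1.example.tld'],
  #       locking.LEVEL_NODE: locking.ALL_SET,
  #     }
  #     self.share_locks[locking.LEVEL_NODE] = 1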
157 def DeclareLocks(self, level):
158 """Declare LU locking needs for a level
160 While most LUs can just declare their locking needs at ExpandNames time,
161 sometimes there is a need to calculate some locks after having acquired
162 the ones before them. This function is called just before acquiring locks at a
163 particular level, but after acquiring the ones at lower levels, and permits
164 such calculations. It can be used to modify self.needed_locks, and by
165 default it does nothing.
167 This function is only called if you have something already set in
168 self.needed_locks for the level.
170 @param level: Locking level which is going to be locked
171 @type level: member of ganeti.locking.LEVELS
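  # A minimal sketch (hypothetical LU) of a DeclareLocks override: at the node
  # level it turns the instance locks acquired earlier into node locks, using
  # the helper documented further down.
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes(primary_only=True)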
175 def CheckPrereq(self):
176 """Check prerequisites for this LU.
178 This method should check that the prerequisites for the execution
179 of this LU are fulfilled. It can do internode communication, but
180 it should be idempotent - no cluster or system changes are
183 The method should raise errors.OpPrereqError in case something is
184 not fulfilled. Its return value is ignored.
186 This method should also update all the parameters of the opcode to
187 their canonical form if it hasn't been done by ExpandNames before.
190 raise NotImplementedError
192 def Exec(self, feedback_fn):
195 This method should implement the actual work. It should raise
196 errors.OpExecError for failures that are somewhat dealt with in
200 raise NotImplementedError
202 def BuildHooksEnv(self):
203 """Build hooks environment for this LU.
205 This method should return a three-element tuple consisting of: a dict
206 containing the environment that will be used for running the
207 specific hook for this LU, a list of node names on which the hook
208 should run before the execution, and a list of node names on which
209 the hook should run after the execution.
211 The keys of the dict must not be prefixed with 'GANETI_', as that prefix
212 will be added by the hooks runner. Also note additional keys will be
213 added by the hooks runner. If the LU doesn't define any
214 environment, an empty dict (and not None) should be returned.
216 If there are no nodes, an empty list (and not None) should be returned.
218 Note that if the HPATH for a LU class is None, this function will
222 raise NotImplementedError
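  # A minimal sketch (hypothetical instance LU, values illustrative) of the
  # three-element tuple BuildHooksEnv must return: the environment dict, the
  # pre-hook node list and the post-hook node list.
  #
  #   def BuildHooksEnv(self):
  #     env = {"OP_TARGET": self.op.instance_name}
  #     nl = [self.sstore.GetMasterNode(), self.instance.primary_node]
  #     return env, nl, nl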
224 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
225 """Notify the LU about the results of its hooks.
227 This method is called every time a hooks phase is executed, and notifies
228 the Logical Unit about the hooks' result. The LU can then use it to alter
229 its result based on the hooks. By default the method does nothing and the
230 previous result is passed back unchanged but any LU can define it if it
231 wants to use the local cluster hook-scripts somehow.
234 phase: the hooks phase that has just been run
235 hook_results: the results of the multi-node hooks rpc call
236 feedback_fn: function to send feedback back to the caller
237 lu_result: the previous result this LU had, or None in the PRE phase.
242 def _ExpandAndLockInstance(self):
243 """Helper function to expand and lock an instance.
245 Many LUs that work on an instance take its name in self.op.instance_name
246 and need to expand it and then declare the expanded name for locking. This
247 function does it, and then updates self.op.instance_name to the expanded
248 name. It also initializes needed_locks as a dict, if this hasn't been done
252 if self.needed_locks is None:
253 self.needed_locks = {}
255 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
256 "_ExpandAndLockInstance called with instance-level locks set"
257 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
258 if expanded_name is None:
259 raise errors.OpPrereqError("Instance '%s' not known" %
260 self.op.instance_name)
261 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
262 self.op.instance_name = expanded_name
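  # Typical use (as in the instance LUs below): call this helper from
  # ExpandNames and defer node locking to DeclareLocks.
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE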
264 def _LockInstancesNodes(self, primary_only=False):
265 """Helper function to declare instances' nodes for locking.
267 This function should be called after locking one or more instances to lock
268 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
269 with all primary or secondary nodes for instances already locked and
270 present in self.needed_locks[locking.LEVEL_INSTANCE].
272 It should be called from DeclareLocks, and for safety only works if
273 self.recalculate_locks[locking.LEVEL_NODE] is set.
275 In the future it may grow parameters to just lock some instances' nodes, or
276 to just lock primary or secondary nodes, if needed.
278 It should be called from DeclareLocks in a way similar to:
280 if level == locking.LEVEL_NODE:
281 self._LockInstancesNodes()
283 @type primary_only: boolean
284 @param primary_only: only lock primary nodes of locked instances
287 assert locking.LEVEL_NODE in self.recalculate_locks, \
288 "_LockInstancesNodes helper function called with no nodes to recalculate"
290 # TODO: check if we've really been called with the instance locks held
292 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
293 # future we might want to have different behaviors depending on the value
294 # of self.recalculate_locks[locking.LEVEL_NODE]
296 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
297 instance = self.context.cfg.GetInstanceInfo(instance_name)
298 wanted_nodes.append(instance.primary_node)
300 wanted_nodes.extend(instance.secondary_nodes)
301 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
303 del self.recalculate_locks[locking.LEVEL_NODE]
306 class NoHooksLU(LogicalUnit):
307 """Simple LU which runs no hooks.
309 This LU is intended as a parent for other LogicalUnits which will
310 run no hooks, in order to reduce duplicate code.
317 def _GetWantedNodes(lu, nodes):
318 """Returns list of checked and expanded node names.
321 nodes: List of nodes (strings) or None for all
324 if not isinstance(nodes, list):
325 raise errors.OpPrereqError("Invalid argument type 'nodes'")
328 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
329 " non-empty list of nodes whose name is to be expanded.")
333 node = lu.cfg.ExpandNodeName(name)
335 raise errors.OpPrereqError("No such node name '%s'" % name)
338 return utils.NiceSort(wanted)
341 def _GetWantedInstances(lu, instances):
342 """Returns list of checked and expanded instance names.
345 instances: List of instances (strings) or None for all
348 if not isinstance(instances, list):
349 raise errors.OpPrereqError("Invalid argument type 'instances'")
354 for name in instances:
355 instance = lu.cfg.ExpandInstanceName(name)
357 raise errors.OpPrereqError("No such instance name '%s'" % name)
358 wanted.append(instance)
361 wanted = lu.cfg.GetInstanceList()
362 return utils.NiceSort(wanted)
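# Illustrative use (hypothetical query LU): expand user-supplied names before
# declaring them for locking, e.g.
#
#   self.wanted = _GetWantedInstances(self, self.op.names)
#   self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted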
365 def _CheckOutputFields(static, dynamic, selected):
366 """Checks whether all selected fields are valid.
369 static: Static fields
370 dynamic: Dynamic fields
373 static_fields = frozenset(static)
374 dynamic_fields = frozenset(dynamic)
376 all_fields = static_fields | dynamic_fields
378 if not all_fields.issuperset(selected):
379 raise errors.OpPrereqError("Unknown output fields selected: %s"
380 % ",".join(frozenset(selected).
381 difference(all_fields)))
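# Illustrative call (field names hypothetical): a query LU validates the
# user-requested output fields against what it can actually compute.
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=self.op.output_fields)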
384 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
385 memory, vcpus, nics):
386 """Builds instance related env variables for hooks from single variables.
389 secondary_nodes: List of secondary nodes as strings
393 "INSTANCE_NAME": name,
394 "INSTANCE_PRIMARY": primary_node,
395 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
396 "INSTANCE_OS_TYPE": os_type,
397 "INSTANCE_STATUS": status,
398 "INSTANCE_MEMORY": memory,
399 "INSTANCE_VCPUS": vcpus,
403 nic_count = len(nics)
404 for idx, (ip, bridge, mac) in enumerate(nics):
407 env["INSTANCE_NIC%d_IP" % idx] = ip
408 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
409 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
413 env["INSTANCE_NIC_COUNT"] = nic_count
418 def _BuildInstanceHookEnvByObject(instance, override=None):
419 """Builds instance related env variables for hooks from an object.
422 instance: objects.Instance object of instance
423 override: dict of values to override
426 'name': instance.name,
427 'primary_node': instance.primary_node,
428 'secondary_nodes': instance.secondary_nodes,
429 'os_type': instance.os,
430 'status': instance.status,
431 'memory': instance.memory,
432 'vcpus': instance.vcpus,
433 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
436 args.update(override)
437 return _BuildInstanceHookEnv(**args)
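# Illustrative call (hypothetical): single values can be overridden, e.g. when
# describing a state the instance is about to enter rather than its current one.
#
#   env = _BuildInstanceHookEnvByObject(instance, override={"status": "down"})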
440 def _CheckInstanceBridgesExist(instance):
441 """Check that the brigdes needed by an instance exist.
444 # check bridges existance
445 brlist = [nic.bridge for nic in instance.nics]
446 if not rpc.call_bridges_exist(instance.primary_node, brlist):
447 raise errors.OpPrereqError("one or more target bridges %s do not"
448 " exist on destination node '%s'" %
449 (brlist, instance.primary_node))
452 class LUDestroyCluster(NoHooksLU):
453 """Logical unit for destroying the cluster.
458 def CheckPrereq(self):
459 """Check prerequisites.
461 This checks whether the cluster is empty.
463 Any errors are signalled by raising errors.OpPrereqError.
466 master = self.sstore.GetMasterNode()
468 nodelist = self.cfg.GetNodeList()
469 if len(nodelist) != 1 or nodelist[0] != master:
470 raise errors.OpPrereqError("There are still %d node(s) in"
471 " this cluster." % (len(nodelist) - 1))
472 instancelist = self.cfg.GetInstanceList()
474 raise errors.OpPrereqError("There are still %d instance(s) in"
475 " this cluster." % len(instancelist))
477 def Exec(self, feedback_fn):
478 """Destroys the cluster.
481 master = self.sstore.GetMasterNode()
482 if not rpc.call_node_stop_master(master, False):
483 raise errors.OpExecError("Could not disable the master role")
484 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
485 utils.CreateBackup(priv_key)
486 utils.CreateBackup(pub_key)
490 class LUVerifyCluster(LogicalUnit):
491 """Verifies the cluster status.
494 HPATH = "cluster-verify"
495 HTYPE = constants.HTYPE_CLUSTER
496 _OP_REQP = ["skip_checks"]
498 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
499 remote_version, feedback_fn):
500 """Run multiple tests against a node.
503 - compares ganeti version
504 - checks vg existence and size > 20G
505 - checks config file checksum
506 - checks ssh to other nodes
509 node: name of the node to check
510 file_list: required list of files
511 local_cksum: dictionary of local files and their checksums
514 # compares ganeti version
515 local_version = constants.PROTOCOL_VERSION
516 if not remote_version:
517 feedback_fn(" - ERROR: connection to %s failed" % (node))
520 if local_version != remote_version:
521 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
522 (local_version, node, remote_version))
525 # checks vg existence and size > 20G
529 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
533 vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
534 constants.MIN_VG_SIZE)
536 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
539 # checks config file checksum
542 if 'filelist' not in node_result:
544 feedback_fn(" - ERROR: node hasn't returned file checksum data")
546 remote_cksum = node_result['filelist']
547 for file_name in file_list:
548 if file_name not in remote_cksum:
550 feedback_fn(" - ERROR: file '%s' missing" % file_name)
551 elif remote_cksum[file_name] != local_cksum[file_name]:
553 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
555 if 'nodelist' not in node_result:
557 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
559 if node_result['nodelist']:
561 for node in node_result['nodelist']:
562 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
563 (node, node_result['nodelist'][node]))
564 if 'node-net-test' not in node_result:
566 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
568 if node_result['node-net-test']:
570 nlist = utils.NiceSort(node_result['node-net-test'].keys())
572 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
573 (node, node_result['node-net-test'][node]))
575 hyp_result = node_result.get('hypervisor', None)
576 if hyp_result is not None:
577 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
580 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
581 node_instance, feedback_fn):
582 """Verify an instance.
584 This function checks to see if the required block devices are
585 available on the instance's node.
590 node_current = instanceconfig.primary_node
593 instanceconfig.MapLVsByNode(node_vol_should)
595 for node in node_vol_should:
596 for volume in node_vol_should[node]:
597 if node not in node_vol_is or volume not in node_vol_is[node]:
598 feedback_fn(" - ERROR: volume %s missing on node %s" %
602 if instanceconfig.status != 'down':
603 if (node_current not in node_instance or
604 not instance in node_instance[node_current]):
605 feedback_fn(" - ERROR: instance %s not running on node %s" %
606 (instance, node_current))
609 for node in node_instance:
610 if node != node_current:
611 if instance in node_instance[node]:
612 feedback_fn(" - ERROR: instance %s should not run on node %s" %
618 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
619 """Verify if there are any unknown volumes in the cluster.
621 The .os, .swap and backup volumes are ignored. All other volumes are
627 for node in node_vol_is:
628 for volume in node_vol_is[node]:
629 if node not in node_vol_should or volume not in node_vol_should[node]:
630 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
635 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
636 """Verify the list of running instances.
638 This checks what instances are running but unknown to the cluster.
642 for node in node_instance:
643 for runninginstance in node_instance[node]:
644 if runninginstance not in instancelist:
645 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
646 (runninginstance, node))
650 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
651 """Verify N+1 Memory Resilience.
653 Check that if one single node dies we can still start all the instances it
659 for node, nodeinfo in node_info.iteritems():
660 # This code checks that every node which is now listed as secondary has
661 # enough memory to host all instances it is supposed to, should a single
662 # other node in the cluster fail.
663 # FIXME: not ready for failover to an arbitrary node
664 # FIXME: does not support file-backed instances
665 # WARNING: we currently take into account down instances as well as up
666 # ones, considering that even if they're down someone might want to start
667 # them even in the event of a node failure.
668 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
670 for instance in instances:
671 needed_mem += instance_cfg[instance].memory
672 if nodeinfo['mfree'] < needed_mem:
673 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
674 " failovers should node %s fail" % (node, prinode))
678 def CheckPrereq(self):
679 """Check prerequisites.
681 Transform the list of checks we're going to skip into a set and check that
682 all its members are valid.
685 self.skip_set = frozenset(self.op.skip_checks)
686 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
687 raise errors.OpPrereqError("Invalid checks to be skipped specified")
689 def BuildHooksEnv(self):
692 Cluster-Verify hooks are run only in the post phase; their failure is
693 logged in the verify output and makes the verification fail.
696 all_nodes = self.cfg.GetNodeList()
697 # TODO: populate the environment with useful information for verify hooks
699 return env, [], all_nodes
701 def Exec(self, feedback_fn):
702 """Verify integrity of cluster, performing various test on nodes.
706 feedback_fn("* Verifying global settings")
707 for msg in self.cfg.VerifyConfig():
708 feedback_fn(" - ERROR: %s" % msg)
710 vg_name = self.cfg.GetVGName()
711 nodelist = utils.NiceSort(self.cfg.GetNodeList())
712 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
713 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
714 i_non_redundant = [] # Non redundant instances
720 # FIXME: verify OS list
722 file_names = list(self.sstore.GetFileList())
723 file_names.append(constants.SSL_CERT_FILE)
724 file_names.append(constants.CLUSTER_CONF_FILE)
725 local_checksums = utils.FingerprintFiles(file_names)
727 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
728 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
729 all_instanceinfo = rpc.call_instance_list(nodelist)
730 all_vglist = rpc.call_vg_list(nodelist)
731 node_verify_param = {
732 'filelist': file_names,
733 'nodelist': nodelist,
735 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
736 for node in nodeinfo]
738 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
739 all_rversion = rpc.call_version(nodelist)
740 all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
742 for node in nodelist:
743 feedback_fn("* Verifying node %s" % node)
744 result = self._VerifyNode(node, file_names, local_checksums,
745 all_vglist[node], all_nvinfo[node],
746 all_rversion[node], feedback_fn)
750 volumeinfo = all_volumeinfo[node]
752 if isinstance(volumeinfo, basestring):
753 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
754 (node, volumeinfo[-400:].encode('string_escape')))
756 node_volume[node] = {}
757 elif not isinstance(volumeinfo, dict):
758 feedback_fn(" - ERROR: connection to %s failed" % (node,))
762 node_volume[node] = volumeinfo
765 nodeinstance = all_instanceinfo[node]
766 if type(nodeinstance) != list:
767 feedback_fn(" - ERROR: connection to %s failed" % (node,))
771 node_instance[node] = nodeinstance
774 nodeinfo = all_ninfo[node]
775 if not isinstance(nodeinfo, dict):
776 feedback_fn(" - ERROR: connection to %s failed" % (node,))
782 "mfree": int(nodeinfo['memory_free']),
783 "dfree": int(nodeinfo['vg_free']),
786 # dictionary holding all instances this node is secondary for,
787 # grouped by their primary node. Each key is a cluster node, and each
788 # value is a list of instances which have the key as primary and the
789 # current node as secondary. This is handy to calculate N+1 memory
790 # availability if you can only failover from a primary to its
792 "sinst-by-pnode": {},
795 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
801 for instance in instancelist:
802 feedback_fn("* Verifying instance %s" % instance)
803 inst_config = self.cfg.GetInstanceInfo(instance)
804 result = self._VerifyInstance(instance, inst_config, node_volume,
805 node_instance, feedback_fn)
808 inst_config.MapLVsByNode(node_vol_should)
810 instance_cfg[instance] = inst_config
812 pnode = inst_config.primary_node
813 if pnode in node_info:
814 node_info[pnode]['pinst'].append(instance)
816 feedback_fn(" - ERROR: instance %s, connection to primary node"
817 " %s failed" % (instance, pnode))
820 # If the instance is non-redundant we cannot survive losing its primary
821 # node, so we are not N+1 compliant. On the other hand we have no disk
822 # templates with more than one secondary so that situation is not well
824 # FIXME: does not support file-backed instances
825 if len(inst_config.secondary_nodes) == 0:
826 i_non_redundant.append(instance)
827 elif len(inst_config.secondary_nodes) > 1:
828 feedback_fn(" - WARNING: multiple secondaries for instance %s"
831 for snode in inst_config.secondary_nodes:
832 if snode in node_info:
833 node_info[snode]['sinst'].append(instance)
834 if pnode not in node_info[snode]['sinst-by-pnode']:
835 node_info[snode]['sinst-by-pnode'][pnode] = []
836 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
838 feedback_fn(" - ERROR: instance %s, connection to secondary node"
839 " %s failed" % (instance, snode))
841 feedback_fn("* Verifying orphan volumes")
842 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
846 feedback_fn("* Verifying remaining instances")
847 result = self._VerifyOrphanInstances(instancelist, node_instance,
851 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
852 feedback_fn("* Verifying N+1 Memory redundancy")
853 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
856 feedback_fn("* Other Notes")
858 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
859 % len(i_non_redundant))
863 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
864 """Analize the post-hooks' result, handle it, and send some
865 nicely-formatted feedback back to the user.
868 phase: the hooks phase that has just been run
869 hooks_results: the results of the multi-node hooks rpc call
870 feedback_fn: function to send feedback back to the caller
871 lu_result: previous Exec result
874 # We only really run POST phase hooks, and are only interested in
876 if phase == constants.HOOKS_PHASE_POST:
877 # Used to change hooks' output to proper indentation
878 indent_re = re.compile('^', re.M)
879 feedback_fn("* Hooks Results")
880 if not hooks_results:
881 feedback_fn(" - ERROR: general communication failure")
884 for node_name in hooks_results:
885 show_node_header = True
886 res = hooks_results[node_name]
887 if res is False or not isinstance(res, list):
888 feedback_fn(" Communication failure")
891 for script, hkr, output in res:
892 if hkr == constants.HKR_FAIL:
893 # The node header is only shown once, if there are
894 # failing hooks on that node
896 feedback_fn(" Node %s:" % node_name)
897 show_node_header = False
898 feedback_fn(" ERROR: Script %s failed, output:" % script)
899 output = indent_re.sub(' ', output)
900 feedback_fn("%s" % output)
906 class LUVerifyDisks(NoHooksLU):
907 """Verifies the cluster disks status.
912 def CheckPrereq(self):
913 """Check prerequisites.
915 This has no prerequisites.
920 def Exec(self, feedback_fn):
921 """Verify integrity of cluster disks.
924 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
926 vg_name = self.cfg.GetVGName()
927 nodes = utils.NiceSort(self.cfg.GetNodeList())
928 instances = [self.cfg.GetInstanceInfo(name)
929 for name in self.cfg.GetInstanceList()]
932 for inst in instances:
934 if (inst.status != "up" or
935 inst.disk_template not in constants.DTS_NET_MIRROR):
937 inst.MapLVsByNode(inst_lvs)
938 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
939 for node, vol_list in inst_lvs.iteritems():
941 nv_dict[(node, vol)] = inst
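        # Illustrative shape of the transformation above (hypothetical names):
        #   {"inst1": {"node1": ["xenvg/lv1"], "node2": ["xenvg/lv1"]}}
        # becomes
        #   {("node1", "xenvg/lv1"): <inst1>, ("node2", "xenvg/lv1"): <inst1>}
        # where the values are the objects.Instance instances themselves.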
946 node_lvs = rpc.call_volume_list(nodes, vg_name)
953 if isinstance(lvs, basestring):
954 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
956 elif not isinstance(lvs, dict):
957 logger.Info("connection to node %s failed or invalid data returned" %
959 res_nodes.append(node)
962 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
963 inst = nv_dict.pop((node, lv_name), None)
964 if (not lv_online and inst is not None
965 and inst.name not in res_instances):
966 res_instances.append(inst.name)
968 # any leftover items in nv_dict are missing LVs, let's arrange the
970 for key, inst in nv_dict.iteritems():
971 if inst.name not in res_missing:
972 res_missing[inst.name] = []
973 res_missing[inst.name].append(key)
978 class LURenameCluster(LogicalUnit):
979 """Rename the cluster.
982 HPATH = "cluster-rename"
983 HTYPE = constants.HTYPE_CLUSTER
987 def BuildHooksEnv(self):
992 "OP_TARGET": self.sstore.GetClusterName(),
993 "NEW_NAME": self.op.name,
995 mn = self.sstore.GetMasterNode()
996 return env, [mn], [mn]
998 def CheckPrereq(self):
999 """Verify that the passed name is a valid one.
1002 hostname = utils.HostInfo(self.op.name)
1004 new_name = hostname.name
1005 self.ip = new_ip = hostname.ip
1006 old_name = self.sstore.GetClusterName()
1007 old_ip = self.sstore.GetMasterIP()
1008 if new_name == old_name and new_ip == old_ip:
1009 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1010 " cluster has changed")
1011 if new_ip != old_ip:
1012 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1013 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1014 " reachable on the network. Aborting." %
1017 self.op.name = new_name
1019 def Exec(self, feedback_fn):
1020 """Rename the cluster.
1023 clustername = self.op.name
1027 # shutdown the master IP
1028 master = ss.GetMasterNode()
1029 if not rpc.call_node_stop_master(master, False):
1030 raise errors.OpExecError("Could not disable the master role")
1034 ss.SetKey(ss.SS_MASTER_IP, ip)
1035 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1037 # Distribute updated ss config to all nodes
1038 myself = self.cfg.GetNodeInfo(master)
1039 dist_nodes = self.cfg.GetNodeList()
1040 if myself.name in dist_nodes:
1041 dist_nodes.remove(myself.name)
1043 logger.Debug("Copying updated ssconf data to all nodes")
1044 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1045 fname = ss.KeyToFilename(keyname)
1046 result = rpc.call_upload_file(dist_nodes, fname)
1047 for to_node in dist_nodes:
1048 if not result[to_node]:
1049 logger.Error("copy of file %s to node %s failed" %
1052 if not rpc.call_node_start_master(master, False):
1053 logger.Error("Could not re-enable the master role on the master,"
1054 " please restart manually.")
1057 def _RecursiveCheckIfLVMBased(disk):
1058 """Check if the given disk or its children are lvm-based.
1061 disk: ganeti.objects.Disk object
1064 boolean indicating whether an LD_LV dev_type was found or not
1068 for chdisk in disk.children:
1069 if _RecursiveCheckIfLVMBased(chdisk):
1071 return disk.dev_type == constants.LD_LV
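# Illustrative behaviour (hypothetical disk objects): a DRBD device whose
# children are LV-backed makes the whole tree count as lvm-based.
#
#   lv = objects.Disk(dev_type=constants.LD_LV, children=[])
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, children=[lv])
#   _RecursiveCheckIfLVMBased(drbd)  # -> True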
1074 class LUSetClusterParams(LogicalUnit):
1075 """Change the parameters of the cluster.
1078 HPATH = "cluster-modify"
1079 HTYPE = constants.HTYPE_CLUSTER
1082 def BuildHooksEnv(self):
1087 "OP_TARGET": self.sstore.GetClusterName(),
1088 "NEW_VG_NAME": self.op.vg_name,
1090 mn = self.sstore.GetMasterNode()
1091 return env, [mn], [mn]
1093 def CheckPrereq(self):
1094 """Check prerequisites.
1096 This checks whether the given params don't conflict and
1097 if the given volume group is valid.
1100 if not self.op.vg_name:
1101 instances = [self.cfg.GetInstanceInfo(name)
1102 for name in self.cfg.GetInstanceList()]
1103 for inst in instances:
1104 for disk in inst.disks:
1105 if _RecursiveCheckIfLVMBased(disk):
1106 raise errors.OpPrereqError("Cannot disable lvm storage while"
1107 " lvm-based instances exist")
1109 # if vg_name is not None, check the given volume group on all nodes
1111 node_list = self.cfg.GetNodeList()
1112 vglist = rpc.call_vg_list(node_list)
1113 for node in node_list:
1114 vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1115 constants.MIN_VG_SIZE)
1117 raise errors.OpPrereqError("Error on node '%s': %s" %
1120 def Exec(self, feedback_fn):
1121 """Change the parameters of the cluster.
1124 if self.op.vg_name != self.cfg.GetVGName():
1125 self.cfg.SetVGName(self.op.vg_name)
1127 feedback_fn("Cluster LVM configuration already in desired"
1128 " state, not changing")
1131 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1132 """Sleep and poll for an instance's disk to sync.
1135 if not instance.disks:
1139 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1141 node = instance.primary_node
1143 for dev in instance.disks:
1144 cfgw.SetDiskID(dev, node)
1150 cumul_degraded = False
1151 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1153 proc.LogWarning("Can't get any data from node %s" % node)
1156 raise errors.RemoteError("Can't contact node %s for mirror data,"
1157 " aborting." % node)
1161 for i in range(len(rstats)):
1164 proc.LogWarning("Can't compute data for node %s/%s" %
1165 (node, instance.disks[i].iv_name))
1167 # we ignore the ldisk parameter
1168 perc_done, est_time, is_degraded, _ = mstat
1169 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1170 if perc_done is not None:
1172 if est_time is not None:
1173 rem_time = "%d estimated seconds remaining" % est_time
1176 rem_time = "no time estimate"
1177 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1178 (instance.disks[i].iv_name, perc_done, rem_time))
1182 time.sleep(min(60, max_time))
1185 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1186 return not cumul_degraded
1189 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1190 """Check that mirrors are not degraded.
1192 The ldisk parameter, if True, will change the test from the
1193 is_degraded attribute (which represents overall non-ok status for
1194 the device(s)) to the ldisk (representing the local storage status).
1197 cfgw.SetDiskID(dev, node)
1204 if on_primary or dev.AssembleOnSecondary():
1205 rstats = rpc.call_blockdev_find(node, dev)
1207 logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1210 result = result and (not rstats[idx])
1212 for child in dev.children:
1213 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1218 class LUDiagnoseOS(NoHooksLU):
1219 """Logical unit for OS diagnose/query.
1222 _OP_REQP = ["output_fields", "names"]
1225 def ExpandNames(self):
1227 raise errors.OpPrereqError("Selective OS query not supported")
1229 self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1230 _CheckOutputFields(static=[],
1231 dynamic=self.dynamic_fields,
1232 selected=self.op.output_fields)
1234 # Lock all nodes, in shared mode
1235 self.needed_locks = {}
1236 self.share_locks[locking.LEVEL_NODE] = 1
1237 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1239 def CheckPrereq(self):
1240 """Check prerequisites.
1245 def _DiagnoseByOS(node_list, rlist):
1246 """Remaps a per-node return list into an a per-os per-node dictionary
1249 node_list: a list with the names of all nodes
1250 rlist: a map with node names as keys and OS objects as values
1253 map: a map with osnames as keys and as value another map, with
1255 keys and list of OS objects as values
1256 e.g. {"debian-etch": {"node1": [<object>,...],
1257 "node2": [<object>,]}
1262 for node_name, nr in rlist.iteritems():
1266 if os_obj.name not in all_os:
1267 # build a list of nodes for this os containing empty lists
1268 # for each node in node_list
1269 all_os[os_obj.name] = {}
1270 for nname in node_list:
1271 all_os[os_obj.name][nname] = []
1272 all_os[os_obj.name][node_name].append(os_obj)
1275 def Exec(self, feedback_fn):
1276 """Compute the list of OSes.
1279 node_list = self.acquired_locks[locking.LEVEL_NODE]
1280 node_data = rpc.call_os_diagnose(node_list)
1281 if node_data == False:
1282 raise errors.OpExecError("Can't gather the list of OSes")
1283 pol = self._DiagnoseByOS(node_list, node_data)
1285 for os_name, os_data in pol.iteritems():
1287 for field in self.op.output_fields:
1290 elif field == "valid":
1291 val = utils.all([osl and osl[0] for osl in os_data.values()])
1292 elif field == "node_status":
1294 for node_name, nos_list in os_data.iteritems():
1295 val[node_name] = [(v.status, v.path) for v in nos_list]
1297 raise errors.ParameterError(field)
1304 class LURemoveNode(LogicalUnit):
1305 """Logical unit for removing a node.
1308 HPATH = "node-remove"
1309 HTYPE = constants.HTYPE_NODE
1310 _OP_REQP = ["node_name"]
1312 def BuildHooksEnv(self):
1315 This doesn't run on the target node in the pre phase as a failed
1316 node would then be impossible to remove.
1320 "OP_TARGET": self.op.node_name,
1321 "NODE_NAME": self.op.node_name,
1323 all_nodes = self.cfg.GetNodeList()
1324 all_nodes.remove(self.op.node_name)
1325 return env, all_nodes, all_nodes
1327 def CheckPrereq(self):
1328 """Check prerequisites.
1331 - the node exists in the configuration
1332 - it does not have primary or secondary instances
1333 - it's not the master
1335 Any errors are signalled by raising errors.OpPrereqError.
1338 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1340 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1342 instance_list = self.cfg.GetInstanceList()
1344 masternode = self.sstore.GetMasterNode()
1345 if node.name == masternode:
1346 raise errors.OpPrereqError("Node is the master node,"
1347 " you need to failover first.")
1349 for instance_name in instance_list:
1350 instance = self.cfg.GetInstanceInfo(instance_name)
1351 if node.name == instance.primary_node:
1352 raise errors.OpPrereqError("Instance %s still running on the node,"
1353 " please remove first." % instance_name)
1354 if node.name in instance.secondary_nodes:
1355 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1356 " please remove first." % instance_name)
1357 self.op.node_name = node.name
1360 def Exec(self, feedback_fn):
1361 """Removes the node from the cluster.
1365 logger.Info("stopping the node daemon and removing configs from node %s" %
1368 self.context.RemoveNode(node.name)
1370 rpc.call_node_leave_cluster(node.name)
1373 class LUQueryNodes(NoHooksLU):
1374 """Logical unit for querying nodes.
1377 _OP_REQP = ["output_fields", "names"]
1380 def ExpandNames(self):
1381 self.dynamic_fields = frozenset([
1383 "mtotal", "mnode", "mfree",
1388 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1389 "pinst_list", "sinst_list",
1390 "pip", "sip", "tags"],
1391 dynamic=self.dynamic_fields,
1392 selected=self.op.output_fields)
1394 self.needed_locks = {}
1395 self.share_locks[locking.LEVEL_NODE] = 1
1396 # TODO: we could lock nodes only if the user asked for dynamic fields. For
1397 # that we need atomic ways to get info for a group of nodes from the
1399 if not self.op.names:
1400 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1402 self.needed_locks[locking.LEVEL_NODE] = \
1403 _GetWantedNodes(self, self.op.names)
1405 def CheckPrereq(self):
1406 """Check prerequisites.
1409 # This of course is valid only if we locked the nodes
1410 self.wanted = self.acquired_locks[locking.LEVEL_NODE]
1412 def Exec(self, feedback_fn):
1413 """Computes the list of nodes and their attributes.
1416 nodenames = self.wanted
1417 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1419 # begin data gathering
1421 if self.dynamic_fields.intersection(self.op.output_fields):
1423 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1424 for name in nodenames:
1425 nodeinfo = node_data.get(name, None)
1428 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1429 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1430 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1431 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1432 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1433 "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1434 "bootid": nodeinfo['bootid'],
1437 live_data[name] = {}
1439 live_data = dict.fromkeys(nodenames, {})
1441 node_to_primary = dict([(name, set()) for name in nodenames])
1442 node_to_secondary = dict([(name, set()) for name in nodenames])
1444 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1445 "sinst_cnt", "sinst_list"))
1446 if inst_fields & frozenset(self.op.output_fields):
1447 instancelist = self.cfg.GetInstanceList()
1449 for instance_name in instancelist:
1450 inst = self.cfg.GetInstanceInfo(instance_name)
1451 if inst.primary_node in node_to_primary:
1452 node_to_primary[inst.primary_node].add(inst.name)
1453 for secnode in inst.secondary_nodes:
1454 if secnode in node_to_secondary:
1455 node_to_secondary[secnode].add(inst.name)
1457 # end data gathering
1460 for node in nodelist:
1462 for field in self.op.output_fields:
1465 elif field == "pinst_list":
1466 val = list(node_to_primary[node.name])
1467 elif field == "sinst_list":
1468 val = list(node_to_secondary[node.name])
1469 elif field == "pinst_cnt":
1470 val = len(node_to_primary[node.name])
1471 elif field == "sinst_cnt":
1472 val = len(node_to_secondary[node.name])
1473 elif field == "pip":
1474 val = node.primary_ip
1475 elif field == "sip":
1476 val = node.secondary_ip
1477 elif field == "tags":
1478 val = list(node.GetTags())
1479 elif field in self.dynamic_fields:
1480 val = live_data[node.name].get(field, None)
1482 raise errors.ParameterError(field)
1483 node_output.append(val)
1484 output.append(node_output)
1489 class LUQueryNodeVolumes(NoHooksLU):
1490 """Logical unit for getting volumes on node(s).
1493 _OP_REQP = ["nodes", "output_fields"]
1496 def ExpandNames(self):
1497 _CheckOutputFields(static=["node"],
1498 dynamic=["phys", "vg", "name", "size", "instance"],
1499 selected=self.op.output_fields)
1501 self.needed_locks = {}
1502 self.share_locks[locking.LEVEL_NODE] = 1
1503 if not self.op.nodes:
1504 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1506 self.needed_locks[locking.LEVEL_NODE] = \
1507 _GetWantedNodes(self, self.op.nodes)
1509 def CheckPrereq(self):
1510 """Check prerequisites.
1512 This checks that the fields required are valid output fields.
1515 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1517 def Exec(self, feedback_fn):
1518 """Computes the list of nodes and their attributes.
1521 nodenames = self.nodes
1522 volumes = rpc.call_node_volumes(nodenames)
1524 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1525 in self.cfg.GetInstanceList()]
1527 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1530 for node in nodenames:
1531 if node not in volumes or not volumes[node]:
1534 node_vols = volumes[node][:]
1535 node_vols.sort(key=lambda vol: vol['dev'])
1537 for vol in node_vols:
1539 for field in self.op.output_fields:
1542 elif field == "phys":
1546 elif field == "name":
1548 elif field == "size":
1549 val = int(float(vol['size']))
1550 elif field == "instance":
1552 if node not in lv_by_node[inst]:
1554 if vol['name'] in lv_by_node[inst][node]:
1560 raise errors.ParameterError(field)
1561 node_output.append(str(val))
1563 output.append(node_output)
1568 class LUAddNode(LogicalUnit):
1569 """Logical unit for adding node to the cluster.
1573 HTYPE = constants.HTYPE_NODE
1574 _OP_REQP = ["node_name"]
1576 def BuildHooksEnv(self):
1579 This will run on all nodes before, and on all nodes + the new node after.
1583 "OP_TARGET": self.op.node_name,
1584 "NODE_NAME": self.op.node_name,
1585 "NODE_PIP": self.op.primary_ip,
1586 "NODE_SIP": self.op.secondary_ip,
1588 nodes_0 = self.cfg.GetNodeList()
1589 nodes_1 = nodes_0 + [self.op.node_name, ]
1590 return env, nodes_0, nodes_1
1592 def CheckPrereq(self):
1593 """Check prerequisites.
1596 - the new node is not already in the config
1598 - its parameters (single/dual homed) matches the cluster
1600 Any errors are signalled by raising errors.OpPrereqError.
1603 node_name = self.op.node_name
1606 dns_data = utils.HostInfo(node_name)
1608 node = dns_data.name
1609 primary_ip = self.op.primary_ip = dns_data.ip
1610 secondary_ip = getattr(self.op, "secondary_ip", None)
1611 if secondary_ip is None:
1612 secondary_ip = primary_ip
1613 if not utils.IsValidIP(secondary_ip):
1614 raise errors.OpPrereqError("Invalid secondary IP given")
1615 self.op.secondary_ip = secondary_ip
1617 node_list = cfg.GetNodeList()
1618 if not self.op.readd and node in node_list:
1619 raise errors.OpPrereqError("Node %s is already in the configuration" %
1621 elif self.op.readd and node not in node_list:
1622 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1624 for existing_node_name in node_list:
1625 existing_node = cfg.GetNodeInfo(existing_node_name)
1627 if self.op.readd and node == existing_node_name:
1628 if (existing_node.primary_ip != primary_ip or
1629 existing_node.secondary_ip != secondary_ip):
1630 raise errors.OpPrereqError("Readded node doesn't have the same IP"
1631 " address configuration as before")
1634 if (existing_node.primary_ip == primary_ip or
1635 existing_node.secondary_ip == primary_ip or
1636 existing_node.primary_ip == secondary_ip or
1637 existing_node.secondary_ip == secondary_ip):
1638 raise errors.OpPrereqError("New node ip address(es) conflict with"
1639 " existing node %s" % existing_node.name)
1641 # check that the type of the node (single versus dual homed) is the
1642 # same as for the master
1643 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1644 master_singlehomed = myself.secondary_ip == myself.primary_ip
1645 newbie_singlehomed = secondary_ip == primary_ip
1646 if master_singlehomed != newbie_singlehomed:
1647 if master_singlehomed:
1648 raise errors.OpPrereqError("The master has no private ip but the"
1649 " new node has one")
1651 raise errors.OpPrereqError("The master has a private ip but the"
1652 " new node doesn't have one")
1654 # checks reachability
1655 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1656 raise errors.OpPrereqError("Node not reachable by ping")
1658 if not newbie_singlehomed:
1659 # check reachability from my secondary ip to newbie's secondary ip
1660 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1661 source=myself.secondary_ip):
1662 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1663 " based ping to noded port")
1665 self.new_node = objects.Node(name=node,
1666 primary_ip=primary_ip,
1667 secondary_ip=secondary_ip)
1669 def Exec(self, feedback_fn):
1670 """Adds the new node to the cluster.
1673 new_node = self.new_node
1674 node = new_node.name
1676 # check connectivity
1677 result = rpc.call_version([node])[node]
1679 if constants.PROTOCOL_VERSION == result:
1680 logger.Info("communication to node %s fine, sw version %s match" %
1683 raise errors.OpExecError("Version mismatch master version %s,"
1684 " node version %s" %
1685 (constants.PROTOCOL_VERSION, result))
1687 raise errors.OpExecError("Cannot get version from the new node")
1690 logger.Info("copy ssh key to node %s" % node)
1691 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1693 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1694 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1700 keyarray.append(f.read())
1704 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1705 keyarray[3], keyarray[4], keyarray[5])
1708 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1710 # Add node to our /etc/hosts, and add key to known_hosts
1711 utils.AddHostToEtcHosts(new_node.name)
1713 if new_node.secondary_ip != new_node.primary_ip:
1714 if not rpc.call_node_tcp_ping(new_node.name,
1715 constants.LOCALHOST_IP_ADDRESS,
1716 new_node.secondary_ip,
1717 constants.DEFAULT_NODED_PORT,
1719 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1720 " you gave (%s). Please fix and re-run this"
1721 " command." % new_node.secondary_ip)
1723 node_verify_list = [self.sstore.GetMasterNode()]
1724 node_verify_param = {
1726 # TODO: do a node-net-test as well?
1729 result = rpc.call_node_verify(node_verify_list, node_verify_param)
1730 for verifier in node_verify_list:
1731 if not result[verifier]:
1732 raise errors.OpExecError("Cannot communicate with %s's node daemon"
1733 " for remote verification" % verifier)
1734 if result[verifier]['nodelist']:
1735 for failed in result[verifier]['nodelist']:
1736 feedback_fn("ssh/hostname verification failed %s -> %s" %
1737 (verifier, result[verifier]['nodelist'][failed]))
1738 raise errors.OpExecError("ssh/hostname verification failed.")
1740 # Distribute updated /etc/hosts and known_hosts to all nodes,
1741 # including the node just added
1742 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1743 dist_nodes = self.cfg.GetNodeList()
1744 if not self.op.readd:
1745 dist_nodes.append(node)
1746 if myself.name in dist_nodes:
1747 dist_nodes.remove(myself.name)
1749 logger.Debug("Copying hosts and known_hosts to all nodes")
1750 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1751 result = rpc.call_upload_file(dist_nodes, fname)
1752 for to_node in dist_nodes:
1753 if not result[to_node]:
1754 logger.Error("copy of file %s to node %s failed" %
1757 to_copy = self.sstore.GetFileList()
1758 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1759 to_copy.append(constants.VNC_PASSWORD_FILE)
1760 for fname in to_copy:
1761 result = rpc.call_upload_file([node], fname)
1762 if not result[node]:
1763 logger.Error("could not copy file %s to node %s" % (fname, node))
1766 self.context.ReaddNode(new_node)
1768 self.context.AddNode(new_node)
1771 class LUQueryClusterInfo(NoHooksLU):
1772 """Query cluster configuration.
1779 def ExpandNames(self):
1780 self.needed_locks = {}
1782 def CheckPrereq(self):
1783 """No prerequsites needed for this LU.
1788 def Exec(self, feedback_fn):
1789 """Return cluster config.
1793 "name": self.sstore.GetClusterName(),
1794 "software_version": constants.RELEASE_VERSION,
1795 "protocol_version": constants.PROTOCOL_VERSION,
1796 "config_version": constants.CONFIG_VERSION,
1797 "os_api_version": constants.OS_API_VERSION,
1798 "export_version": constants.EXPORT_VERSION,
1799 "master": self.sstore.GetMasterNode(),
1800 "architecture": (platform.architecture()[0], platform.machine()),
1801 "hypervisor_type": self.sstore.GetHypervisorType(),
1807 class LUDumpClusterConfig(NoHooksLU):
1808 """Return a text-representation of the cluster-config.
1814 def ExpandNames(self):
1815 self.needed_locks = {}
1817 def CheckPrereq(self):
1818 """No prerequisites.
1823 def Exec(self, feedback_fn):
1824 """Dump a representation of the cluster config to the standard output.
1827 return self.cfg.DumpConfig()
1830 class LUActivateInstanceDisks(NoHooksLU):
1831 """Bring up an instance's disks.
1834 _OP_REQP = ["instance_name"]
1837 def ExpandNames(self):
1838 self._ExpandAndLockInstance()
1839 self.needed_locks[locking.LEVEL_NODE] = []
1840 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1842 def DeclareLocks(self, level):
1843 if level == locking.LEVEL_NODE:
1844 self._LockInstancesNodes()
1846 def CheckPrereq(self):
1847 """Check prerequisites.
1849 This checks that the instance is in the cluster.
1852 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1853 assert self.instance is not None, \
1854 "Cannot retrieve locked instance %s" % self.op.instance_name
1856 def Exec(self, feedback_fn):
1857 """Activate the disks.
1860 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1862 raise errors.OpExecError("Cannot activate block devices")
1867 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1868 """Prepare the block devices for an instance.
1870 This sets up the block devices on all nodes.
1873 instance: a ganeti.objects.Instance object
1874 ignore_secondaries: if true, errors on secondary nodes won't result
1875 in an error return from the function
1878 false if the operation failed
1879 list of (host, instance_visible_name, node_visible_name) if the operation
1880 succeeded, with the mapping from node devices to instance devices
1884 iname = instance.name
1885 # With the two passes mechanism we try to reduce the window of
1886 # opportunity for the race condition of switching DRBD to primary
1887 # before handshaking occurred, but we do not eliminate it
1889 # The proper fix would be to wait (with some limits) until the
1890 # connection has been made and drbd transitions from WFConnection
1891 # into any other network-connected state (Connected, SyncTarget,
1894 # 1st pass, assemble on all nodes in secondary mode
1895 for inst_disk in instance.disks:
1896 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1897 cfg.SetDiskID(node_disk, node)
1898 result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1900 logger.Error("could not prepare block device %s on node %s"
1901 " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1902 if not ignore_secondaries:
1905 # FIXME: race condition on drbd migration to primary
1907 # 2nd pass, do only the primary node
1908 for inst_disk in instance.disks:
1909 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1910 if node != instance.primary_node:
1912 cfg.SetDiskID(node_disk, node)
1913 result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1915 logger.Error("could not prepare block device %s on node %s"
1916 " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1918 device_info.append((instance.primary_node, inst_disk.iv_name, result))
1920 # leave the disks configured for the primary node
1921 # this is a workaround that would be fixed better by
1922 # improving the logical/physical id handling
1923 for disk in instance.disks:
1924 cfg.SetDiskID(disk, instance.primary_node)
1926 return disks_ok, device_info
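# Illustrative use of the (disks_ok, device_info) return value (hypothetical
# caller): device_info holds (host, instance_visible_name, node_visible_name)
# tuples, so a caller can report the mapping back to the user.
#
#   disks_ok, disks_info = _AssembleInstanceDisks(instance, cfg)
#   if disks_ok:
#     for node_name, iv_name, node_dev in disks_info:
#       feedback_fn("%s on %s maps to %s" % (iv_name, node_name, node_dev))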
1929 def _StartInstanceDisks(cfg, instance, force):
1930 """Start the disks of an instance.
1933 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1934 ignore_secondaries=force)
1936 _ShutdownInstanceDisks(instance, cfg)
1937 if force is not None and not force:
1938 logger.Error("If the message above refers to a secondary node,"
1939 " you can retry the operation using '--force'.")
1940 raise errors.OpExecError("Disk consistency error")
1943 class LUDeactivateInstanceDisks(NoHooksLU):
1944 """Shutdown an instance's disks.
1947 _OP_REQP = ["instance_name"]
1950 def ExpandNames(self):
1951 self._ExpandAndLockInstance()
1952 self.needed_locks[locking.LEVEL_NODE] = []
1953 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1955 def DeclareLocks(self, level):
1956 if level == locking.LEVEL_NODE:
1957 self._LockInstancesNodes()
1959 def CheckPrereq(self):
1960 """Check prerequisites.
1962 This checks that the instance is in the cluster.
1965 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1966 assert self.instance is not None, \
1967 "Cannot retrieve locked instance %s" % self.op.instance_name
1969 def Exec(self, feedback_fn):
1970 """Deactivate the disks
1973 instance = self.instance
1974 _SafeShutdownInstanceDisks(instance, self.cfg)
1977 def _SafeShutdownInstanceDisks(instance, cfg):
1978 """Shutdown block devices of an instance.
1980 This function checks if an instance is running, before calling
1981 _ShutdownInstanceDisks.
1984 ins_l = rpc.call_instance_list([instance.primary_node])
1985 ins_l = ins_l[instance.primary_node]
1986 if not isinstance(ins_l, list):
1987 raise errors.OpExecError("Can't contact node '%s'" %
1988 instance.primary_node)
1990 if instance.name in ins_l:
1991 raise errors.OpExecError("Instance is running, can't shutdown"
1994 _ShutdownInstanceDisks(instance, cfg)
1997 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1998 """Shutdown block devices of an instance.
2000 This does the shutdown on all nodes of the instance.
2002 If ignore_primary is false, errors on the primary node are
2007 for disk in instance.disks:
2008 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2009 cfg.SetDiskID(top_disk, node)
2010 if not rpc.call_blockdev_shutdown(node, top_disk):
2011 logger.Error("could not shutdown block device %s on node %s" %
2012 (disk.iv_name, node))
2013 if not ignore_primary or node != instance.primary_node:
2018 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2019 """Checks if a node has enough free memory.
2021 This function checks if a given node has the needed amount of free
2022 memory. In case the node has less memory or we cannot get the
2023 information from the node, this function raises an OpPrereqError
2027 - cfg: a ConfigWriter instance
2028 - node: the node name
2029 - reason: string to use in the error message
2030 - requested: the amount of memory in MiB
2033 nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2034 if not nodeinfo or not isinstance(nodeinfo, dict):
2035 raise errors.OpPrereqError("Could not contact node %s for resource"
2036 " information" % (node,))
2038 free_mem = nodeinfo[node].get('memory_free')
2039 if not isinstance(free_mem, int):
2040 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2041 " was '%s'" % (node, free_mem))
2042 if requested > free_mem:
2043 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2044 " needed %s MiB, available %s MiB" %
2045 (node, reason, requested, free_mem))
2048 class LUStartupInstance(LogicalUnit):
2049 """Starts an instance.
2052 HPATH = "instance-start"
2053 HTYPE = constants.HTYPE_INSTANCE
2054 _OP_REQP = ["instance_name", "force"]
2057 def ExpandNames(self):
2058 self._ExpandAndLockInstance()
2059 self.needed_locks[locking.LEVEL_NODE] = []
2060 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2062 def DeclareLocks(self, level):
2063 if level == locking.LEVEL_NODE:
2064 self._LockInstancesNodes()
2066 def BuildHooksEnv(self):
2069 This runs on master, primary and secondary nodes of the instance.
2073 "FORCE": self.op.force,
2075 env.update(_BuildInstanceHookEnvByObject(self.instance))
2076 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2077 list(self.instance.secondary_nodes))
2080 def CheckPrereq(self):
2081 """Check prerequisites.
2083 This checks that the instance is in the cluster.
2086 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2087 assert self.instance is not None, \
2088 "Cannot retrieve locked instance %s" % self.op.instance_name
2090 # check bridges existence
2091 _CheckInstanceBridgesExist(instance)
2093 _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2094 "starting instance %s" % instance.name,
2097 def Exec(self, feedback_fn):
2098 """Start the instance.
2101 instance = self.instance
2102 force = self.op.force
2103 extra_args = getattr(self.op, "extra_args", "")
2105 self.cfg.MarkInstanceUp(instance.name)
2107 node_current = instance.primary_node
2109 _StartInstanceDisks(self.cfg, instance, force)
2111 if not rpc.call_instance_start(node_current, instance, extra_args):
2112 _ShutdownInstanceDisks(instance, self.cfg)
2113 raise errors.OpExecError("Could not start instance")
2116 class LURebootInstance(LogicalUnit):
2117 """Reboot an instance.
2120 HPATH = "instance-reboot"
2121 HTYPE = constants.HTYPE_INSTANCE
2122 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2125 def ExpandNames(self):
2126 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2127 constants.INSTANCE_REBOOT_HARD,
2128 constants.INSTANCE_REBOOT_FULL]:
2129 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2130 (constants.INSTANCE_REBOOT_SOFT,
2131 constants.INSTANCE_REBOOT_HARD,
2132 constants.INSTANCE_REBOOT_FULL))
2133 self._ExpandAndLockInstance()
2134 self.needed_locks[locking.LEVEL_NODE] = []
2135 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2137 def DeclareLocks(self, level):
2138 if level == locking.LEVEL_NODE:
2139 primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
2140 self._LockInstancesNodes(primary_only=primary_only)
2142 def BuildHooksEnv(self):
2145 This runs on master, primary and secondary nodes of the instance.
2149 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2151 env.update(_BuildInstanceHookEnvByObject(self.instance))
2152 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2153 list(self.instance.secondary_nodes))
2156 def CheckPrereq(self):
2157 """Check prerequisites.
2159 This checks that the instance is in the cluster.
2162 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2163 assert self.instance is not None, \
2164 "Cannot retrieve locked instance %s" % self.op.instance_name
2167 # check bridges existence
2167 _CheckInstanceBridgesExist(instance)
2169 def Exec(self, feedback_fn):
2170 """Reboot the instance.
2173 instance = self.instance
2174 ignore_secondaries = self.op.ignore_secondaries
2175 reboot_type = self.op.reboot_type
2176 extra_args = getattr(self.op, "extra_args", "")
2178 node_current = instance.primary_node
2180 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2181 constants.INSTANCE_REBOOT_HARD]:
2182 if not rpc.call_instance_reboot(node_current, instance,
2183 reboot_type, extra_args):
2184 raise errors.OpExecError("Could not reboot instance")
2186 if not rpc.call_instance_shutdown(node_current, instance):
2187 raise errors.OpExecError("could not shutdown instance for full reboot")
2188 _ShutdownInstanceDisks(instance, self.cfg)
2189 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2190 if not rpc.call_instance_start(node_current, instance, extra_args):
2191 _ShutdownInstanceDisks(instance, self.cfg)
2192 raise errors.OpExecError("Could not start instance for full reboot")
2194 self.cfg.MarkInstanceUp(instance.name)
2197 class LUShutdownInstance(LogicalUnit):
2198 """Shutdown an instance.
2201 HPATH = "instance-stop"
2202 HTYPE = constants.HTYPE_INSTANCE
2203 _OP_REQP = ["instance_name"]
2206 def ExpandNames(self):
2207 self._ExpandAndLockInstance()
2208 self.needed_locks[locking.LEVEL_NODE] = []
2209 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2211 def DeclareLocks(self, level):
2212 if level == locking.LEVEL_NODE:
2213 self._LockInstancesNodes()
2215 def BuildHooksEnv(self):
2218 This runs on master, primary and secondary nodes of the instance.
2221 env = _BuildInstanceHookEnvByObject(self.instance)
2222 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2223 list(self.instance.secondary_nodes))
2226 def CheckPrereq(self):
2227 """Check prerequisites.
2229 This checks that the instance is in the cluster.
2232 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2233 assert self.instance is not None, \
2234 "Cannot retrieve locked instance %s" % self.op.instance_name
2236 def Exec(self, feedback_fn):
2237 """Shutdown the instance.
2240 instance = self.instance
2241 node_current = instance.primary_node
2242 self.cfg.MarkInstanceDown(instance.name)
2243 if not rpc.call_instance_shutdown(node_current, instance):
2244 logger.Error("could not shutdown instance")
2246 _ShutdownInstanceDisks(instance, self.cfg)
2249 class LUReinstallInstance(LogicalUnit):
2250 """Reinstall an instance.
2253 HPATH = "instance-reinstall"
2254 HTYPE = constants.HTYPE_INSTANCE
2255 _OP_REQP = ["instance_name"]
2258 def ExpandNames(self):
2259 self._ExpandAndLockInstance()
2260 self.needed_locks[locking.LEVEL_NODE] = []
2261 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2263 def DeclareLocks(self, level):
2264 if level == locking.LEVEL_NODE:
2265 self._LockInstancesNodes()
2267 def BuildHooksEnv(self):
2270 This runs on master, primary and secondary nodes of the instance.
2273 env = _BuildInstanceHookEnvByObject(self.instance)
2274 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2275 list(self.instance.secondary_nodes))
2278 def CheckPrereq(self):
2279 """Check prerequisites.
2281 This checks that the instance is in the cluster and is not running.
2284 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2285 assert instance is not None, \
2286 "Cannot retrieve locked instance %s" % self.op.instance_name
2288 if instance.disk_template == constants.DT_DISKLESS:
2289 raise errors.OpPrereqError("Instance '%s' has no disks" %
2290 self.op.instance_name)
2291 if instance.status != "down":
2292 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2293 self.op.instance_name)
2294 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2296 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2297 (self.op.instance_name,
2298 instance.primary_node))
2300 self.op.os_type = getattr(self.op, "os_type", None)
2301 if self.op.os_type is not None:
2303 pnode = self.cfg.GetNodeInfo(
2304 self.cfg.ExpandNodeName(instance.primary_node))
2306 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2308 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2310 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2311 " primary node" % self.op.os_type)
2313 self.instance = instance
2315 def Exec(self, feedback_fn):
2316 """Reinstall the instance.
2319 inst = self.instance
2321 if self.op.os_type is not None:
2322 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2323 inst.os = self.op.os_type
2324 self.cfg.AddInstance(inst)
2326 _StartInstanceDisks(self.cfg, inst, None)
2328 feedback_fn("Running the instance OS create scripts...")
2329 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2330 raise errors.OpExecError("Could not install OS for instance %s"
2332 (inst.name, inst.primary_node))
2334 _ShutdownInstanceDisks(inst, self.cfg)
2337 class LURenameInstance(LogicalUnit):
2338 """Rename an instance.
2341 HPATH = "instance-rename"
2342 HTYPE = constants.HTYPE_INSTANCE
2343 _OP_REQP = ["instance_name", "new_name"]
2345 def BuildHooksEnv(self):
2348 This runs on master, primary and secondary nodes of the instance.
2351 env = _BuildInstanceHookEnvByObject(self.instance)
2352 env["INSTANCE_NEW_NAME"] = self.op.new_name
2353 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2354 list(self.instance.secondary_nodes))
2357 def CheckPrereq(self):
2358 """Check prerequisites.
2360 This checks that the instance is in the cluster and is not running.
2363 instance = self.cfg.GetInstanceInfo(
2364 self.cfg.ExpandInstanceName(self.op.instance_name))
2365 if instance is None:
2366 raise errors.OpPrereqError("Instance '%s' not known" %
2367 self.op.instance_name)
2368 if instance.status != "down":
2369 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2370 self.op.instance_name)
2371 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2373 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2374 (self.op.instance_name,
2375 instance.primary_node))
2376 self.instance = instance
2378 # new name verification
2379 name_info = utils.HostInfo(self.op.new_name)
2381 self.op.new_name = new_name = name_info.name
2382 instance_list = self.cfg.GetInstanceList()
2383 if new_name in instance_list:
2384 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2387 if not getattr(self.op, "ignore_ip", False):
2388 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2389 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2390 (name_info.ip, new_name))
2393 def Exec(self, feedback_fn):
2394 """Reinstall the instance.
2397 inst = self.instance
2398 old_name = inst.name
2400 if inst.disk_template == constants.DT_FILE:
2401 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2403 self.cfg.RenameInstance(inst.name, self.op.new_name)
2404 # Change the instance lock. This is definitely safe while we hold the BGL
2405 self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2406 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2408 # re-read the instance from the configuration after rename
2409 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2411 if inst.disk_template == constants.DT_FILE:
2412 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2413 result = rpc.call_file_storage_dir_rename(inst.primary_node,
2414 old_file_storage_dir,
2415 new_file_storage_dir)
2418 raise errors.OpExecError("Could not connect to node '%s' to rename"
2419 " directory '%s' to '%s' (but the instance"
2420 " has been renamed in Ganeti)" % (
2421 inst.primary_node, old_file_storage_dir,
2422 new_file_storage_dir))
2425 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2426 " (but the instance has been renamed in"
2427 " Ganeti)" % (old_file_storage_dir,
2428 new_file_storage_dir))
2430 _StartInstanceDisks(self.cfg, inst, None)
2432 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2434 msg = ("Could not run OS rename script for instance %s on node %s"
2435 " (but the instance has been renamed in Ganeti)" %
2436 (inst.name, inst.primary_node))
2439 _ShutdownInstanceDisks(inst, self.cfg)
2442 class LURemoveInstance(LogicalUnit):
2443 """Remove an instance.
2446 HPATH = "instance-remove"
2447 HTYPE = constants.HTYPE_INSTANCE
2448 _OP_REQP = ["instance_name", "ignore_failures"]
2450 def BuildHooksEnv(self):
2453 This runs on master, primary and secondary nodes of the instance.
2456 env = _BuildInstanceHookEnvByObject(self.instance)
2457 nl = [self.sstore.GetMasterNode()]
2460 def CheckPrereq(self):
2461 """Check prerequisites.
2463 This checks that the instance is in the cluster.
2466 instance = self.cfg.GetInstanceInfo(
2467 self.cfg.ExpandInstanceName(self.op.instance_name))
2468 if instance is None:
2469 raise errors.OpPrereqError("Instance '%s' not known" %
2470 self.op.instance_name)
2471 self.instance = instance
2473 def Exec(self, feedback_fn):
2474 """Remove the instance.
2477 instance = self.instance
2478 logger.Info("shutting down instance %s on node %s" %
2479 (instance.name, instance.primary_node))
2481 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2482 if self.op.ignore_failures:
2483 feedback_fn("Warning: can't shutdown instance")
2485 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2486 (instance.name, instance.primary_node))
2488 logger.Info("removing block devices for instance %s" % instance.name)
2490 if not _RemoveDisks(instance, self.cfg):
2491 if self.op.ignore_failures:
2492 feedback_fn("Warning: can't remove instance's disks")
2494 raise errors.OpExecError("Can't remove instance's disks")
2496 logger.Info("removing instance %s out of cluster config" % instance.name)
2498 self.cfg.RemoveInstance(instance.name)
2499 # Remove the instance from the Ganeti Lock Manager
2500 self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2503 class LUQueryInstances(NoHooksLU):
2504 """Logical unit for querying instances.
2507 _OP_REQP = ["output_fields", "names"]
2510 def ExpandNames(self):
2511 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2512 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2513 "admin_state", "admin_ram",
2514 "disk_template", "ip", "mac", "bridge",
2515 "sda_size", "sdb_size", "vcpus", "tags",
2517 "network_port", "kernel_path", "initrd_path",
2518 "hvm_boot_order", "hvm_acpi", "hvm_pae",
2519 "hvm_cdrom_image_path", "hvm_nic_type",
2520 "hvm_disk_type", "vnc_bind_address"],
2521 dynamic=self.dynamic_fields,
2522 selected=self.op.output_fields)
2524 self.needed_locks = {}
2525 self.share_locks[locking.LEVEL_INSTANCE] = 1
2526 self.share_locks[locking.LEVEL_NODE] = 1
2528 # TODO: we could lock instances (and nodes) only if the user asked for
2529 # dynamic fields. For that we need atomic ways to get info for a group of
2530 # instances from the config, though.
2531 if not self.op.names:
2532 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
2534 self.needed_locks[locking.LEVEL_INSTANCE] = \
2535 _GetWantedInstances(self, self.op.names)
2537 self.needed_locks[locking.LEVEL_NODE] = []
2538 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2540 def DeclareLocks(self, level):
2541 # TODO: locking of nodes could be avoided when not querying them
2542 if level == locking.LEVEL_NODE:
2543 self._LockInstancesNodes()
2545 def CheckPrereq(self):
2546 """Check prerequisites.
2549 # This of course is valid only if we locked the instances
2550 self.wanted = self.acquired_locks[locking.LEVEL_INSTANCE]
2552 def Exec(self, feedback_fn):
2553 """Computes the list of nodes and their attributes.
2556 instance_names = self.wanted
2557 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2560 # begin data gathering
2562 nodes = frozenset([inst.primary_node for inst in instance_list])
2565 if self.dynamic_fields.intersection(self.op.output_fields):
2567 node_data = rpc.call_all_instances_info(nodes)
2569 result = node_data[name]
2571 live_data.update(result)
2572 elif result == False:
2573 bad_nodes.append(name)
2574 # else no instance is alive
2576 live_data = dict([(name, {}) for name in instance_names])
2578 # end data gathering
2581 for instance in instance_list:
2583 for field in self.op.output_fields:
2588 elif field == "pnode":
2589 val = instance.primary_node
2590 elif field == "snodes":
2591 val = list(instance.secondary_nodes)
2592 elif field == "admin_state":
2593 val = (instance.status != "down")
2594 elif field == "oper_state":
2595 if instance.primary_node in bad_nodes:
2598 val = bool(live_data.get(instance.name))
2599 elif field == "status":
2600 if instance.primary_node in bad_nodes:
2601 val = "ERROR_nodedown"
2603 running = bool(live_data.get(instance.name))
2605 if instance.status != "down":
2610 if instance.status != "down":
2614 elif field == "admin_ram":
2615 val = instance.memory
2616 elif field == "oper_ram":
2617 if instance.primary_node in bad_nodes:
2619 elif instance.name in live_data:
2620 val = live_data[instance.name].get("memory", "?")
2623 elif field == "disk_template":
2624 val = instance.disk_template
2626 val = instance.nics[0].ip
2627 elif field == "bridge":
2628 val = instance.nics[0].bridge
2629 elif field == "mac":
2630 val = instance.nics[0].mac
2631 elif field == "sda_size" or field == "sdb_size":
2632 disk = instance.FindDisk(field[:3])
2637 elif field == "vcpus":
2638 val = instance.vcpus
2639 elif field == "tags":
2640 val = list(instance.GetTags())
2641 elif field in ("network_port", "kernel_path", "initrd_path",
2642 "hvm_boot_order", "hvm_acpi", "hvm_pae",
2643 "hvm_cdrom_image_path", "hvm_nic_type",
2644 "hvm_disk_type", "vnc_bind_address"):
2645 val = getattr(instance, field, None)
2648 elif field in ("hvm_nic_type", "hvm_disk_type",
2649 "kernel_path", "initrd_path"):
2654 raise errors.ParameterError(field)
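# Example (illustrative, made-up values): with
# op.output_fields == ["name", "pnode", "oper_state"] this LU returns one
# row per instance, e.g.
#
#   [["inst1.example.com", "node1.example.com", True],
#    ["inst2.example.com", "node2.example.com", False]]
#
# "name" and "pnode" are static fields read from the configuration, while
# "oper_state" is a dynamic field and therefore triggers the
# call_all_instances_info RPC above.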
2661 class LUFailoverInstance(LogicalUnit):
2662 """Failover an instance.
2665 HPATH = "instance-failover"
2666 HTYPE = constants.HTYPE_INSTANCE
2667 _OP_REQP = ["instance_name", "ignore_consistency"]
2670 def ExpandNames(self):
2671 self._ExpandAndLockInstance()
2672 self.needed_locks[locking.LEVEL_NODE] = []
2673 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2675 def DeclareLocks(self, level):
2676 if level == locking.LEVEL_NODE:
2677 self._LockInstancesNodes()
2679 def BuildHooksEnv(self):
2682 This runs on master, primary and secondary nodes of the instance.
2686 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2688 env.update(_BuildInstanceHookEnvByObject(self.instance))
2689 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2692 def CheckPrereq(self):
2693 """Check prerequisites.
2695 This checks that the instance is in the cluster.
2698 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2699 assert self.instance is not None, \
2700 "Cannot retrieve locked instance %s" % self.op.instance_name
2702 if instance.disk_template not in constants.DTS_NET_MIRROR:
2703 raise errors.OpPrereqError("Instance's disk layout is not"
2704 " network mirrored, cannot failover.")
2706 secondary_nodes = instance.secondary_nodes
2707 if not secondary_nodes:
2708 raise errors.ProgrammerError("no secondary node but using "
2709 "a mirrored disk template")
2711 target_node = secondary_nodes[0]
2712 # check memory requirements on the secondary node
2713 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2714 instance.name, instance.memory)
2716 # check bridge existence
2717 brlist = [nic.bridge for nic in instance.nics]
2718 if not rpc.call_bridges_exist(target_node, brlist):
2719 raise errors.OpPrereqError("One or more target bridges %s does not"
2720 " exist on destination node '%s'" %
2721 (brlist, target_node))
2723 def Exec(self, feedback_fn):
2724 """Failover an instance.
2726 The failover is done by shutting it down on its present node and
2727 starting it on the secondary.
2730 instance = self.instance
2732 source_node = instance.primary_node
2733 target_node = instance.secondary_nodes[0]
2735 feedback_fn("* checking disk consistency between source and target")
2736 for dev in instance.disks:
2737 # for drbd, these are drbd over lvm
2738 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2739 if instance.status == "up" and not self.op.ignore_consistency:
2740 raise errors.OpExecError("Disk %s is degraded on target node,"
2741 " aborting failover." % dev.iv_name)
2743 feedback_fn("* shutting down instance on source node")
2744 logger.Info("Shutting down instance %s on node %s" %
2745 (instance.name, source_node))
2747 if not rpc.call_instance_shutdown(source_node, instance):
2748 if self.op.ignore_consistency:
2749 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2750 " anyway. Please make sure node %s is down" %
2751 (instance.name, source_node, source_node))
2753 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2754 (instance.name, source_node))
2756 feedback_fn("* deactivating the instance's disks on source node")
2757 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2758 raise errors.OpExecError("Can't shut down the instance's disks.")
2760 instance.primary_node = target_node
2761 # distribute new instance config to the other nodes
2762 self.cfg.Update(instance)
2764 # Only start the instance if it's marked as up
2765 if instance.status == "up":
2766 feedback_fn("* activating the instance's disks on target node")
2767 logger.Info("Starting instance %s on node %s" %
2768 (instance.name, target_node))
2770 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2771 ignore_secondaries=True)
2773 _ShutdownInstanceDisks(instance, self.cfg)
2774 raise errors.OpExecError("Can't activate the instance's disks")
2776 feedback_fn("* starting the instance on the target node")
2777 if not rpc.call_instance_start(target_node, instance, None):
2778 _ShutdownInstanceDisks(instance, self.cfg)
2779 raise errors.OpExecError("Could not start instance %s on node %s." %
2780 (instance.name, target_node))
2783 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2784 """Create a tree of block devices on the primary node.
2786 This always creates all devices.
2790 for child in device.children:
2791 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2794 cfg.SetDiskID(device, node)
2795 new_id = rpc.call_blockdev_create(node, device, device.size,
2796 instance.name, True, info)
2799 if device.physical_id is None:
2800 device.physical_id = new_id
2804 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2805 """Create a tree of block devices on a secondary node.
2807 If this device type has to be created on secondaries, create it and all its children.
2810 If not, just recurse to children keeping the same 'force' value.
2813 if device.CreateOnSecondary():
2816 for child in device.children:
2817 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2818 child, force, info):
2823 cfg.SetDiskID(device, node)
2824 new_id = rpc.call_blockdev_create(node, device, device.size,
2825 instance.name, False, info)
2828 if device.physical_id is None:
2829 device.physical_id = new_id
2833 def _GenerateUniqueNames(cfg, exts):
2834 """Generate a suitable LV name.
2836 This will generate a logical volume name for the given instance.
2841 new_id = cfg.GenerateUniqueID()
2842 results.append("%s%s" % (new_id, val))
2846 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2847 """Generate a drbd8 device complete with its children.
2850 port = cfg.AllocatePort()
2851 vgname = cfg.GetVGName()
2852 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2853 logical_id=(vgname, names[0]))
2854 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2855 logical_id=(vgname, names[1]))
2856 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2857 logical_id = (primary, secondary, port),
2858 children = [dev_data, dev_meta],
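# The device tree produced above (illustrative summary, based on the code):
#
#   DRBD8 disk, size=size, logical_id=(primary, secondary, port)
#     +- LV "<unique-id>.sda_data"-style data volume of the requested size
#     +- LV "<unique-id>.sda_meta"-style 128 MiB metadata volume
#
# Both children live in the cluster volume group returned by cfg.GetVGName(),
# and the port comes from cfg.AllocatePort().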
2863 def _GenerateDiskTemplate(cfg, template_name,
2864 instance_name, primary_node,
2865 secondary_nodes, disk_sz, swap_sz,
2866 file_storage_dir, file_driver):
2867 """Generate the entire disk layout for a given template type.
2870 #TODO: compute space requirements
2872 vgname = cfg.GetVGName()
2873 if template_name == constants.DT_DISKLESS:
2875 elif template_name == constants.DT_PLAIN:
2876 if len(secondary_nodes) != 0:
2877 raise errors.ProgrammerError("Wrong template configuration")
2879 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2880 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2881 logical_id=(vgname, names[0]),
2883 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2884 logical_id=(vgname, names[1]),
2886 disks = [sda_dev, sdb_dev]
2887 elif template_name == constants.DT_DRBD8:
2888 if len(secondary_nodes) != 1:
2889 raise errors.ProgrammerError("Wrong template configuration")
2890 remote_node = secondary_nodes[0]
2891 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2892 ".sdb_data", ".sdb_meta"])
2893 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2894 disk_sz, names[0:2], "sda")
2895 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2896 swap_sz, names[2:4], "sdb")
2897 disks = [drbd_sda_dev, drbd_sdb_dev]
2898 elif template_name == constants.DT_FILE:
2899 if len(secondary_nodes) != 0:
2900 raise errors.ProgrammerError("Wrong template configuration")
2902 file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2903 iv_name="sda", logical_id=(file_driver,
2904 "%s/sda" % file_storage_dir))
2905 file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2906 iv_name="sdb", logical_id=(file_driver,
2907 "%s/sdb" % file_storage_dir))
2908 disks = [file_sda_dev, file_sdb_dev]
2910 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2914 def _GetInstanceInfoText(instance):
2915 """Compute that text that should be added to the disk's metadata.
2918 return "originstname+%s" % instance.name
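# Example (illustrative): for an instance named "inst1.example.com" this
# returns "originstname+inst1.example.com".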
2921 def _CreateDisks(cfg, instance):
2922 """Create all disks for an instance.
2924 This abstracts away some work from AddInstance.
2927 instance: the instance object
2930 True or False showing the success of the creation process
2933 info = _GetInstanceInfoText(instance)
2935 if instance.disk_template == constants.DT_FILE:
2936 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2937 result = rpc.call_file_storage_dir_create(instance.primary_node,
2941 logger.Error("Could not connect to node '%s'" % instance.primary_node)
2945 logger.Error("failed to create directory '%s'" % file_storage_dir)
2948 for device in instance.disks:
2949 logger.Info("creating volume %s for instance %s" %
2950 (device.iv_name, instance.name))
2952 for secondary_node in instance.secondary_nodes:
2953 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2954 device, False, info):
2955 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2956 (device.iv_name, device, secondary_node))
2959 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2960 instance, device, info):
2961 logger.Error("failed to create volume %s on primary!" %
2968 def _RemoveDisks(instance, cfg):
2969 """Remove all disks for an instance.
2971 This abstracts away some work from `AddInstance()` and
2972 `RemoveInstance()`. Note that in case some of the devices couldn't
2973 be removed, the removal will continue with the other ones (compare
2974 with `_CreateDisks()`).
2977 instance: the instance object
2980 True or False showing the success of the removal process
2983 logger.Info("removing block devices for instance %s" % instance.name)
2986 for device in instance.disks:
2987 for node, disk in device.ComputeNodeTree(instance.primary_node):
2988 cfg.SetDiskID(disk, node)
2989 if not rpc.call_blockdev_remove(node, disk):
2990 logger.Error("could not remove block device %s on node %s,"
2991 " continuing anyway" %
2992 (device.iv_name, node))
2995 if instance.disk_template == constants.DT_FILE:
2996 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2997 if not rpc.call_file_storage_dir_remove(instance.primary_node,
2999 logger.Error("could not remove directory '%s'" % file_storage_dir)
3005 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3006 """Compute disk size requirements in the volume group
3008 This is currently hard-coded for the two-drive layout.
3011 # Required free disk space as a function of disk and swap space
3013 constants.DT_DISKLESS: None,
3014 constants.DT_PLAIN: disk_size + swap_size,
3015 # 256 MB are added for DRBD metadata, 128 MB for each DRBD device
3016 constants.DT_DRBD8: disk_size + swap_size + 256,
3017 constants.DT_FILE: None,
3020 if disk_template not in req_size_dict:
3021 raise errors.ProgrammerError("Disk template '%s' size requirement"
3022 " is unknown" % disk_template)
3024 return req_size_dict[disk_template]
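# Worked example (illustrative): for a drbd8 instance with a 10240 MiB data
# disk and a 4096 MiB swap disk the volume group on each node must have
# 10240 + 4096 + 256 = 14592 MiB free, i.e.
#
#   _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096) == 14592
#
# while diskless and file-backed instances need no LVM space at all (None).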
3027 class LUCreateInstance(LogicalUnit):
3028 """Create an instance.
3031 HPATH = "instance-add"
3032 HTYPE = constants.HTYPE_INSTANCE
3033 _OP_REQP = ["instance_name", "mem_size", "disk_size",
3034 "disk_template", "swap_size", "mode", "start", "vcpus",
3035 "wait_for_sync", "ip_check", "mac"]
3037 def _RunAllocator(self):
3038 """Run the allocator based on input opcode.
3041 disks = [{"size": self.op.disk_size, "mode": "w"},
3042 {"size": self.op.swap_size, "mode": "w"}]
3043 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3044 "bridge": self.op.bridge}]
3045 ial = IAllocator(self.cfg, self.sstore,
3046 mode=constants.IALLOCATOR_MODE_ALLOC,
3047 name=self.op.instance_name,
3048 disk_template=self.op.disk_template,
3051 vcpus=self.op.vcpus,
3052 mem_size=self.op.mem_size,
3057 ial.Run(self.op.iallocator)
3060 raise errors.OpPrereqError("Can't compute nodes using"
3061 " iallocator '%s': %s" % (self.op.iallocator,
3063 if len(ial.nodes) != ial.required_nodes:
3064 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3065 " of nodes (%s), required %s" %
3066 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
3067 self.op.pnode = ial.nodes[0]
3068 logger.ToStdout("Selected nodes for the instance: %s" %
3069 (", ".join(ial.nodes),))
3070 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3071 (self.op.instance_name, self.op.iallocator, ial.nodes))
3072 if ial.required_nodes == 2:
3073 self.op.snode = ial.nodes[1]
3075 def BuildHooksEnv(self):
3078 This runs on master, primary and secondary nodes of the instance.
3082 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3083 "INSTANCE_DISK_SIZE": self.op.disk_size,
3084 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3085 "INSTANCE_ADD_MODE": self.op.mode,
3087 if self.op.mode == constants.INSTANCE_IMPORT:
3088 env["INSTANCE_SRC_NODE"] = self.op.src_node
3089 env["INSTANCE_SRC_PATH"] = self.op.src_path
3090 env["INSTANCE_SRC_IMAGE"] = self.src_image
3092 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3093 primary_node=self.op.pnode,
3094 secondary_nodes=self.secondaries,
3095 status=self.instance_status,
3096 os_type=self.op.os_type,
3097 memory=self.op.mem_size,
3098 vcpus=self.op.vcpus,
3099 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3102 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3107 def CheckPrereq(self):
3108 """Check prerequisites.
3111 # set optional parameters to none if they don't exist
3112 for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3113 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3114 "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
3115 if not hasattr(self.op, attr):
3116 setattr(self.op, attr, None)
3118 if self.op.mode not in (constants.INSTANCE_CREATE,
3119 constants.INSTANCE_IMPORT):
3120 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3123 if (not self.cfg.GetVGName() and
3124 self.op.disk_template not in constants.DTS_NOT_LVM):
3125 raise errors.OpPrereqError("Cluster does not support lvm-based"
3128 if self.op.mode == constants.INSTANCE_IMPORT:
3129 src_node = getattr(self.op, "src_node", None)
3130 src_path = getattr(self.op, "src_path", None)
3131 if src_node is None or src_path is None:
3132 raise errors.OpPrereqError("Importing an instance requires source"
3133 " node and path options")
3134 src_node_full = self.cfg.ExpandNodeName(src_node)
3135 if src_node_full is None:
3136 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3137 self.op.src_node = src_node = src_node_full
3139 if not os.path.isabs(src_path):
3140 raise errors.OpPrereqError("The source path must be absolute")
3142 export_info = rpc.call_export_info(src_node, src_path)
3145 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3147 if not export_info.has_section(constants.INISECT_EXP):
3148 raise errors.ProgrammerError("Corrupted export config")
3150 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3151 if (int(ei_version) != constants.EXPORT_VERSION):
3152 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3153 (ei_version, constants.EXPORT_VERSION))
3155 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3156 raise errors.OpPrereqError("Can't import instance with more than"
3159 # FIXME: are the old os-es, disk sizes, etc. useful?
3160 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3161 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3163 self.src_image = diskimage
3164 else: # INSTANCE_CREATE
3165 if getattr(self.op, "os_type", None) is None:
3166 raise errors.OpPrereqError("No guest OS specified")
3168 #### instance parameters check
3170 # disk template and mirror node verification
3171 if self.op.disk_template not in constants.DISK_TEMPLATES:
3172 raise errors.OpPrereqError("Invalid disk template name")
3174 # instance name verification
3175 hostname1 = utils.HostInfo(self.op.instance_name)
3177 self.op.instance_name = instance_name = hostname1.name
3178 instance_list = self.cfg.GetInstanceList()
3179 if instance_name in instance_list:
3180 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3183 # ip validity checks
3184 ip = getattr(self.op, "ip", None)
3185 if ip is None or ip.lower() == "none":
3187 elif ip.lower() == "auto":
3188 inst_ip = hostname1.ip
3190 if not utils.IsValidIP(ip):
3191 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3192 " like a valid IP" % ip)
3194 self.inst_ip = self.op.ip = inst_ip
3196 if self.op.start and not self.op.ip_check:
3197 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3198 " adding an instance in start mode")
3200 if self.op.ip_check:
3201 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3202 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3203 (hostname1.ip, instance_name))
3205 # MAC address verification
3206 if self.op.mac != "auto":
3207 if not utils.IsValidMac(self.op.mac.lower()):
3208 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3211 # bridge verification
3212 bridge = getattr(self.op, "bridge", None)
3214 self.op.bridge = self.cfg.GetDefBridge()
3216 self.op.bridge = bridge
3218 # boot order verification
3219 if self.op.hvm_boot_order is not None:
3220 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3221 raise errors.OpPrereqError("invalid boot order specified,"
3222 " must be one or more of [acdn]")
3223 # file storage checks
3224 if (self.op.file_driver and
3225 not self.op.file_driver in constants.FILE_DRIVER):
3226 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3227 self.op.file_driver)
3229 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3230 raise errors.OpPrereqError("File storage directory not a relative"
3234 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3235 raise errors.OpPrereqError("One and only one of iallocator and primary"
3236 " node must be given")
3238 if self.op.iallocator is not None:
3239 self._RunAllocator()
3241 #### node related checks
3243 # check primary node
3244 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3246 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3248 self.op.pnode = pnode.name
3250 self.secondaries = []
3252 # mirror node verification
3253 if self.op.disk_template in constants.DTS_NET_MIRROR:
3254 if getattr(self.op, "snode", None) is None:
3255 raise errors.OpPrereqError("The networked disk templates need"
3258 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3259 if snode_name is None:
3260 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3262 elif snode_name == pnode.name:
3263 raise errors.OpPrereqError("The secondary node cannot be"
3264 " the primary node.")
3265 self.secondaries.append(snode_name)
3267 req_size = _ComputeDiskSize(self.op.disk_template,
3268 self.op.disk_size, self.op.swap_size)
3270 # Check lv size requirements
3271 if req_size is not None:
3272 nodenames = [pnode.name] + self.secondaries
3273 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3274 for node in nodenames:
3275 info = nodeinfo.get(node, None)
3277 raise errors.OpPrereqError("Cannot get current information"
3278 " from node '%s'" % node)
3279 vg_free = info.get('vg_free', None)
3280 if not isinstance(vg_free, int):
3281 raise errors.OpPrereqError("Can't compute free disk space on"
3283 if req_size > info['vg_free']:
3284 raise errors.OpPrereqError("Not enough disk space on target node %s."
3285 " %d MB available, %d MB required" %
3286 (node, info['vg_free'], req_size))
3289 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3291 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3292 " primary node" % self.op.os_type)
3294 if self.op.kernel_path == constants.VALUE_NONE:
3295 raise errors.OpPrereqError("Can't set instance kernel to none")
3298 # bridge check on primary node
3299 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3300 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3301 " destination node '%s'" %
3302 (self.op.bridge, pnode.name))
3304 # memory check on primary node
3306 _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3307 "creating instance %s" % self.op.instance_name,
3310 # hvm_cdrom_image_path verification
3311 if self.op.hvm_cdrom_image_path is not None:
3312 if not os.path.isabs(self.op.hvm_cdrom_image_path):
3313 raise errors.OpPrereqError("The path to the HVM CDROM image must"
3314 " be an absolute path or None, not %s" %
3315 self.op.hvm_cdrom_image_path)
3316 if not os.path.isfile(self.op.hvm_cdrom_image_path):
3317 raise errors.OpPrereqError("The HVM CDROM image must either be a"
3318 " regular file or a symlink pointing to"
3319 " an existing regular file, not %s" %
3320 self.op.hvm_cdrom_image_path)
3322 # vnc_bind_address verification
3323 if self.op.vnc_bind_address is not None:
3324 if not utils.IsValidIP(self.op.vnc_bind_address):
3325 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3326 " like a valid IP address" %
3327 self.op.vnc_bind_address)
3329 # Xen HVM device type checks
3330 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3331 if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3332 raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3333 " hypervisor" % self.op.hvm_nic_type)
3334 if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3335 raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3336 " hypervisor" % self.op.hvm_disk_type)
3339 self.instance_status = 'up'
3341 self.instance_status = 'down'
3343 def Exec(self, feedback_fn):
3344 """Create and add the instance to the cluster.
3347 instance = self.op.instance_name
3348 pnode_name = self.pnode.name
3350 if self.op.mac == "auto":
3351 mac_address = self.cfg.GenerateMAC()
3353 mac_address = self.op.mac
3355 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3356 if self.inst_ip is not None:
3357 nic.ip = self.inst_ip
3359 ht_kind = self.sstore.GetHypervisorType()
3360 if ht_kind in constants.HTS_REQ_PORT:
3361 network_port = self.cfg.AllocatePort()
3365 if self.op.vnc_bind_address is None:
3366 self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3368 # this is needed because os.path.join does not accept None arguments
3369 if self.op.file_storage_dir is None:
3370 string_file_storage_dir = ""
3372 string_file_storage_dir = self.op.file_storage_dir
3374 # build the full file storage dir path
3375 file_storage_dir = os.path.normpath(os.path.join(
3376 self.sstore.GetFileStorageDir(),
3377 string_file_storage_dir, instance))
3380 disks = _GenerateDiskTemplate(self.cfg,
3381 self.op.disk_template,
3382 instance, pnode_name,
3383 self.secondaries, self.op.disk_size,
3386 self.op.file_driver)
3388 iobj = objects.Instance(name=instance, os=self.op.os_type,
3389 primary_node=pnode_name,
3390 memory=self.op.mem_size,
3391 vcpus=self.op.vcpus,
3392 nics=[nic], disks=disks,
3393 disk_template=self.op.disk_template,
3394 status=self.instance_status,
3395 network_port=network_port,
3396 kernel_path=self.op.kernel_path,
3397 initrd_path=self.op.initrd_path,
3398 hvm_boot_order=self.op.hvm_boot_order,
3399 hvm_acpi=self.op.hvm_acpi,
3400 hvm_pae=self.op.hvm_pae,
3401 hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3402 vnc_bind_address=self.op.vnc_bind_address,
3403 hvm_nic_type=self.op.hvm_nic_type,
3404 hvm_disk_type=self.op.hvm_disk_type,
3407 feedback_fn("* creating instance disks...")
3408 if not _CreateDisks(self.cfg, iobj):
3409 _RemoveDisks(iobj, self.cfg)
3410 raise errors.OpExecError("Device creation failed, reverting...")
3412 feedback_fn("adding instance %s to cluster config" % instance)
3414 self.cfg.AddInstance(iobj)
3415 # Add the new instance to the Ganeti Lock Manager
3416 self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3418 if self.op.wait_for_sync:
3419 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3420 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3421 # make sure the disks are not degraded (still sync-ing is ok)
3423 feedback_fn("* checking mirrors status")
3424 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3429 _RemoveDisks(iobj, self.cfg)
3430 self.cfg.RemoveInstance(iobj.name)
3431 # Remove the new instance from the Ganeti Lock Manager
3432 self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3433 raise errors.OpExecError("There are some degraded disks for"
3436 feedback_fn("creating os for instance %s on node %s" %
3437 (instance, pnode_name))
3439 if iobj.disk_template != constants.DT_DISKLESS:
3440 if self.op.mode == constants.INSTANCE_CREATE:
3441 feedback_fn("* running the instance OS create scripts...")
3442 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3443 raise errors.OpExecError("could not add os for instance %s"
3445 (instance, pnode_name))
3447 elif self.op.mode == constants.INSTANCE_IMPORT:
3448 feedback_fn("* running the instance OS import scripts...")
3449 src_node = self.op.src_node
3450 src_image = self.src_image
3451 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3452 src_node, src_image):
3453 raise errors.OpExecError("Could not import os for instance"
3455 (instance, pnode_name))
3457 # also checked in the prereq part
3458 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3462 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3463 feedback_fn("* starting instance...")
3464 if not rpc.call_instance_start(pnode_name, iobj, None):
3465 raise errors.OpExecError("Could not start instance")
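# Usage sketch (illustrative, values made up): an opcode for this LU has to
# provide at least the parameters listed in _OP_REQP above, for example
#
#   instance_name="inst1.example.com", mem_size=512, disk_size=10240,
#   swap_size=4096, disk_template=constants.DT_DRBD8,
#   mode=constants.INSTANCE_CREATE, start=True, vcpus=1,
#   wait_for_sync=True, ip_check=True, mac="auto"
#
# plus either a primary node (pnode) or an iallocator name, as enforced by
# CheckPrereq.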
3468 class LUConnectConsole(NoHooksLU):
3469 """Connect to an instance's console.
3471 This is somewhat special in that it returns the command line that
3472 you need to run on the master node in order to connect to the instance's console.
3476 _OP_REQP = ["instance_name"]
3479 def ExpandNames(self):
3480 self._ExpandAndLockInstance()
3482 def CheckPrereq(self):
3483 """Check prerequisites.
3485 This checks that the instance is in the cluster.
3488 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3489 assert self.instance is not None, \
3490 "Cannot retrieve locked instance %s" % self.op.instance_name
3492 def Exec(self, feedback_fn):
3493 """Connect to the console of an instance
3496 instance = self.instance
3497 node = instance.primary_node
3499 node_insts = rpc.call_instance_list([node])[node]
3500 if node_insts is False:
3501 raise errors.OpExecError("Can't connect to node %s." % node)
3503 if instance.name not in node_insts:
3504 raise errors.OpExecError("Instance %s is not running." % instance.name)
3506 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3508 hyper = hypervisor.GetHypervisor()
3509 console_cmd = hyper.GetShellCommandForConsole(instance)
3512 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
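# Note (illustrative): the return value is the complete ssh command line the
# client runs on the master node; for a Xen instance it would wrap a console
# command such as "xm console <instance>" in an interactive ssh session to
# the primary node.  The exact console command comes from the hypervisor
# abstraction above, so the "xm console" form is only an example.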
3515 class LUReplaceDisks(LogicalUnit):
3516 """Replace the disks of an instance.
3519 HPATH = "mirrors-replace"
3520 HTYPE = constants.HTYPE_INSTANCE
3521 _OP_REQP = ["instance_name", "mode", "disks"]
3523 def _RunAllocator(self):
3524 """Compute a new secondary node using an IAllocator.
3527 ial = IAllocator(self.cfg, self.sstore,
3528 mode=constants.IALLOCATOR_MODE_RELOC,
3529 name=self.op.instance_name,
3530 relocate_from=[self.sec_node])
3532 ial.Run(self.op.iallocator)
3535 raise errors.OpPrereqError("Can't compute nodes using"
3536 " iallocator '%s': %s" % (self.op.iallocator,
3538 if len(ial.nodes) != ial.required_nodes:
3539 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3540 " of nodes (%s), required %s" %
3541 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
3542 self.op.remote_node = ial.nodes[0]
3543 logger.ToStdout("Selected new secondary for the instance: %s" %
3544 self.op.remote_node)
3546 def BuildHooksEnv(self):
3549 This runs on the master, the primary and all the secondaries.
3553 "MODE": self.op.mode,
3554 "NEW_SECONDARY": self.op.remote_node,
3555 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3557 env.update(_BuildInstanceHookEnvByObject(self.instance))
3559 self.sstore.GetMasterNode(),
3560 self.instance.primary_node,
3562 if self.op.remote_node is not None:
3563 nl.append(self.op.remote_node)
3566 def CheckPrereq(self):
3567 """Check prerequisites.
3569 This checks that the instance is in the cluster.
3572 if not hasattr(self.op, "remote_node"):
3573 self.op.remote_node = None
3575 instance = self.cfg.GetInstanceInfo(
3576 self.cfg.ExpandInstanceName(self.op.instance_name))
3577 if instance is None:
3578 raise errors.OpPrereqError("Instance '%s' not known" %
3579 self.op.instance_name)
3580 self.instance = instance
3581 self.op.instance_name = instance.name
3583 if instance.disk_template not in constants.DTS_NET_MIRROR:
3584 raise errors.OpPrereqError("Instance's disk layout is not"
3585 " network mirrored.")
3587 if len(instance.secondary_nodes) != 1:
3588 raise errors.OpPrereqError("The instance has a strange layout,"
3589 " expected one secondary but found %d" %
3590 len(instance.secondary_nodes))
3592 self.sec_node = instance.secondary_nodes[0]
3594 ia_name = getattr(self.op, "iallocator", None)
3595 if ia_name is not None:
3596 if self.op.remote_node is not None:
3597 raise errors.OpPrereqError("Give either the iallocator or the new"
3598 " secondary, not both")
3599 self._RunAllocator()
3601 remote_node = self.op.remote_node
3602 if remote_node is not None:
3603 remote_node = self.cfg.ExpandNodeName(remote_node)
3604 if remote_node is None:
3605 raise errors.OpPrereqError("Node '%s' not known" %
3606 self.op.remote_node)
3607 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3609 self.remote_node_info = None
3610 if remote_node == instance.primary_node:
3611 raise errors.OpPrereqError("The specified node is the primary node of"
3613 elif remote_node == self.sec_node:
3614 if self.op.mode == constants.REPLACE_DISK_SEC:
3615 # this is for DRBD8, where we can't execute the same mode of
3616 # replacement as for drbd7 (no different port allocated)
3617 raise errors.OpPrereqError("Same secondary given, cannot execute"
3619 if instance.disk_template == constants.DT_DRBD8:
3620 if (self.op.mode == constants.REPLACE_DISK_ALL and
3621 remote_node is not None):
3622 # switch to replace secondary mode
3623 self.op.mode = constants.REPLACE_DISK_SEC
3625 if self.op.mode == constants.REPLACE_DISK_ALL:
3626 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3627 " secondary disk replacement, not"
3629 elif self.op.mode == constants.REPLACE_DISK_PRI:
3630 if remote_node is not None:
3631 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3632 " the secondary while doing a primary"
3633 " node disk replacement")
3634 self.tgt_node = instance.primary_node
3635 self.oth_node = instance.secondary_nodes[0]
3636 elif self.op.mode == constants.REPLACE_DISK_SEC:
3637 self.new_node = remote_node # this can be None, in which case
3638 # we don't change the secondary
3639 self.tgt_node = instance.secondary_nodes[0]
3640 self.oth_node = instance.primary_node
3642 raise errors.ProgrammerError("Unhandled disk replace mode")
3644 for name in self.op.disks:
3645 if instance.FindDisk(name) is None:
3646 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3647 (name, instance.name))
3648 self.op.remote_node = remote_node
3650 def _ExecD8DiskOnly(self, feedback_fn):
3651 """Replace a disk on the primary or secondary for dbrd8.
3653 The algorithm for replace is quite complicated:
3654 - for each disk to be replaced:
3655 - create new LVs on the target node with unique names
3656 - detach old LVs from the drbd device
3657 - rename old LVs to name_replaced.<time_t>
3658 - rename new LVs to old LVs
3659 - attach the new LVs (with the old names now) to the drbd device
3660 - wait for sync across all devices
3661 - for each modified disk:
3662 - remove old LVs (which have the name name_replaced.<time_t>)
3664 Failures are not very well handled.
3668 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3669 instance = self.instance
3671 vgname = self.cfg.GetVGName()
3674 tgt_node = self.tgt_node
3675 oth_node = self.oth_node
3677 # Step: check device activation
3678 self.proc.LogStep(1, steps_total, "check device existence")
3679 info("checking volume groups")
3680 my_vg = cfg.GetVGName()
3681 results = rpc.call_vg_list([oth_node, tgt_node])
3683 raise errors.OpExecError("Can't list volume groups on the nodes")
3684 for node in oth_node, tgt_node:
3685 res = results.get(node, False)
3686 if not res or my_vg not in res:
3687 raise errors.OpExecError("Volume group '%s' not found on %s" %
3689 for dev in instance.disks:
3690 if not dev.iv_name in self.op.disks:
3692 for node in tgt_node, oth_node:
3693 info("checking %s on %s" % (dev.iv_name, node))
3694 cfg.SetDiskID(dev, node)
3695 if not rpc.call_blockdev_find(node, dev):
3696 raise errors.OpExecError("Can't find device %s on node %s" %
3697 (dev.iv_name, node))
3699 # Step: check other node consistency
3700 self.proc.LogStep(2, steps_total, "check peer consistency")
3701 for dev in instance.disks:
3702 if not dev.iv_name in self.op.disks:
3704 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3705 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3706 oth_node==instance.primary_node):
3707 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3708 " to replace disks on this node (%s)" %
3709 (oth_node, tgt_node))
3711 # Step: create new storage
3712 self.proc.LogStep(3, steps_total, "allocate new storage")
3713 for dev in instance.disks:
3714 if not dev.iv_name in self.op.disks:
3717 cfg.SetDiskID(dev, tgt_node)
3718 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3719 names = _GenerateUniqueNames(cfg, lv_names)
3720 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3721 logical_id=(vgname, names[0]))
3722 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3723 logical_id=(vgname, names[1]))
3724 new_lvs = [lv_data, lv_meta]
3725 old_lvs = dev.children
3726 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3727 info("creating new local storage on %s for %s" %
3728 (tgt_node, dev.iv_name))
3729 # since we *always* want to create this LV, we use the
3730 # _Create...OnPrimary (which forces the creation), even if we
3731 # are talking about the secondary node
3732 for new_lv in new_lvs:
3733 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3734 _GetInstanceInfoText(instance)):
3735 raise errors.OpExecError("Failed to create new LV named '%s' on"
3737 (new_lv.logical_id[1], tgt_node))
3739 # Step: for each lv, detach+rename*2+attach
3740 self.proc.LogStep(4, steps_total, "change drbd configuration")
3741 for dev, old_lvs, new_lvs in iv_names.itervalues():
3742 info("detaching %s drbd from local storage" % dev.iv_name)
3743 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3744 raise errors.OpExecError("Can't detach drbd from local storage on node"
3745 " %s for device %s" % (tgt_node, dev.iv_name))
3747 #cfg.Update(instance)
3749 # ok, we created the new LVs, so now we know we have the needed
3750 # storage; as such, we proceed on the target node to rename
3751 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3752 # using the assumption that logical_id == physical_id (which in
3753 # turn is the unique_id on that node)
3755 # FIXME(iustin): use a better name for the replaced LVs
3756 temp_suffix = int(time.time())
3757 ren_fn = lambda d, suff: (d.physical_id[0],
3758 d.physical_id[1] + "_replaced-%s" % suff)
3759 # build the rename list based on what LVs exist on the node
3761 for to_ren in old_lvs:
3762 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3763 if find_res is not None: # device exists
3764 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3766 info("renaming the old LVs on the target node")
3767 if not rpc.call_blockdev_rename(tgt_node, rlist):
3768 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3769 # now we rename the new LVs to the old LVs
3770 info("renaming the new LVs on the target node")
3771 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3772 if not rpc.call_blockdev_rename(tgt_node, rlist):
3773 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3775 for old, new in zip(old_lvs, new_lvs):
3776 new.logical_id = old.logical_id
3777 cfg.SetDiskID(new, tgt_node)
3779 for disk in old_lvs:
3780 disk.logical_id = ren_fn(disk, temp_suffix)
3781 cfg.SetDiskID(disk, tgt_node)
3783 # now that the new lvs have the old name, we can add them to the device
3784 info("adding new mirror component on %s" % tgt_node)
3785 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3786 for new_lv in new_lvs:
3787 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3788 warning("Can't rollback device %s", hint="manually cleanup unused"
3790 raise errors.OpExecError("Can't add local storage to drbd")
3792 dev.children = new_lvs
3793 cfg.Update(instance)
3795 # Step: wait for sync
3797 # this can fail as the old devices are degraded and _WaitForSync
3798 # does a combined result over all disks, so we don't check its
3800 self.proc.LogStep(5, steps_total, "sync devices")
3801 _WaitForSync(cfg, instance, self.proc, unlock=True)
3803 # so check manually all the devices
3804 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3805 cfg.SetDiskID(dev, instance.primary_node)
3806 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3808 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3810 # Step: remove old storage
3811 self.proc.LogStep(6, steps_total, "removing old storage")
3812 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3813 info("remove logical volumes for %s" % name)
3815 cfg.SetDiskID(lv, tgt_node)
3816 if not rpc.call_blockdev_remove(tgt_node, lv):
3817 warning("Can't remove old LV", hint="manually remove unused LVs")
3820 def _ExecD8Secondary(self, feedback_fn):
3821 """Replace the secondary node for drbd8.
3823 The algorithm for replace is quite complicated:
3824 - for all disks of the instance:
3825 - create new LVs on the new node with same names
3826 - shutdown the drbd device on the old secondary
3827 - disconnect the drbd network on the primary
3828 - create the drbd device on the new secondary
3829 - network attach the drbd on the primary, using an artifice:
3830 the drbd code for Attach() will connect to the network if it
3831 finds a device which is connected to the good local disks but not network enabled
3833 - wait for sync across all devices
3834 - remove all disks from the old secondary
3836 Failures are not very well handled.
3840 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3841 instance = self.instance
3843 vgname = self.cfg.GetVGName()
3846 old_node = self.tgt_node
3847 new_node = self.new_node
3848 pri_node = instance.primary_node
3850 # Step: check device activation
3851 self.proc.LogStep(1, steps_total, "check device existence")
3852 info("checking volume groups")
3853 my_vg = cfg.GetVGName()
3854 results = rpc.call_vg_list([pri_node, new_node])
3856 raise errors.OpExecError("Can't list volume groups on the nodes")
3857 for node in pri_node, new_node:
3858 res = results.get(node, False)
3859 if not res or my_vg not in res:
3860 raise errors.OpExecError("Volume group '%s' not found on %s" %
3862 for dev in instance.disks:
3863 if not dev.iv_name in self.op.disks:
3865 info("checking %s on %s" % (dev.iv_name, pri_node))
3866 cfg.SetDiskID(dev, pri_node)
3867 if not rpc.call_blockdev_find(pri_node, dev):
3868 raise errors.OpExecError("Can't find device %s on node %s" %
3869 (dev.iv_name, pri_node))
3871 # Step: check other node consistency
3872 self.proc.LogStep(2, steps_total, "check peer consistency")
3873 for dev in instance.disks:
3874 if not dev.iv_name in self.op.disks:
3875 continue
3876 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3877 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3878 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3879 " unsafe to replace the secondary" %
3882 # Step: create new storage
3883 self.proc.LogStep(3, steps_total, "allocate new storage")
3884 for dev in instance.disks:
3886 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3887 # since we *always* want to create this LV, we use the
3888 # _Create...OnPrimary (which forces the creation), even if we
3889 # are talking about the secondary node
3890 for new_lv in dev.children:
3891 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3892 _GetInstanceInfoText(instance)):
3893 raise errors.OpExecError("Failed to create new LV named '%s' on"
3895 (new_lv.logical_id[1], new_node))
3897 iv_names[dev.iv_name] = (dev, dev.children)
3899 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3900 for dev in instance.disks:
3902 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3903 # create new devices on new_node
3904 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3905 logical_id=(pri_node, new_node,
3907 children=dev.children)
3908 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3910 _GetInstanceInfoText(instance)):
3911 raise errors.OpExecError("Failed to create new DRBD on"
3912 " node '%s'" % new_node)
3914 for dev in instance.disks:
3915 # we have new devices, shutdown the drbd on the old secondary
3916 info("shutting down drbd for %s on old node" % dev.iv_name)
3917 cfg.SetDiskID(dev, old_node)
3918 if not rpc.call_blockdev_shutdown(old_node, dev):
3919 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3920 hint="Please cleanup this device manually as soon as possible")
3922 info("detaching primary drbds from the network (=> standalone)")
3924 for dev in instance.disks:
3925 cfg.SetDiskID(dev, pri_node)
3926 # set the physical (unique in bdev terms) id to None, meaning
3927 # detach from network
3928 dev.physical_id = (None,) * len(dev.physical_id)
3929 # and 'find' the device, which will 'fix' it to match the
3930 # standalone state
3931 if rpc.call_blockdev_find(pri_node, dev):
3932 done += 1
3933 else:
3934 warning("Failed to detach drbd %s from network, unusual case" %
3935 dev.iv_name)
3937 if not done:
3938 # no detaches succeeded (very unlikely)
3939 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3941 # if we managed to detach at least one, we update all the disks of
3942 # the instance to point to the new secondary
3943 info("updating instance configuration")
3944 for dev in instance.disks:
3945 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3946 cfg.SetDiskID(dev, pri_node)
3947 cfg.Update(instance)
3949 # and now perform the drbd attach
3950 info("attaching primary drbds to new secondary (standalone => connected)")
3952 for dev in instance.disks:
3953 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3954 # since the attach is smart, it's enough to 'find' the device,
3955 # it will automatically activate the network, if the physical_id is correct
3957 cfg.SetDiskID(dev, pri_node)
3958 if not rpc.call_blockdev_find(pri_node, dev):
3959 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3960 "please do a gnt-instance info to see the status of disks")
3962 # this can fail as the old devices are degraded and _WaitForSync
3963 # does a combined result over all disks, so we don't check its return value
3965 self.proc.LogStep(5, steps_total, "sync devices")
3966 _WaitForSync(cfg, instance, self.proc, unlock=True)
3968 # so check manually all the devices
3969 for name, (dev, old_lvs) in iv_names.iteritems():
3970 cfg.SetDiskID(dev, pri_node)
3971 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3972 if is_degr:
3973 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3975 self.proc.LogStep(6, steps_total, "removing old storage")
3976 for name, (dev, old_lvs) in iv_names.iteritems():
3977 info("remove logical volumes for %s" % name)
3979 cfg.SetDiskID(lv, old_node)
3980 if not rpc.call_blockdev_remove(old_node, lv):
3981 warning("Can't remove LV on old secondary",
3982 hint="Cleanup stale volumes by hand")
3984 def Exec(self, feedback_fn):
3985 """Execute disk replacement.
3987 This dispatches the disk replacement to the appropriate handler.
3990 instance = self.instance
3992 # Activate the instance disks if we're replacing them on a down instance
3993 if instance.status == "down":
3994 _StartInstanceDisks(self.cfg, instance, True)
3996 if instance.disk_template == constants.DT_DRBD8:
3997 if self.op.remote_node is None:
3998 fn = self._ExecD8DiskOnly
3999 else:
4000 fn = self._ExecD8Secondary
4001 else:
4002 raise errors.ProgrammerError("Unhandled disk replacement case")
4004 ret = fn(feedback_fn)
4006 # Deactivate the instance disks if we're replacing them on a down instance
4007 if instance.status == "down":
4008 _SafeShutdownInstanceDisks(instance, self.cfg)
4010 return ret
4013 class LUGrowDisk(LogicalUnit):
4014 """Grow a disk of an instance.
4018 HTYPE = constants.HTYPE_INSTANCE
4019 _OP_REQP = ["instance_name", "disk", "amount"]
4022 def ExpandNames(self):
4023 self._ExpandAndLockInstance()
4024 self.needed_locks[locking.LEVEL_NODE] = []
4025 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4027 def DeclareLocks(self, level):
4028 if level == locking.LEVEL_NODE:
4029 self._LockInstancesNodes()
4031 def BuildHooksEnv(self):
4034 This runs on the master, the primary and all the secondaries.
4038 "DISK": self.op.disk,
4039 "AMOUNT": self.op.amount,
4041 env.update(_BuildInstanceHookEnvByObject(self.instance))
4043 self.sstore.GetMasterNode(),
4044 self.instance.primary_node,
4048 def CheckPrereq(self):
4049 """Check prerequisites.
4051 This checks that the instance is in the cluster.
4054 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4055 assert instance is not None, \
4056 "Cannot retrieve locked instance %s" % self.op.instance_name
4058 self.instance = instance
4060 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4061 raise errors.OpPrereqError("Instance's disk layout does not support"
4064 if instance.FindDisk(self.op.disk) is None:
4065 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4066 (self.op.disk, instance.name))
4068 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4069 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4070 for node in nodenames:
4071 info = nodeinfo.get(node, None)
4072 if not info:
4073 raise errors.OpPrereqError("Cannot get current information"
4074 " from node '%s'" % node)
4075 vg_free = info.get('vg_free', None)
4076 if not isinstance(vg_free, int):
4077 raise errors.OpPrereqError("Can't compute free disk space on"
4079 if self.op.amount > info['vg_free']:
4080 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4081 " %d MiB available, %d MiB required" %
4082 (node, info['vg_free'], self.op.amount))
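# Worked example (hypothetical numbers, for illustration only): if a node in
# nodenames reports vg_free of 1024 MiB while self.op.amount is 2048, the check
# above fails with "1024 MiB available, 2048 MiB required"; the grow is only
# accepted when every node holding a copy of the disk has enough free space.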
4084 def Exec(self, feedback_fn):
4085 """Execute disk grow.
4088 instance = self.instance
4089 disk = instance.FindDisk(self.op.disk)
4090 for node in (instance.secondary_nodes + (instance.primary_node,)):
4091 self.cfg.SetDiskID(disk, node)
4092 result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4093 if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
4094 raise errors.OpExecError("grow request failed to node %s" % node)
4096 raise errors.OpExecError("grow request failed to node %s: %s" %
4098 disk.RecordGrow(self.op.amount)
4099 self.cfg.Update(instance)
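# Note: the loop above grows the block device on every node that holds a copy
# of the disk, and RecordGrow presumably bumps the size recorded in the disk
# object before cfg.Update persists it. This LU is normally reached through the
# grow-disk opcode; a client invocation such as
# "gnt-instance grow-disk instance1.example.com sda 2048" is only an assumed
# example and is not defined in this module.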
4103 class LUQueryInstanceData(NoHooksLU):
4104 """Query runtime instance data.
4107 _OP_REQP = ["instances"]
4109 def CheckPrereq(self):
4110 """Check prerequisites.
4112 This only checks the optional instance list against the existing names.
4115 if not isinstance(self.op.instances, list):
4116 raise errors.OpPrereqError("Invalid argument type 'instances'")
4117 if self.op.instances:
4118 self.wanted_instances = []
4119 names = self.op.instances
4120 for name in names:
4121 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4122 if instance is None:
4123 raise errors.OpPrereqError("No such instance name '%s'" % name)
4124 self.wanted_instances.append(instance)
4125 else:
4126 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4127 in self.cfg.GetInstanceList()]
4131 def _ComputeDiskStatus(self, instance, snode, dev):
4132 """Compute block device status.
4135 self.cfg.SetDiskID(dev, instance.primary_node)
4136 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4137 if dev.dev_type in constants.LDS_DRBD:
4138 # we change the snode then (otherwise we use the one passed in)
4139 if dev.logical_id[0] == instance.primary_node:
4140 snode = dev.logical_id[1]
4141 else:
4142 snode = dev.logical_id[0]
4144 if snode:
4145 self.cfg.SetDiskID(dev, snode)
4146 dev_sstatus = rpc.call_blockdev_find(snode, dev)
4147 else:
4148 dev_sstatus = None
4151 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4152 for child in dev.children]
4155 data = {
4157 "iv_name": dev.iv_name,
4158 "dev_type": dev.dev_type,
4159 "logical_id": dev.logical_id,
4160 "physical_id": dev.physical_id,
4161 "pstatus": dev_pstatus,
4162 "sstatus": dev_sstatus,
4163 "children": dev_children,
4168 def Exec(self, feedback_fn):
4169 """Gather and return data"""
4171 for instance in self.wanted_instances:
4172 remote_info = rpc.call_instance_info(instance.primary_node,
4173 instance.name)
4174 if remote_info and "state" in remote_info:
4175 remote_state = "up"
4176 else:
4177 remote_state = "down"
4178 if instance.status == "down":
4179 config_state = "down"
4180 else:
4181 config_state = "up"
4183 disks = [self._ComputeDiskStatus(instance, None, device)
4184 for device in instance.disks]
4186 idict = {
4187 "name": instance.name,
4188 "config_state": config_state,
4189 "run_state": remote_state,
4190 "pnode": instance.primary_node,
4191 "snodes": instance.secondary_nodes,
4193 "memory": instance.memory,
4194 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4196 "vcpus": instance.vcpus,
4199 htkind = self.sstore.GetHypervisorType()
4200 if htkind == constants.HT_XEN_PVM30:
4201 idict["kernel_path"] = instance.kernel_path
4202 idict["initrd_path"] = instance.initrd_path
4204 if htkind == constants.HT_XEN_HVM31:
4205 idict["hvm_boot_order"] = instance.hvm_boot_order
4206 idict["hvm_acpi"] = instance.hvm_acpi
4207 idict["hvm_pae"] = instance.hvm_pae
4208 idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4209 idict["hvm_nic_type"] = instance.hvm_nic_type
4210 idict["hvm_disk_type"] = instance.hvm_disk_type
4212 if htkind in constants.HTS_REQ_PORT:
4213 if instance.vnc_bind_address is None:
4214 vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4216 vnc_bind_address = instance.vnc_bind_address
4217 if instance.network_port is None:
4218 vnc_console_port = None
4219 elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4220 vnc_console_port = "%s:%s" % (instance.primary_node,
4221 instance.network_port)
4222 elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4223 vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
4224 instance.network_port,
4225 instance.primary_node)
4227 vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4228 instance.network_port)
4229 idict["vnc_console_port"] = vnc_console_port
4230 idict["vnc_bind_address"] = vnc_bind_address
4231 idict["network_port"] = instance.network_port
4233 result[instance.name] = idict
4235 return result
4238 class LUSetInstanceParams(LogicalUnit):
4239 """Modifies an instances's parameters.
4242 HPATH = "instance-modify"
4243 HTYPE = constants.HTYPE_INSTANCE
4244 _OP_REQP = ["instance_name"]
4247 def ExpandNames(self):
4248 self._ExpandAndLockInstance()
4250 def BuildHooksEnv(self):
4253 This runs on the master, primary and secondaries.
4256 args = dict()
4257 if self.mem:
4258 args['memory'] = self.mem
4259 if self.vcpus:
4260 args['vcpus'] = self.vcpus
4261 if self.do_ip or self.do_bridge or self.mac:
4262 if self.do_ip:
4263 ip = self.ip
4264 else:
4265 ip = self.instance.nics[0].ip
4266 if self.bridge:
4267 bridge = self.bridge
4268 else:
4269 bridge = self.instance.nics[0].bridge
4270 if self.mac:
4271 mac = self.mac
4272 else:
4273 mac = self.instance.nics[0].mac
4274 args['nics'] = [(ip, bridge, mac)]
4275 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4276 nl = [self.sstore.GetMasterNode(),
4277 self.instance.primary_node] + list(self.instance.secondary_nodes)
4278 return env, nl, nl
4280 def CheckPrereq(self):
4281 """Check prerequisites.
4283 This only checks the instance list against the existing names.
4286 # FIXME: all the parameters could be checked before, in ExpandNames, or in
4287 # a separate CheckArguments function, if we implement one, so the operation
4288 # can be aborted without waiting for any lock, should it have an error...
4289 self.mem = getattr(self.op, "mem", None)
4290 self.vcpus = getattr(self.op, "vcpus", None)
4291 self.ip = getattr(self.op, "ip", None)
4292 self.mac = getattr(self.op, "mac", None)
4293 self.bridge = getattr(self.op, "bridge", None)
4294 self.kernel_path = getattr(self.op, "kernel_path", None)
4295 self.initrd_path = getattr(self.op, "initrd_path", None)
4296 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4297 self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4298 self.hvm_pae = getattr(self.op, "hvm_pae", None)
4299 self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
4300 self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
4301 self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4302 self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4303 self.force = getattr(self.op, "force", None)
4304 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4305 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4306 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4307 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
4308 if all_parms.count(None) == len(all_parms):
4309 raise errors.OpPrereqError("No changes submitted")
4310 if self.mem is not None:
4311 try:
4312 self.mem = int(self.mem)
4313 except ValueError, err:
4314 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4315 if self.vcpus is not None:
4316 try:
4317 self.vcpus = int(self.vcpus)
4318 except ValueError, err:
4319 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4320 if self.ip is not None:
4321 self.do_ip = True
4322 if self.ip.lower() == "none":
4323 self.ip = None
4324 else:
4325 if not utils.IsValidIP(self.ip):
4326 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4327 else:
4328 self.do_ip = False
4329 self.do_bridge = (self.bridge is not None)
4330 if self.mac is not None:
4331 if self.cfg.IsMacInUse(self.mac):
4332 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4333 self.mac)
4334 if not utils.IsValidMac(self.mac):
4335 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4337 if self.kernel_path is not None:
4338 self.do_kernel_path = True
4339 if self.kernel_path == constants.VALUE_NONE:
4340 raise errors.OpPrereqError("Can't set instance to no kernel")
4342 if self.kernel_path != constants.VALUE_DEFAULT:
4343 if not os.path.isabs(self.kernel_path):
4344 raise errors.OpPrereqError("The kernel path must be an absolute"
4347 self.do_kernel_path = False
4349 if self.initrd_path is not None:
4350 self.do_initrd_path = True
4351 if self.initrd_path not in (constants.VALUE_NONE,
4352 constants.VALUE_DEFAULT):
4353 if not os.path.isabs(self.initrd_path):
4354 raise errors.OpPrereqError("The initrd path must be an absolute"
4357 self.do_initrd_path = False
4359 # boot order verification
4360 if self.hvm_boot_order is not None:
4361 if self.hvm_boot_order != constants.VALUE_DEFAULT:
4362 if len(self.hvm_boot_order.strip("acdn")) != 0:
4363 raise errors.OpPrereqError("invalid boot order specified,"
4364 " must be one or more of [acdn]"
4367 # hvm_cdrom_image_path verification
4368 if self.op.hvm_cdrom_image_path is not None:
4369 if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4370 self.op.hvm_cdrom_image_path.lower() == "none"):
4371 raise errors.OpPrereqError("The path to the HVM CDROM image must"
4372 " be an absolute path or None, not %s" %
4373 self.op.hvm_cdrom_image_path)
4374 if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4375 self.op.hvm_cdrom_image_path.lower() == "none"):
4376 raise errors.OpPrereqError("The HVM CDROM image must either be a"
4377 " regular file or a symlink pointing to"
4378 " an existing regular file, not %s" %
4379 self.op.hvm_cdrom_image_path)
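# In other words, acceptable values here are the literal string "none" (no
# CDROM attached) or an absolute path to an existing image file, e.g.
# "/srv/images/cd.iso" (path shown only as an illustration).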
4381 # vnc_bind_address verification
4382 if self.op.vnc_bind_address is not None:
4383 if not utils.IsValidIP(self.op.vnc_bind_address):
4384 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4385 " like a valid IP address" %
4386 self.op.vnc_bind_address)
4388 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4389 assert self.instance is not None, \
4390 "Cannot retrieve locked instance %s" % self.op.instance_name
4391 self.warn = []
4392 if self.mem is not None and not self.force:
4393 pnode = self.instance.primary_node
4394 nodelist = [pnode]
4395 nodelist.extend(instance.secondary_nodes)
4396 instance_info = rpc.call_instance_info(pnode, instance.name)
4397 nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
4399 if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4400 # Assume the primary node is unreachable and go ahead
4401 self.warn.append("Can't get info from primary node %s" % pnode)
4404 current_mem = instance_info['memory']
4406 # Assume instance not running
4407 # (there is a slight race condition here, but it's not very probable,
4408 # and we have no other way to check)
4410 miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
4412 raise errors.OpPrereqError("This change will prevent the instance"
4413 " from starting, due to %d MB of memory"
4414 " missing on its primary node" % miss_mem)
4416 for node in instance.secondary_nodes:
4417 if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4418 self.warn.append("Can't get info from secondary node %s" % node)
4419 elif self.mem > nodeinfo[node]['memory_free']:
4420 self.warn.append("Not enough memory to failover instance to secondary"
4423 # Xen HVM device type checks
4424 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
4425 if self.op.hvm_nic_type is not None:
4426 if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
4427 raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
4428 " HVM hypervisor" % self.op.hvm_nic_type)
4429 if self.op.hvm_disk_type is not None:
4430 if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
4431 raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
4432 " HVM hypervisor" % self.op.hvm_disk_type)
4436 def Exec(self, feedback_fn):
4437 """Modifies an instance.
4439 All parameters take effect only at the next restart of the instance.
4441 # Process here the warnings from CheckPrereq, as we don't have a
4442 # feedback_fn there.
4443 for warn in self.warn:
4444 feedback_fn("WARNING: %s" % warn)
4446 result = []
4447 instance = self.instance
4448 if self.mem:
4449 instance.memory = self.mem
4450 result.append(("mem", self.mem))
4451 if self.vcpus:
4452 instance.vcpus = self.vcpus
4453 result.append(("vcpus", self.vcpus))
4454 if self.do_ip:
4455 instance.nics[0].ip = self.ip
4456 result.append(("ip", self.ip))
4457 if self.do_bridge:
4458 instance.nics[0].bridge = self.bridge
4459 result.append(("bridge", self.bridge))
4460 if self.mac:
4461 instance.nics[0].mac = self.mac
4462 result.append(("mac", self.mac))
4463 if self.do_kernel_path:
4464 instance.kernel_path = self.kernel_path
4465 result.append(("kernel_path", self.kernel_path))
4466 if self.do_initrd_path:
4467 instance.initrd_path = self.initrd_path
4468 result.append(("initrd_path", self.initrd_path))
4469 if self.hvm_boot_order:
4470 if self.hvm_boot_order == constants.VALUE_DEFAULT:
4471 instance.hvm_boot_order = None
4473 instance.hvm_boot_order = self.hvm_boot_order
4474 result.append(("hvm_boot_order", self.hvm_boot_order))
4475 if self.hvm_acpi is not None:
4476 instance.hvm_acpi = self.hvm_acpi
4477 result.append(("hvm_acpi", self.hvm_acpi))
4478 if self.hvm_pae is not None:
4479 instance.hvm_pae = self.hvm_pae
4480 result.append(("hvm_pae", self.hvm_pae))
4481 if self.hvm_nic_type is not None:
4482 instance.hvm_nic_type = self.hvm_nic_type
4483 result.append(("hvm_nic_type", self.hvm_nic_type))
4484 if self.hvm_disk_type is not None:
4485 instance.hvm_disk_type = self.hvm_disk_type
4486 result.append(("hvm_disk_type", self.hvm_disk_type))
4487 if self.hvm_cdrom_image_path:
4488 if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4489 instance.hvm_cdrom_image_path = None
4491 instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4492 result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4493 if self.vnc_bind_address:
4494 instance.vnc_bind_address = self.vnc_bind_address
4495 result.append(("vnc_bind_address", self.vnc_bind_address))
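# The result list built above is a sequence of (parameter, new value) pairs,
# e.g. [("mem", 512), ("bridge", "xen-br0")] (values purely illustrative),
# so that the caller can report exactly which settings were changed.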
4497 self.cfg.Update(instance)
4499 return result
4502 class LUQueryExports(NoHooksLU):
4503 """Query the exports list
4506 _OP_REQP = ['nodes']
4509 def ExpandNames(self):
4510 self.needed_locks = {}
4511 self.share_locks[locking.LEVEL_NODE] = 1
4512 if not self.op.nodes:
4513 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4515 self.needed_locks[locking.LEVEL_NODE] = \
4516 _GetWantedNodes(self, self.op.nodes)
4518 def CheckPrereq(self):
4519 """Check prerequisites.
4522 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4524 def Exec(self, feedback_fn):
4525 """Compute the list of all the exported system images.
4528 a dictionary with the structure node->(export-list)
4529 where export-list is a list of the instances exported on that node.
4533 return rpc.call_export_list(self.nodes)
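# The RPC result is a mapping from node name to the list of export names found
# there, e.g. {"node1.example.com": ["inst1.example.com"],
# "node2.example.com": []} (names are illustrative only).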
4536 class LUExportInstance(LogicalUnit):
4537 """Export an instance to an image in the cluster.
4540 HPATH = "instance-export"
4541 HTYPE = constants.HTYPE_INSTANCE
4542 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4545 def ExpandNames(self):
4546 self._ExpandAndLockInstance()
4547 # FIXME: lock only instance primary and destination node
4549 # Sad but true, for now we have to lock all nodes, as we don't know where
4550 # the previous export might be, and in this LU we search for it and
4551 # remove it from its current node. In the future we could fix this by:
4552 # - making a tasklet to search (share-lock all), then create the new one,
4553 # then one to remove, after
4554 # - removing the removal operation altogether
4555 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4557 def DeclareLocks(self, level):
4558 """Last minute lock declaration."""
4559 # All nodes are locked anyway, so nothing to do here.
4561 def BuildHooksEnv(self):
4564 This will run on the master, primary node and target node.
4568 "EXPORT_NODE": self.op.target_node,
4569 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4571 env.update(_BuildInstanceHookEnvByObject(self.instance))
4572 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4573 self.op.target_node]
4576 def CheckPrereq(self):
4577 """Check prerequisites.
4579 This checks that the instance and node names are valid.
4582 instance_name = self.op.instance_name
4583 self.instance = self.cfg.GetInstanceInfo(instance_name)
4584 assert self.instance is not None, \
4585 "Cannot retrieve locked instance %s" % self.op.instance_name
4587 self.dst_node = self.cfg.GetNodeInfo(
4588 self.cfg.ExpandNodeName(self.op.target_node))
4590 assert self.dst_node is not None, \
4591 "Cannot retrieve locked node %s" % self.op.target_node
4593 # instance disk type verification
4594 for disk in self.instance.disks:
4595 if disk.dev_type == constants.LD_FILE:
4596 raise errors.OpPrereqError("Export not supported for instances with"
4597 " file-based disks")
4599 def Exec(self, feedback_fn):
4600 """Export an instance to an image in the cluster.
4603 instance = self.instance
4604 dst_node = self.dst_node
4605 src_node = instance.primary_node
4606 if self.op.shutdown:
4607 # shutdown the instance, but not the disks
4608 if not rpc.call_instance_shutdown(src_node, instance):
4609 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4610 (instance.name, src_node))
4612 vgname = self.cfg.GetVGName()
4614 snap_disks = []
4617 for disk in instance.disks:
4618 if disk.iv_name == "sda":
4619 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4620 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4622 if not new_dev_name:
4623 logger.Error("could not snapshot block device %s on node %s" %
4624 (disk.logical_id[1], src_node))
4625 else:
4626 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4627 logical_id=(vgname, new_dev_name),
4628 physical_id=(vgname, new_dev_name),
4629 iv_name=disk.iv_name)
4630 snap_disks.append(new_dev)
4633 if self.op.shutdown and instance.status == "up":
4634 if not rpc.call_instance_start(src_node, instance, None):
4635 _ShutdownInstanceDisks(instance, self.cfg)
4636 raise errors.OpExecError("Could not start instance")
4638 # TODO: check for size
4640 for dev in snap_disks:
4641 if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4642 logger.Error("could not export block device %s from node %s to node %s"
4643 % (dev.logical_id[1], src_node, dst_node.name))
4644 if not rpc.call_blockdev_remove(src_node, dev):
4645 logger.Error("could not remove snapshot block device %s from node %s" %
4646 (dev.logical_id[1], src_node))
4648 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4649 logger.Error("could not finalize export for instance %s on node %s" %
4650 (instance.name, dst_node.name))
4652 nodelist = self.cfg.GetNodeList()
4653 nodelist.remove(dst_node.name)
4655 # on one-node clusters nodelist will be empty after the removal
4656 # if we proceed the backup would be removed because OpQueryExports
4657 # substitutes an empty list with the full cluster node list.
4658 if nodelist:
4659 exportlist = rpc.call_export_list(nodelist)
4660 for node in exportlist:
4661 if instance.name in exportlist[node]:
4662 if not rpc.call_export_remove(node, instance.name):
4663 logger.Error("could not remove older export for instance %s"
4664 " on node %s" % (instance.name, node))
4667 class LURemoveExport(NoHooksLU):
4668 """Remove exports related to the named instance.
4671 _OP_REQP = ["instance_name"]
4673 def CheckPrereq(self):
4674 """Check prerequisites.
4678 def Exec(self, feedback_fn):
4679 """Remove any export.
4682 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4683 # If the instance was not found we'll try with the name that was passed in.
4684 # This will only work if it was an FQDN, though.
4685 fqdn_warn = False
4686 if not instance_name:
4687 fqdn_warn = True
4688 instance_name = self.op.instance_name
4690 exportlist = rpc.call_export_list(self.cfg.GetNodeList())
4691 found = False
4692 for node in exportlist:
4693 if instance_name in exportlist[node]:
4694 found = True
4695 if not rpc.call_export_remove(node, instance_name):
4696 logger.Error("could not remove export for instance %s"
4697 " on node %s" % (instance_name, node))
4699 if fqdn_warn and not found:
4700 feedback_fn("Export not found. If trying to remove an export belonging"
4701 " to a deleted instance please use its Fully Qualified"
4705 class TagsLU(NoHooksLU):
4708 This is an abstract class which is the parent of all the other tags LUs.
4711 def CheckPrereq(self):
4712 """Check prerequisites.
4715 if self.op.kind == constants.TAG_CLUSTER:
4716 self.target = self.cfg.GetClusterInfo()
4717 elif self.op.kind == constants.TAG_NODE:
4718 name = self.cfg.ExpandNodeName(self.op.name)
4720 raise errors.OpPrereqError("Invalid node name (%s)" %
4723 self.target = self.cfg.GetNodeInfo(name)
4724 elif self.op.kind == constants.TAG_INSTANCE:
4725 name = self.cfg.ExpandInstanceName(self.op.name)
4727 raise errors.OpPrereqError("Invalid instance name (%s)" %
4730 self.target = self.cfg.GetInstanceInfo(name)
4732 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4736 class LUGetTags(TagsLU):
4737 """Returns the tags of a given object.
4740 _OP_REQP = ["kind", "name"]
4742 def Exec(self, feedback_fn):
4743 """Returns the tag list.
4746 return list(self.target.GetTags())
4749 class LUSearchTags(NoHooksLU):
4750 """Searches the tags for a given pattern.
4753 _OP_REQP = ["pattern"]
4755 def CheckPrereq(self):
4756 """Check prerequisites.
4758 This checks the pattern passed for validity by compiling it.
4761 try:
4762 self.re = re.compile(self.op.pattern)
4763 except re.error, err:
4764 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4765 (self.op.pattern, err))
4767 def Exec(self, feedback_fn):
4768 """Returns the tag list.
4772 tgts = [("/cluster", cfg.GetClusterInfo())]
4773 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4774 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4775 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4776 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4778 for path, target in tgts:
4779 for tag in target.GetTags():
4780 if self.re.search(tag):
4781 results.append((path, tag))
4783 return results
4785 class LUAddTags(TagsLU):
4786 """Sets a tag on a given object.
4789 _OP_REQP = ["kind", "name", "tags"]
4791 def CheckPrereq(self):
4792 """Check prerequisites.
4794 This checks the type and length of the tag name and value.
4797 TagsLU.CheckPrereq(self)
4798 for tag in self.op.tags:
4799 objects.TaggableObject.ValidateTag(tag)
4801 def Exec(self, feedback_fn):
4805 try:
4806 for tag in self.op.tags:
4807 self.target.AddTag(tag)
4808 except errors.TagError, err:
4809 raise errors.OpExecError("Error while setting tag: %s" % str(err))
4810 try:
4811 self.cfg.Update(self.target)
4812 except errors.ConfigurationError:
4813 raise errors.OpRetryError("There has been a modification to the"
4814 " config file and the operation has been"
4815 " aborted. Please retry.")
4818 class LUDelTags(TagsLU):
4819 """Delete a list of tags from a given object.
4822 _OP_REQP = ["kind", "name", "tags"]
4824 def CheckPrereq(self):
4825 """Check prerequisites.
4827 This checks that we have the given tag.
4830 TagsLU.CheckPrereq(self)
4831 for tag in self.op.tags:
4832 objects.TaggableObject.ValidateTag(tag)
4833 del_tags = frozenset(self.op.tags)
4834 cur_tags = self.target.GetTags()
4835 if not del_tags <= cur_tags:
4836 diff_tags = del_tags - cur_tags
4837 diff_names = ["'%s'" % tag for tag in diff_tags]
4839 raise errors.OpPrereqError("Tag(s) %s not found" %
4840 (",".join(diff_names)))
4842 def Exec(self, feedback_fn):
4843 """Remove the tag from the object.
4846 for tag in self.op.tags:
4847 self.target.RemoveTag(tag)
4848 try:
4849 self.cfg.Update(self.target)
4850 except errors.ConfigurationError:
4851 raise errors.OpRetryError("There has been a modification to the"
4852 " config file and the operation has been"
4853 " aborted. Please retry.")
4856 class LUTestDelay(NoHooksLU):
4857 """Sleep for a specified amount of time.
4859 This LU sleeps on the master and/or nodes for a specified amount of time.
4863 _OP_REQP = ["duration", "on_master", "on_nodes"]
4866 def ExpandNames(self):
4867 """Expand names and set required locks.
4869 This expands the node list, if any.
4872 self.needed_locks = {}
4873 if self.op.on_nodes:
4874 # _GetWantedNodes can be used here, but is not always appropriate to use
4875 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for more information.
4877 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4878 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
4880 def CheckPrereq(self):
4881 """Check prerequisites.
4885 def Exec(self, feedback_fn):
4886 """Do the actual sleep.
4889 if self.op.on_master:
4890 if not utils.TestDelay(self.op.duration):
4891 raise errors.OpExecError("Error during master delay test")
4892 if self.op.on_nodes:
4893 result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4894 if not result:
4895 raise errors.OpExecError("Complete failure from rpc call")
4896 for node, node_result in result.items():
4897 if not node_result:
4898 raise errors.OpExecError("Failure during rpc call to node %s,"
4899 " result: %s" % (node, node_result))
4902 class IAllocator(object):
4903 """IAllocator framework.
4905 An IAllocator instance has four sets of attributes:
4906 - cfg/sstore that are needed to query the cluster
4907 - input data (all members of the _KEYS class attribute are required)
4908 - four buffer attributes (in|out_data|text), that represent the
4909 input (to the external script) in text and data structure format,
4910 and the output from it, again in two formats
4911 - the result variables from the script (success, info, nodes) for
4912 easy usage
4916 "mem_size", "disks", "disk_template",
4917 "os", "tags", "nics", "vcpus",
4923 def __init__(self, cfg, sstore, mode, name, **kwargs):
4924 self.cfg = cfg
4925 self.sstore = sstore
4926 # init buffer variables
4927 self.in_text = self.out_text = self.in_data = self.out_data = None
4928 # init all input fields so that pylint is happy
4929 self.mode = mode
4930 self.name = name
4931 self.mem_size = self.disks = self.disk_template = None
4932 self.os = self.tags = self.nics = self.vcpus = None
4933 self.relocate_from = None
4935 self.required_nodes = None
4936 # init result fields
4937 self.success = self.info = self.nodes = None
4938 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4939 keyset = self._ALLO_KEYS
4940 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4941 keyset = self._RELO_KEYS
4943 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4944 " IAllocator" % self.mode)
4946 if key not in keyset:
4947 raise errors.ProgrammerError("Invalid input parameter '%s' to"
4948 " IAllocator" % key)
4949 setattr(self, key, kwargs[key])
4950 for key in keyset:
4951 if key not in kwargs:
4952 raise errors.ProgrammerError("Missing input parameter '%s' to"
4953 " IAllocator" % key)
4954 self._BuildInputData()
4956 def _ComputeClusterData(self):
4957 """Compute the generic allocator input data.
4959 This is the data that is independent of the actual operation.
4966 "cluster_name": self.sstore.GetClusterName(),
4967 "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4968 "hypervisor_type": self.sstore.GetHypervisorType(),
4969 # we don't have job IDs
4972 i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4974 node_results = {}
4976 node_list = cfg.GetNodeList()
4977 node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4978 for nname in node_list:
4979 ninfo = cfg.GetNodeInfo(nname)
4980 if nname not in node_data or not isinstance(node_data[nname], dict):
4981 raise errors.OpExecError("Can't get data for node %s" % nname)
4982 remote_info = node_data[nname]
4983 for attr in ['memory_total', 'memory_free', 'memory_dom0',
4984 'vg_size', 'vg_free', 'cpu_total']:
4985 if attr not in remote_info:
4986 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4989 remote_info[attr] = int(remote_info[attr])
4990 except ValueError, err:
4991 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4992 " %s" % (nname, attr, str(err)))
4993 # compute memory used by primary instances
4994 i_p_mem = i_p_up_mem = 0
4995 for iinfo in i_list:
4996 if iinfo.primary_node == nname:
4997 i_p_mem += iinfo.memory
4998 if iinfo.status == "up":
4999 i_p_up_mem += iinfo.memory
5001 # compute memory used by instances
5002 pnr = {
5003 "tags": list(ninfo.GetTags()),
5004 "total_memory": remote_info['memory_total'],
5005 "reserved_memory": remote_info['memory_dom0'],
5006 "free_memory": remote_info['memory_free'],
5007 "i_pri_memory": i_p_mem,
5008 "i_pri_up_memory": i_p_up_mem,
5009 "total_disk": remote_info['vg_size'],
5010 "free_disk": remote_info['vg_free'],
5011 "primary_ip": ninfo.primary_ip,
5012 "secondary_ip": ninfo.secondary_ip,
5013 "total_cpus": remote_info['cpu_total'],
5015 node_results[nname] = pnr
5016 data["nodes"] = node_results
5020 for iinfo in i_list:
5021 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5022 for n in iinfo.nics]
5023 pir = {
5024 "tags": list(iinfo.GetTags()),
5025 "should_run": iinfo.status == "up",
5026 "vcpus": iinfo.vcpus,
5027 "memory": iinfo.memory,
5029 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5031 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5032 "disk_template": iinfo.disk_template,
5034 instance_data[iinfo.name] = pir
5036 data["instances"] = instance_data
5040 def _AddNewInstance(self):
5041 """Add new instance data to allocator structure.
5043 This in combination with _AllocatorGetClusterData will create the
5044 correct structure needed as input for the allocator.
5046 The checks for the completeness of the opcode must have already been done.
5050 data = self.in_data
5051 if len(self.disks) != 2:
5052 raise errors.OpExecError("Only two-disk configurations supported")
5054 disk_space = _ComputeDiskSize(self.disk_template,
5055 self.disks[0]["size"], self.disks[1]["size"])
5057 if self.disk_template in constants.DTS_NET_MIRROR:
5058 self.required_nodes = 2
5059 else:
5060 self.required_nodes = 1
5062 request = {
5064 "disk_template": self.disk_template,
5067 "vcpus": self.vcpus,
5068 "memory": self.mem_size,
5069 "disks": self.disks,
5070 "disk_space_total": disk_space,
5072 "required_nodes": self.required_nodes,
5074 data["request"] = request
5076 def _AddRelocateInstance(self):
5077 """Add relocate instance data to allocator structure.
5079 This in combination with _IAllocatorGetClusterData will create the
5080 correct structure needed as input for the allocator.
5082 The checks for the completeness of the opcode must have already been done.
5086 instance = self.cfg.GetInstanceInfo(self.name)
5087 if instance is None:
5088 raise errors.ProgrammerError("Unknown instance '%s' passed to"
5089 " IAllocator" % self.name)
5091 if instance.disk_template not in constants.DTS_NET_MIRROR:
5092 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5094 if len(instance.secondary_nodes) != 1:
5095 raise errors.OpPrereqError("Instance has not exactly one secondary node")
5097 self.required_nodes = 1
5099 disk_space = _ComputeDiskSize(instance.disk_template,
5100 instance.disks[0].size,
5101 instance.disks[1].size)
5103 request = {
5106 "disk_space_total": disk_space,
5107 "required_nodes": self.required_nodes,
5108 "relocate_from": self.relocate_from,
5110 self.in_data["request"] = request
5112 def _BuildInputData(self):
5113 """Build input data structures.
5116 self._ComputeClusterData()
5118 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5119 self._AddNewInstance()
5120 else:
5121 self._AddRelocateInstance()
5123 self.in_text = serializer.Dump(self.in_data)
5125 def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5126 """Run an instance allocator and return the results.
5131 result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5133 if not isinstance(result, (list, tuple)) or len(result) != 4:
5134 raise errors.OpExecError("Invalid result from master iallocator runner")
5136 rcode, stdout, stderr, fail = result
5138 if rcode == constants.IARUN_NOTFOUND:
5139 raise errors.OpExecError("Can't find allocator '%s'" % name)
5140 elif rcode == constants.IARUN_FAILURE:
5141 raise errors.OpExecError("Instance allocator call failed: %s,"
5142 " output: %s" % (fail, stdout+stderr))
5143 self.out_text = stdout
5144 if validate:
5145 self._ValidateResult()
5147 def _ValidateResult(self):
5148 """Process the allocator results.
5150 This will process and if successful save the result in
5151 self.out_data and the other parameters.
5154 try:
5155 rdict = serializer.Load(self.out_text)
5156 except Exception, err:
5157 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5159 if not isinstance(rdict, dict):
5160 raise errors.OpExecError("Can't parse iallocator results: not a dict")
5162 for key in "success", "info", "nodes":
5163 if key not in rdict:
5164 raise errors.OpExecError("Can't parse iallocator results:"
5165 " missing key '%s'" % key)
5166 setattr(self, key, rdict[key])
5168 if not isinstance(rdict["nodes"], list):
5169 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5171 self.out_data = rdict
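# A well-formed allocator reply therefore deserializes to something like
#   {"success": True, "info": "allocation successful", "nodes": ["node2"]}
# (values illustrative); "nodes" must be a list, and success/info/nodes are
# copied onto the IAllocator object by the setattr() loop above.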
5174 class LUTestAllocator(NoHooksLU):
5175 """Run allocator tests.
5177 This LU runs the allocator tests
5180 _OP_REQP = ["direction", "mode", "name"]
5182 def CheckPrereq(self):
5183 """Check prerequisites.
5185 This checks the opcode parameters depending on the test direction and mode.
5188 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5189 for attr in ["name", "mem_size", "disks", "disk_template",
5190 "os", "tags", "nics", "vcpus"]:
5191 if not hasattr(self.op, attr):
5192 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5194 iname = self.cfg.ExpandInstanceName(self.op.name)
5195 if iname is not None:
5196 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5198 if not isinstance(self.op.nics, list):
5199 raise errors.OpPrereqError("Invalid parameter 'nics'")
5200 for row in self.op.nics:
5201 if (not isinstance(row, dict) or
5202 "mac" not in row or
5203 "ip" not in row or
5204 "bridge" not in row):
5205 raise errors.OpPrereqError("Invalid contents of the"
5206 " 'nics' parameter")
5207 if not isinstance(self.op.disks, list):
5208 raise errors.OpPrereqError("Invalid parameter 'disks'")
5209 if len(self.op.disks) != 2:
5210 raise errors.OpPrereqError("Only two-disk configurations supported")
5211 for row in self.op.disks:
5212 if (not isinstance(row, dict) or
5213 "size" not in row or
5214 not isinstance(row["size"], int) or
5215 "mode" not in row or
5216 row["mode"] not in ['r', 'w']):
5217 raise errors.OpPrereqError("Invalid contents of the"
5218 " 'disks' parameter")
5219 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5220 if not hasattr(self.op, "name"):
5221 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5222 fname = self.cfg.ExpandInstanceName(self.op.name)
5223 if fname is None:
5224 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5225 self.op.name)
5226 self.op.name = fname
5227 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5228 else:
5229 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5230 self.op.mode)
5232 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5233 if not hasattr(self.op, "allocator") or self.op.allocator is None:
5234 raise errors.OpPrereqError("Missing allocator name")
5235 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5236 raise errors.OpPrereqError("Wrong allocator test '%s'" %
5239 def Exec(self, feedback_fn):
5240 """Run the allocator test.
5243 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5244 ial = IAllocator(self.cfg, self.sstore,
5245 mode=self.op.mode,
5246 name=self.op.name,
5247 mem_size=self.op.mem_size,
5248 disks=self.op.disks,
5249 disk_template=self.op.disk_template,
5253 vcpus=self.op.vcpus,
5254 )
5255 else:
5256 ial = IAllocator(self.cfg, self.sstore,
5257 mode=self.op.mode,
5258 name=self.op.name,
5259 relocate_from=list(self.relocate_from),
5260 )
5262 if self.op.direction == constants.IALLOCATOR_DIR_IN:
5263 result = ial.in_text
5264 else:
5265 ial.Run(self.op.allocator, validate=False)
5266 result = ial.out_text
5268 return result