4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46 from ganeti import serializer
49 class LogicalUnit(object):
50 """Logical Unit base class.
52 Subclasses must follow these rules:
53 - implement ExpandNames
54 - implement CheckPrereq
56 - implement BuildHooksEnv
57 - redefine HPATH and HTYPE
58 - optionally redefine their run requirements:
59 REQ_MASTER: the LU needs to run on the master node
60 REQ_WSSTORE: the LU needs a writable SimpleStore
61 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
63 Note that all commands require root permissions.
73 def __init__(self, processor, op, context, sstore):
74 """Constructor for LogicalUnit.
76 This needs to be overriden in derived classes in order to check op
82 self.cfg = context.cfg
84 self.context = context
85 self.needed_locks = None
88 for attr_name in self._OP_REQP:
89 attr_val = getattr(op, attr_name, None)
91 raise errors.OpPrereqError("Required parameter '%s' missing" %
94 if not self.cfg.IsCluster():
95 raise errors.OpPrereqError("Cluster not initialized yet,"
96 " use 'gnt-cluster init' first.")
98 master = sstore.GetMasterNode()
99 if master != utils.HostInfo().name:
100 raise errors.OpPrereqError("Commands must be run on the master"
104 """Returns the SshRunner object
108 self.__ssh = ssh.SshRunner(self.sstore)
111 ssh = property(fget=__GetSSH)
113 def ExpandNames(self):
114 """Expand names for this LU.
116 This method is called before starting to execute the opcode, and it should
117 update all the parameters of the opcode to their canonical form (e.g. a
118 short node name must be fully expanded after this method has successfully
119 completed). This way locking, hooks, logging, ecc. can work correctly.
121 LUs which implement this method must also populate the self.needed_locks
122 member, as a dict with lock levels as keys, and a list of needed lock names
124 - Use an empty dict if you don't need any lock
125 - If you don't need any lock at a particular level omit that level
126 - Don't put anything for the BGL level
127 - If you want all locks at a level use None as a value
128 (this reflects what LockSet does, and will be replaced before
129 CheckPrereq with the full list of nodes that have been locked)
132 # Acquire all nodes and one instance
133 self.needed_locks = {
134 locking.LEVEL_NODE: None,
135 locking.LEVEL_INSTANCES: ['instance1.example.tld'],
137 # Acquire just two nodes
138 self.needed_locks = {
139 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
142 self.needed_locks = {} # No, you can't leave it to the default value None
145 # The implementation of this method is mandatory only if the new LU is
146 # concurrent, so that old LUs don't need to be changed all at the same
149 self.needed_locks = {} # Exclusive LUs don't need locks.
151 raise NotImplementedError
153 def CheckPrereq(self):
154 """Check prerequisites for this LU.
156 This method should check that the prerequisites for the execution
157 of this LU are fulfilled. It can do internode communication, but
158 it should be idempotent - no cluster or system changes are
161 The method should raise errors.OpPrereqError in case something is
162 not fulfilled. Its return value is ignored.
164 This method should also update all the parameters of the opcode to
165 their canonical form if it hasn't been done by ExpandNames before.
168 raise NotImplementedError
170 def Exec(self, feedback_fn):
173 This method should implement the actual work. It should raise
174 errors.OpExecError for failures that are somewhat dealt with in
178 raise NotImplementedError
180 def BuildHooksEnv(self):
181 """Build hooks environment for this LU.
183 This method should return a three-node tuple consisting of: a dict
184 containing the environment that will be used for running the
185 specific hook for this LU, a list of node names on which the hook
186 should run before the execution, and a list of node names on which
187 the hook should run after the execution.
189 The keys of the dict must not have 'GANETI_' prefixed as this will
190 be handled in the hooks runner. Also note additional keys will be
191 added by the hooks runner. If the LU doesn't define any
192 environment, an empty dict (and not None) should be returned.
194 No nodes should be returned as an empty list (and not None).
196 Note that if the HPATH for a LU class is None, this function will
200 raise NotImplementedError
202 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
203 """Notify the LU about the results of its hooks.
205 This method is called every time a hooks phase is executed, and notifies
206 the Logical Unit about the hooks' result. The LU can then use it to alter
207 its result based on the hooks. By default the method does nothing and the
208 previous result is passed back unchanged but any LU can define it if it
209 wants to use the local cluster hook-scripts somehow.
212 phase: the hooks phase that has just been run
213 hooks_results: the results of the multi-node hooks rpc call
214 feedback_fn: function to send feedback back to the caller
215 lu_result: the previous result this LU had, or None in the PRE phase.
221 class NoHooksLU(LogicalUnit):
222 """Simple LU which runs no hooks.
224 This LU is intended as a parent for other LogicalUnits which will
225 run no hooks, in order to reduce duplicate code.
232 def _GetWantedNodes(lu, nodes):
233 """Returns list of checked and expanded node names.
236 nodes: List of nodes (strings) or None for all
239 if not isinstance(nodes, list):
240 raise errors.OpPrereqError("Invalid argument type 'nodes'")
246 node = lu.cfg.ExpandNodeName(name)
248 raise errors.OpPrereqError("No such node name '%s'" % name)
252 wanted = lu.cfg.GetNodeList()
253 return utils.NiceSort(wanted)
256 def _GetWantedInstances(lu, instances):
257 """Returns list of checked and expanded instance names.
260 instances: List of instances (strings) or None for all
263 if not isinstance(instances, list):
264 raise errors.OpPrereqError("Invalid argument type 'instances'")
269 for name in instances:
270 instance = lu.cfg.ExpandInstanceName(name)
272 raise errors.OpPrereqError("No such instance name '%s'" % name)
273 wanted.append(instance)
276 wanted = lu.cfg.GetInstanceList()
277 return utils.NiceSort(wanted)
280 def _CheckOutputFields(static, dynamic, selected):
281 """Checks whether all selected fields are valid.
284 static: Static fields
285 dynamic: Dynamic fields
288 static_fields = frozenset(static)
289 dynamic_fields = frozenset(dynamic)
291 all_fields = static_fields | dynamic_fields
293 if not all_fields.issuperset(selected):
294 raise errors.OpPrereqError("Unknown output fields selected: %s"
295 % ",".join(frozenset(selected).
296 difference(all_fields)))
299 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
300 memory, vcpus, nics):
301 """Builds instance related env variables for hooks from single variables.
304 secondary_nodes: List of secondary nodes as strings
308 "INSTANCE_NAME": name,
309 "INSTANCE_PRIMARY": primary_node,
310 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
311 "INSTANCE_OS_TYPE": os_type,
312 "INSTANCE_STATUS": status,
313 "INSTANCE_MEMORY": memory,
314 "INSTANCE_VCPUS": vcpus,
318 nic_count = len(nics)
319 for idx, (ip, bridge, mac) in enumerate(nics):
322 env["INSTANCE_NIC%d_IP" % idx] = ip
323 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
324 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
328 env["INSTANCE_NIC_COUNT"] = nic_count
333 def _BuildInstanceHookEnvByObject(instance, override=None):
334 """Builds instance related env variables for hooks from an object.
337 instance: objects.Instance object of instance
338 override: dict of values to override
341 'name': instance.name,
342 'primary_node': instance.primary_node,
343 'secondary_nodes': instance.secondary_nodes,
344 'os_type': instance.os,
345 'status': instance.os,
346 'memory': instance.memory,
347 'vcpus': instance.vcpus,
348 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
351 args.update(override)
352 return _BuildInstanceHookEnv(**args)
355 def _CheckInstanceBridgesExist(instance):
356 """Check that the brigdes needed by an instance exist.
359 # check bridges existance
360 brlist = [nic.bridge for nic in instance.nics]
361 if not rpc.call_bridges_exist(instance.primary_node, brlist):
362 raise errors.OpPrereqError("one or more target bridges %s does not"
363 " exist on destination node '%s'" %
364 (brlist, instance.primary_node))
367 class LUDestroyCluster(NoHooksLU):
368 """Logical unit for destroying the cluster.
373 def CheckPrereq(self):
374 """Check prerequisites.
376 This checks whether the cluster is empty.
378 Any errors are signalled by raising errors.OpPrereqError.
381 master = self.sstore.GetMasterNode()
383 nodelist = self.cfg.GetNodeList()
384 if len(nodelist) != 1 or nodelist[0] != master:
385 raise errors.OpPrereqError("There are still %d node(s) in"
386 " this cluster." % (len(nodelist) - 1))
387 instancelist = self.cfg.GetInstanceList()
389 raise errors.OpPrereqError("There are still %d instance(s) in"
390 " this cluster." % len(instancelist))
392 def Exec(self, feedback_fn):
393 """Destroys the cluster.
396 master = self.sstore.GetMasterNode()
397 if not rpc.call_node_stop_master(master):
398 raise errors.OpExecError("Could not disable the master role")
399 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
400 utils.CreateBackup(priv_key)
401 utils.CreateBackup(pub_key)
402 rpc.call_node_leave_cluster(master)
405 class LUVerifyCluster(LogicalUnit):
406 """Verifies the cluster status.
409 HPATH = "cluster-verify"
410 HTYPE = constants.HTYPE_CLUSTER
411 _OP_REQP = ["skip_checks"]
413 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
414 remote_version, feedback_fn):
415 """Run multiple tests against a node.
418 - compares ganeti version
419 - checks vg existance and size > 20G
420 - checks config file checksum
421 - checks ssh to other nodes
424 node: name of the node to check
425 file_list: required list of files
426 local_cksum: dictionary of local files and their checksums
429 # compares ganeti version
430 local_version = constants.PROTOCOL_VERSION
431 if not remote_version:
432 feedback_fn(" - ERROR: connection to %s failed" % (node))
435 if local_version != remote_version:
436 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
437 (local_version, node, remote_version))
440 # checks vg existance and size > 20G
444 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
448 vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
449 constants.MIN_VG_SIZE)
451 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
454 # checks config file checksum
457 if 'filelist' not in node_result:
459 feedback_fn(" - ERROR: node hasn't returned file checksum data")
461 remote_cksum = node_result['filelist']
462 for file_name in file_list:
463 if file_name not in remote_cksum:
465 feedback_fn(" - ERROR: file '%s' missing" % file_name)
466 elif remote_cksum[file_name] != local_cksum[file_name]:
468 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
470 if 'nodelist' not in node_result:
472 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
474 if node_result['nodelist']:
476 for node in node_result['nodelist']:
477 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
478 (node, node_result['nodelist'][node]))
479 if 'node-net-test' not in node_result:
481 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
483 if node_result['node-net-test']:
485 nlist = utils.NiceSort(node_result['node-net-test'].keys())
487 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
488 (node, node_result['node-net-test'][node]))
490 hyp_result = node_result.get('hypervisor', None)
491 if hyp_result is not None:
492 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
495 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
496 node_instance, feedback_fn):
497 """Verify an instance.
499 This function checks to see if the required block devices are
500 available on the instance's node.
505 node_current = instanceconfig.primary_node
508 instanceconfig.MapLVsByNode(node_vol_should)
510 for node in node_vol_should:
511 for volume in node_vol_should[node]:
512 if node not in node_vol_is or volume not in node_vol_is[node]:
513 feedback_fn(" - ERROR: volume %s missing on node %s" %
517 if not instanceconfig.status == 'down':
518 if (node_current not in node_instance or
519 not instance in node_instance[node_current]):
520 feedback_fn(" - ERROR: instance %s not running on node %s" %
521 (instance, node_current))
524 for node in node_instance:
525 if (not node == node_current):
526 if instance in node_instance[node]:
527 feedback_fn(" - ERROR: instance %s should not run on node %s" %
533 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
534 """Verify if there are any unknown volumes in the cluster.
536 The .os, .swap and backup volumes are ignored. All other volumes are
542 for node in node_vol_is:
543 for volume in node_vol_is[node]:
544 if node not in node_vol_should or volume not in node_vol_should[node]:
545 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
550 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
551 """Verify the list of running instances.
553 This checks what instances are running but unknown to the cluster.
557 for node in node_instance:
558 for runninginstance in node_instance[node]:
559 if runninginstance not in instancelist:
560 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
561 (runninginstance, node))
565 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
566 """Verify N+1 Memory Resilience.
568 Check that if one single node dies we can still start all the instances it
574 for node, nodeinfo in node_info.iteritems():
575 # This code checks that every node which is now listed as secondary has
576 # enough memory to host all instances it is supposed to should a single
577 # other node in the cluster fail.
578 # FIXME: not ready for failover to an arbitrary node
579 # FIXME: does not support file-backed instances
580 # WARNING: we currently take into account down instances as well as up
581 # ones, considering that even if they're down someone might want to start
582 # them even in the event of a node failure.
583 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
585 for instance in instances:
586 needed_mem += instance_cfg[instance].memory
587 if nodeinfo['mfree'] < needed_mem:
588 feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
589 " failovers should node %s fail" % (node, prinode))
593 def CheckPrereq(self):
594 """Check prerequisites.
596 Transform the list of checks we're going to skip into a set and check that
597 all its members are valid.
600 self.skip_set = frozenset(self.op.skip_checks)
601 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
602 raise errors.OpPrereqError("Invalid checks to be skipped specified")
604 def BuildHooksEnv(self):
607 Cluster-Verify hooks just rone in the post phase and their failure makes
608 the output be logged in the verify output and the verification to fail.
611 all_nodes = self.cfg.GetNodeList()
612 # TODO: populate the environment with useful information for verify hooks
614 return env, [], all_nodes
616 def Exec(self, feedback_fn):
617 """Verify integrity of cluster, performing various test on nodes.
621 feedback_fn("* Verifying global settings")
622 for msg in self.cfg.VerifyConfig():
623 feedback_fn(" - ERROR: %s" % msg)
625 vg_name = self.cfg.GetVGName()
626 nodelist = utils.NiceSort(self.cfg.GetNodeList())
627 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
628 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
629 i_non_redundant = [] # Non redundant instances
635 # FIXME: verify OS list
637 file_names = list(self.sstore.GetFileList())
638 file_names.append(constants.SSL_CERT_FILE)
639 file_names.append(constants.CLUSTER_CONF_FILE)
640 local_checksums = utils.FingerprintFiles(file_names)
642 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
643 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
644 all_instanceinfo = rpc.call_instance_list(nodelist)
645 all_vglist = rpc.call_vg_list(nodelist)
646 node_verify_param = {
647 'filelist': file_names,
648 'nodelist': nodelist,
650 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
651 for node in nodeinfo]
653 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
654 all_rversion = rpc.call_version(nodelist)
655 all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
657 for node in nodelist:
658 feedback_fn("* Verifying node %s" % node)
659 result = self._VerifyNode(node, file_names, local_checksums,
660 all_vglist[node], all_nvinfo[node],
661 all_rversion[node], feedback_fn)
665 volumeinfo = all_volumeinfo[node]
667 if isinstance(volumeinfo, basestring):
668 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
669 (node, volumeinfo[-400:].encode('string_escape')))
671 node_volume[node] = {}
672 elif not isinstance(volumeinfo, dict):
673 feedback_fn(" - ERROR: connection to %s failed" % (node,))
677 node_volume[node] = volumeinfo
680 nodeinstance = all_instanceinfo[node]
681 if type(nodeinstance) != list:
682 feedback_fn(" - ERROR: connection to %s failed" % (node,))
686 node_instance[node] = nodeinstance
689 nodeinfo = all_ninfo[node]
690 if not isinstance(nodeinfo, dict):
691 feedback_fn(" - ERROR: connection to %s failed" % (node,))
697 "mfree": int(nodeinfo['memory_free']),
698 "dfree": int(nodeinfo['vg_free']),
701 # dictionary holding all instances this node is secondary for,
702 # grouped by their primary node. Each key is a cluster node, and each
703 # value is a list of instances which have the key as primary and the
704 # current node as secondary. this is handy to calculate N+1 memory
705 # availability if you can only failover from a primary to its
707 "sinst-by-pnode": {},
710 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
716 for instance in instancelist:
717 feedback_fn("* Verifying instance %s" % instance)
718 inst_config = self.cfg.GetInstanceInfo(instance)
719 result = self._VerifyInstance(instance, inst_config, node_volume,
720 node_instance, feedback_fn)
723 inst_config.MapLVsByNode(node_vol_should)
725 instance_cfg[instance] = inst_config
727 pnode = inst_config.primary_node
728 if pnode in node_info:
729 node_info[pnode]['pinst'].append(instance)
731 feedback_fn(" - ERROR: instance %s, connection to primary node"
732 " %s failed" % (instance, pnode))
735 # If the instance is non-redundant we cannot survive losing its primary
736 # node, so we are not N+1 compliant. On the other hand we have no disk
737 # templates with more than one secondary so that situation is not well
739 # FIXME: does not support file-backed instances
740 if len(inst_config.secondary_nodes) == 0:
741 i_non_redundant.append(instance)
742 elif len(inst_config.secondary_nodes) > 1:
743 feedback_fn(" - WARNING: multiple secondaries for instance %s"
746 for snode in inst_config.secondary_nodes:
747 if snode in node_info:
748 node_info[snode]['sinst'].append(instance)
749 if pnode not in node_info[snode]['sinst-by-pnode']:
750 node_info[snode]['sinst-by-pnode'][pnode] = []
751 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
753 feedback_fn(" - ERROR: instance %s, connection to secondary node"
754 " %s failed" % (instance, snode))
756 feedback_fn("* Verifying orphan volumes")
757 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
761 feedback_fn("* Verifying remaining instances")
762 result = self._VerifyOrphanInstances(instancelist, node_instance,
766 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
767 feedback_fn("* Verifying N+1 Memory redundancy")
768 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
771 feedback_fn("* Other Notes")
773 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
774 % len(i_non_redundant))
778 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
779 """Analize the post-hooks' result, handle it, and send some
780 nicely-formatted feedback back to the user.
783 phase: the hooks phase that has just been run
784 hooks_results: the results of the multi-node hooks rpc call
785 feedback_fn: function to send feedback back to the caller
786 lu_result: previous Exec result
789 # We only really run POST phase hooks, and are only interested in their results
790 if phase == constants.HOOKS_PHASE_POST:
791 # Used to change hooks' output to proper indentation
792 indent_re = re.compile('^', re.M)
793 feedback_fn("* Hooks Results")
794 if not hooks_results:
795 feedback_fn(" - ERROR: general communication failure")
798 for node_name in hooks_results:
799 show_node_header = True
800 res = hooks_results[node_name]
801 if res is False or not isinstance(res, list):
802 feedback_fn(" Communication failure")
805 for script, hkr, output in res:
806 if hkr == constants.HKR_FAIL:
807 # The node header is only shown once, if there are
808 # failing hooks on that node
810 feedback_fn(" Node %s:" % node_name)
811 show_node_header = False
812 feedback_fn(" ERROR: Script %s failed, output:" % script)
813 output = indent_re.sub(' ', output)
814 feedback_fn("%s" % output)
820 class LUVerifyDisks(NoHooksLU):
821 """Verifies the cluster disks status.
826 def CheckPrereq(self):
827 """Check prerequisites.
829 This has no prerequisites.
834 def Exec(self, feedback_fn):
835 """Verify integrity of cluster disks.
838 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
840 vg_name = self.cfg.GetVGName()
841 nodes = utils.NiceSort(self.cfg.GetNodeList())
842 instances = [self.cfg.GetInstanceInfo(name)
843 for name in self.cfg.GetInstanceList()]
846 for inst in instances:
848 if (inst.status != "up" or
849 inst.disk_template not in constants.DTS_NET_MIRROR):
851 inst.MapLVsByNode(inst_lvs)
852 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
853 for node, vol_list in inst_lvs.iteritems():
855 nv_dict[(node, vol)] = inst
860 node_lvs = rpc.call_volume_list(nodes, vg_name)
867 if isinstance(lvs, basestring):
868 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
870 elif not isinstance(lvs, dict):
871 logger.Info("connection to node %s failed or invalid data returned" %
873 res_nodes.append(node)
876 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
877 inst = nv_dict.pop((node, lv_name), None)
878 if (not lv_online and inst is not None
879 and inst.name not in res_instances):
880 res_instances.append(inst.name)
882 # any leftover items in nv_dict are missing LVs, let's arrange the
884 for key, inst in nv_dict.iteritems():
885 if inst.name not in res_missing:
886 res_missing[inst.name] = []
887 res_missing[inst.name].append(key)
892 class LURenameCluster(LogicalUnit):
893 """Rename the cluster.
896 HPATH = "cluster-rename"
897 HTYPE = constants.HTYPE_CLUSTER
901 def BuildHooksEnv(self):
906 "OP_TARGET": self.sstore.GetClusterName(),
907 "NEW_NAME": self.op.name,
909 mn = self.sstore.GetMasterNode()
910 return env, [mn], [mn]
912 def CheckPrereq(self):
913 """Verify that the passed name is a valid one.
916 hostname = utils.HostInfo(self.op.name)
918 new_name = hostname.name
919 self.ip = new_ip = hostname.ip
920 old_name = self.sstore.GetClusterName()
921 old_ip = self.sstore.GetMasterIP()
922 if new_name == old_name and new_ip == old_ip:
923 raise errors.OpPrereqError("Neither the name nor the IP address of the"
924 " cluster has changed")
926 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
927 raise errors.OpPrereqError("The given cluster IP address (%s) is"
928 " reachable on the network. Aborting." %
931 self.op.name = new_name
933 def Exec(self, feedback_fn):
934 """Rename the cluster.
937 clustername = self.op.name
941 # shutdown the master IP
942 master = ss.GetMasterNode()
943 if not rpc.call_node_stop_master(master):
944 raise errors.OpExecError("Could not disable the master role")
948 ss.SetKey(ss.SS_MASTER_IP, ip)
949 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
951 # Distribute updated ss config to all nodes
952 myself = self.cfg.GetNodeInfo(master)
953 dist_nodes = self.cfg.GetNodeList()
954 if myself.name in dist_nodes:
955 dist_nodes.remove(myself.name)
957 logger.Debug("Copying updated ssconf data to all nodes")
958 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
959 fname = ss.KeyToFilename(keyname)
960 result = rpc.call_upload_file(dist_nodes, fname)
961 for to_node in dist_nodes:
962 if not result[to_node]:
963 logger.Error("copy of file %s to node %s failed" %
966 if not rpc.call_node_start_master(master):
967 logger.Error("Could not re-enable the master role on the master,"
968 " please restart manually.")
971 def _RecursiveCheckIfLVMBased(disk):
972 """Check if the given disk or its children are lvm-based.
975 disk: ganeti.objects.Disk object
978 boolean indicating whether a LD_LV dev_type was found or not
982 for chdisk in disk.children:
983 if _RecursiveCheckIfLVMBased(chdisk):
985 return disk.dev_type == constants.LD_LV
988 class LUSetClusterParams(LogicalUnit):
989 """Change the parameters of the cluster.
992 HPATH = "cluster-modify"
993 HTYPE = constants.HTYPE_CLUSTER
996 def BuildHooksEnv(self):
1001 "OP_TARGET": self.sstore.GetClusterName(),
1002 "NEW_VG_NAME": self.op.vg_name,
1004 mn = self.sstore.GetMasterNode()
1005 return env, [mn], [mn]
1007 def CheckPrereq(self):
1008 """Check prerequisites.
1010 This checks whether the given params don't conflict and
1011 if the given volume group is valid.
1014 if not self.op.vg_name:
1015 instances = [self.cfg.GetInstanceInfo(name)
1016 for name in self.cfg.GetInstanceList()]
1017 for inst in instances:
1018 for disk in inst.disks:
1019 if _RecursiveCheckIfLVMBased(disk):
1020 raise errors.OpPrereqError("Cannot disable lvm storage while"
1021 " lvm-based instances exist")
1023 # if vg_name not None, checks given volume group on all nodes
1025 node_list = self.cfg.GetNodeList()
1026 vglist = rpc.call_vg_list(node_list)
1027 for node in node_list:
1028 vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1029 constants.MIN_VG_SIZE)
1031 raise errors.OpPrereqError("Error on node '%s': %s" %
1034 def Exec(self, feedback_fn):
1035 """Change the parameters of the cluster.
1038 if self.op.vg_name != self.cfg.GetVGName():
1039 self.cfg.SetVGName(self.op.vg_name)
1041 feedback_fn("Cluster LVM configuration already in desired"
1042 " state, not changing")
1045 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1046 """Sleep and poll for an instance's disk to sync.
1049 if not instance.disks:
1053 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1055 node = instance.primary_node
1057 for dev in instance.disks:
1058 cfgw.SetDiskID(dev, node)
1064 cumul_degraded = False
1065 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1067 proc.LogWarning("Can't get any data from node %s" % node)
1070 raise errors.RemoteError("Can't contact node %s for mirror data,"
1071 " aborting." % node)
1075 for i in range(len(rstats)):
1078 proc.LogWarning("Can't compute data for node %s/%s" %
1079 (node, instance.disks[i].iv_name))
1081 # we ignore the ldisk parameter
1082 perc_done, est_time, is_degraded, _ = mstat
1083 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1084 if perc_done is not None:
1086 if est_time is not None:
1087 rem_time = "%d estimated seconds remaining" % est_time
1090 rem_time = "no time estimate"
1091 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1092 (instance.disks[i].iv_name, perc_done, rem_time))
1096 time.sleep(min(60, max_time))
1099 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1100 return not cumul_degraded
1103 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1104 """Check that mirrors are not degraded.
1106 The ldisk parameter, if True, will change the test from the
1107 is_degraded attribute (which represents overall non-ok status for
1108 the device(s)) to the ldisk (representing the local storage status).
1111 cfgw.SetDiskID(dev, node)
1118 if on_primary or dev.AssembleOnSecondary():
1119 rstats = rpc.call_blockdev_find(node, dev)
1121 logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1124 result = result and (not rstats[idx])
1126 for child in dev.children:
1127 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1132 class LUDiagnoseOS(NoHooksLU):
1133 """Logical unit for OS diagnose/query.
1136 _OP_REQP = ["output_fields", "names"]
1138 def CheckPrereq(self):
1139 """Check prerequisites.
1141 This always succeeds, since this is a pure query LU.
1145 raise errors.OpPrereqError("Selective OS query not supported")
1147 self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1148 _CheckOutputFields(static=[],
1149 dynamic=self.dynamic_fields,
1150 selected=self.op.output_fields)
1153 def _DiagnoseByOS(node_list, rlist):
1154 """Remaps a per-node return list into an a per-os per-node dictionary
1157 node_list: a list with the names of all nodes
1158 rlist: a map with node names as keys and OS objects as values
1161 map: a map with osnames as keys and as value another map, with
1163 keys and list of OS objects as values
1164 e.g. {"debian-etch": {"node1": [<object>,...],
1165 "node2": [<object>,]}
1170 for node_name, nr in rlist.iteritems():
1174 if os_obj.name not in all_os:
1175 # build a list of nodes for this os containing empty lists
1176 # for each node in node_list
1177 all_os[os_obj.name] = {}
1178 for nname in node_list:
1179 all_os[os_obj.name][nname] = []
1180 all_os[os_obj.name][node_name].append(os_obj)
1183 def Exec(self, feedback_fn):
1184 """Compute the list of OSes.
1187 node_list = self.cfg.GetNodeList()
1188 node_data = rpc.call_os_diagnose(node_list)
1189 if node_data == False:
1190 raise errors.OpExecError("Can't gather the list of OSes")
1191 pol = self._DiagnoseByOS(node_list, node_data)
1193 for os_name, os_data in pol.iteritems():
1195 for field in self.op.output_fields:
1198 elif field == "valid":
1199 val = utils.all([osl and osl[0] for osl in os_data.values()])
1200 elif field == "node_status":
1202 for node_name, nos_list in os_data.iteritems():
1203 val[node_name] = [(v.status, v.path) for v in nos_list]
1205 raise errors.ParameterError(field)
1212 class LURemoveNode(LogicalUnit):
1213 """Logical unit for removing a node.
1216 HPATH = "node-remove"
1217 HTYPE = constants.HTYPE_NODE
1218 _OP_REQP = ["node_name"]
1220 def BuildHooksEnv(self):
1223 This doesn't run on the target node in the pre phase as a failed
1224 node would then be impossible to remove.
1228 "OP_TARGET": self.op.node_name,
1229 "NODE_NAME": self.op.node_name,
1231 all_nodes = self.cfg.GetNodeList()
1232 all_nodes.remove(self.op.node_name)
1233 return env, all_nodes, all_nodes
1235 def CheckPrereq(self):
1236 """Check prerequisites.
1239 - the node exists in the configuration
1240 - it does not have primary or secondary instances
1241 - it's not the master
1243 Any errors are signalled by raising errors.OpPrereqError.
1246 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1248 raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1250 instance_list = self.cfg.GetInstanceList()
1252 masternode = self.sstore.GetMasterNode()
1253 if node.name == masternode:
1254 raise errors.OpPrereqError("Node is the master node,"
1255 " you need to failover first.")
1257 for instance_name in instance_list:
1258 instance = self.cfg.GetInstanceInfo(instance_name)
1259 if node.name == instance.primary_node:
1260 raise errors.OpPrereqError("Instance %s still running on the node,"
1261 " please remove first." % instance_name)
1262 if node.name in instance.secondary_nodes:
1263 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1264 " please remove first." % instance_name)
1265 self.op.node_name = node.name
1268 def Exec(self, feedback_fn):
1269 """Removes the node from the cluster.
1273 logger.Info("stopping the node daemon and removing configs from node %s" %
1276 rpc.call_node_leave_cluster(node.name)
1278 logger.Info("Removing node %s from config" % node.name)
1280 self.cfg.RemoveNode(node.name)
1281 # Remove the node from the Ganeti Lock Manager
1282 self.context.glm.remove(locking.LEVEL_NODE, node.name)
1284 utils.RemoveHostFromEtcHosts(node.name)
1287 class LUQueryNodes(NoHooksLU):
1288 """Logical unit for querying nodes.
1291 _OP_REQP = ["output_fields", "names"]
1293 def CheckPrereq(self):
1294 """Check prerequisites.
1296 This checks that the fields required are valid output fields.
1299 self.dynamic_fields = frozenset([
1301 "mtotal", "mnode", "mfree",
1306 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1307 "pinst_list", "sinst_list",
1308 "pip", "sip", "tags"],
1309 dynamic=self.dynamic_fields,
1310 selected=self.op.output_fields)
1312 self.wanted = _GetWantedNodes(self, self.op.names)
1314 def Exec(self, feedback_fn):
1315 """Computes the list of nodes and their attributes.
1318 nodenames = self.wanted
1319 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1321 # begin data gathering
1323 if self.dynamic_fields.intersection(self.op.output_fields):
1325 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1326 for name in nodenames:
1327 nodeinfo = node_data.get(name, None)
1330 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1331 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1332 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1333 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1334 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1335 "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1336 "bootid": nodeinfo['bootid'],
1339 live_data[name] = {}
1341 live_data = dict.fromkeys(nodenames, {})
1343 node_to_primary = dict([(name, set()) for name in nodenames])
1344 node_to_secondary = dict([(name, set()) for name in nodenames])
1346 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1347 "sinst_cnt", "sinst_list"))
1348 if inst_fields & frozenset(self.op.output_fields):
1349 instancelist = self.cfg.GetInstanceList()
1351 for instance_name in instancelist:
1352 inst = self.cfg.GetInstanceInfo(instance_name)
1353 if inst.primary_node in node_to_primary:
1354 node_to_primary[inst.primary_node].add(inst.name)
1355 for secnode in inst.secondary_nodes:
1356 if secnode in node_to_secondary:
1357 node_to_secondary[secnode].add(inst.name)
1359 # end data gathering
1362 for node in nodelist:
1364 for field in self.op.output_fields:
1367 elif field == "pinst_list":
1368 val = list(node_to_primary[node.name])
1369 elif field == "sinst_list":
1370 val = list(node_to_secondary[node.name])
1371 elif field == "pinst_cnt":
1372 val = len(node_to_primary[node.name])
1373 elif field == "sinst_cnt":
1374 val = len(node_to_secondary[node.name])
1375 elif field == "pip":
1376 val = node.primary_ip
1377 elif field == "sip":
1378 val = node.secondary_ip
1379 elif field == "tags":
1380 val = list(node.GetTags())
1381 elif field in self.dynamic_fields:
1382 val = live_data[node.name].get(field, None)
1384 raise errors.ParameterError(field)
1385 node_output.append(val)
1386 output.append(node_output)
1391 class LUQueryNodeVolumes(NoHooksLU):
1392 """Logical unit for getting volumes on node(s).
1395 _OP_REQP = ["nodes", "output_fields"]
1397 def CheckPrereq(self):
1398 """Check prerequisites.
1400 This checks that the fields required are valid output fields.
1403 self.nodes = _GetWantedNodes(self, self.op.nodes)
1405 _CheckOutputFields(static=["node"],
1406 dynamic=["phys", "vg", "name", "size", "instance"],
1407 selected=self.op.output_fields)
1410 def Exec(self, feedback_fn):
1411 """Computes the list of nodes and their attributes.
1414 nodenames = self.nodes
1415 volumes = rpc.call_node_volumes(nodenames)
1417 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1418 in self.cfg.GetInstanceList()]
1420 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1423 for node in nodenames:
1424 if node not in volumes or not volumes[node]:
1427 node_vols = volumes[node][:]
1428 node_vols.sort(key=lambda vol: vol['dev'])
1430 for vol in node_vols:
1432 for field in self.op.output_fields:
1435 elif field == "phys":
1439 elif field == "name":
1441 elif field == "size":
1442 val = int(float(vol['size']))
1443 elif field == "instance":
1445 if node not in lv_by_node[inst]:
1447 if vol['name'] in lv_by_node[inst][node]:
1453 raise errors.ParameterError(field)
1454 node_output.append(str(val))
1456 output.append(node_output)
1461 class LUAddNode(LogicalUnit):
1462 """Logical unit for adding node to the cluster.
1466 HTYPE = constants.HTYPE_NODE
1467 _OP_REQP = ["node_name"]
1469 def BuildHooksEnv(self):
1472 This will run on all nodes before, and on all nodes + the new node after.
1476 "OP_TARGET": self.op.node_name,
1477 "NODE_NAME": self.op.node_name,
1478 "NODE_PIP": self.op.primary_ip,
1479 "NODE_SIP": self.op.secondary_ip,
1481 nodes_0 = self.cfg.GetNodeList()
1482 nodes_1 = nodes_0 + [self.op.node_name, ]
1483 return env, nodes_0, nodes_1
1485 def CheckPrereq(self):
1486 """Check prerequisites.
1489 - the new node is not already in the config
1491 - its parameters (single/dual homed) matches the cluster
1493 Any errors are signalled by raising errors.OpPrereqError.
1496 node_name = self.op.node_name
1499 dns_data = utils.HostInfo(node_name)
1501 node = dns_data.name
1502 primary_ip = self.op.primary_ip = dns_data.ip
1503 secondary_ip = getattr(self.op, "secondary_ip", None)
1504 if secondary_ip is None:
1505 secondary_ip = primary_ip
1506 if not utils.IsValidIP(secondary_ip):
1507 raise errors.OpPrereqError("Invalid secondary IP given")
1508 self.op.secondary_ip = secondary_ip
1510 node_list = cfg.GetNodeList()
1511 if not self.op.readd and node in node_list:
1512 raise errors.OpPrereqError("Node %s is already in the configuration" %
1514 elif self.op.readd and node not in node_list:
1515 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1517 for existing_node_name in node_list:
1518 existing_node = cfg.GetNodeInfo(existing_node_name)
1520 if self.op.readd and node == existing_node_name:
1521 if (existing_node.primary_ip != primary_ip or
1522 existing_node.secondary_ip != secondary_ip):
1523 raise errors.OpPrereqError("Readded node doesn't have the same IP"
1524 " address configuration as before")
1527 if (existing_node.primary_ip == primary_ip or
1528 existing_node.secondary_ip == primary_ip or
1529 existing_node.primary_ip == secondary_ip or
1530 existing_node.secondary_ip == secondary_ip):
1531 raise errors.OpPrereqError("New node ip address(es) conflict with"
1532 " existing node %s" % existing_node.name)
1534 # check that the type of the node (single versus dual homed) is the
1535 # same as for the master
1536 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1537 master_singlehomed = myself.secondary_ip == myself.primary_ip
1538 newbie_singlehomed = secondary_ip == primary_ip
1539 if master_singlehomed != newbie_singlehomed:
1540 if master_singlehomed:
1541 raise errors.OpPrereqError("The master has no private ip but the"
1542 " new node has one")
1544 raise errors.OpPrereqError("The master has a private ip but the"
1545 " new node doesn't have one")
1547 # checks reachablity
1548 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1549 raise errors.OpPrereqError("Node not reachable by ping")
1551 if not newbie_singlehomed:
1552 # check reachability from my secondary ip to newbie's secondary ip
1553 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1554 source=myself.secondary_ip):
1555 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1556 " based ping to noded port")
1558 self.new_node = objects.Node(name=node,
1559 primary_ip=primary_ip,
1560 secondary_ip=secondary_ip)
1562 def Exec(self, feedback_fn):
1563 """Adds the new node to the cluster.
1566 new_node = self.new_node
1567 node = new_node.name
1569 # check connectivity
1570 result = rpc.call_version([node])[node]
1572 if constants.PROTOCOL_VERSION == result:
1573 logger.Info("communication to node %s fine, sw version %s match" %
1576 raise errors.OpExecError("Version mismatch master version %s,"
1577 " node version %s" %
1578 (constants.PROTOCOL_VERSION, result))
1580 raise errors.OpExecError("Cannot get version from the new node")
1583 logger.Info("copy ssh key to node %s" % node)
1584 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1586 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1587 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1593 keyarray.append(f.read())
1597 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1598 keyarray[3], keyarray[4], keyarray[5])
1601 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1603 # Add node to our /etc/hosts, and add key to known_hosts
1604 utils.AddHostToEtcHosts(new_node.name)
1606 if new_node.secondary_ip != new_node.primary_ip:
1607 if not rpc.call_node_tcp_ping(new_node.name,
1608 constants.LOCALHOST_IP_ADDRESS,
1609 new_node.secondary_ip,
1610 constants.DEFAULT_NODED_PORT,
1612 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1613 " you gave (%s). Please fix and re-run this"
1614 " command." % new_node.secondary_ip)
1616 node_verify_list = [self.sstore.GetMasterNode()]
1617 node_verify_param = {
1619 # TODO: do a node-net-test as well?
1622 result = rpc.call_node_verify(node_verify_list, node_verify_param)
1623 for verifier in node_verify_list:
1624 if not result[verifier]:
1625 raise errors.OpExecError("Cannot communicate with %s's node daemon"
1626 " for remote verification" % verifier)
1627 if result[verifier]['nodelist']:
1628 for failed in result[verifier]['nodelist']:
1629 feedback_fn("ssh/hostname verification failed %s -> %s" %
1630 (verifier, result[verifier]['nodelist'][failed]))
1631 raise errors.OpExecError("ssh/hostname verification failed.")
1633 # Distribute updated /etc/hosts and known_hosts to all nodes,
1634 # including the node just added
1635 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1636 dist_nodes = self.cfg.GetNodeList()
1637 if not self.op.readd:
1638 dist_nodes.append(node)
1639 if myself.name in dist_nodes:
1640 dist_nodes.remove(myself.name)
1642 logger.Debug("Copying hosts and known_hosts to all nodes")
1643 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1644 result = rpc.call_upload_file(dist_nodes, fname)
1645 for to_node in dist_nodes:
1646 if not result[to_node]:
1647 logger.Error("copy of file %s to node %s failed" %
1650 to_copy = self.sstore.GetFileList()
1651 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1652 to_copy.append(constants.VNC_PASSWORD_FILE)
1653 for fname in to_copy:
1654 result = rpc.call_upload_file([node], fname)
1655 if not result[node]:
1656 logger.Error("could not copy file %s to node %s" % (fname, node))
1658 if not self.op.readd:
1659 logger.Info("adding node %s to cluster.conf" % node)
1660 self.cfg.AddNode(new_node)
1661 # Add the new node to the Ganeti Lock Manager
1662 self.context.glm.add(locking.LEVEL_NODE, node)
1665 class LUMasterFailover(LogicalUnit):
1666 """Failover the master node to the current node.
1668 This is a special LU in that it must run on a non-master node.
1671 HPATH = "master-failover"
1672 HTYPE = constants.HTYPE_CLUSTER
1677 def BuildHooksEnv(self):
1680 This will run on the new master only in the pre phase, and on all
1681 the nodes in the post phase.
1685 "OP_TARGET": self.new_master,
1686 "NEW_MASTER": self.new_master,
1687 "OLD_MASTER": self.old_master,
1689 return env, [self.new_master], self.cfg.GetNodeList()
1691 def CheckPrereq(self):
1692 """Check prerequisites.
1694 This checks that we are not already the master.
1697 self.new_master = utils.HostInfo().name
1698 self.old_master = self.sstore.GetMasterNode()
1700 if self.old_master == self.new_master:
1701 raise errors.OpPrereqError("This commands must be run on the node"
1702 " where you want the new master to be."
1703 " %s is already the master" %
1706 def Exec(self, feedback_fn):
1707 """Failover the master node.
1709 This command, when run on a non-master node, will cause the current
1710 master to cease being master, and the non-master to become new
1714 #TODO: do not rely on gethostname returning the FQDN
1715 logger.Info("setting master to %s, old master: %s" %
1716 (self.new_master, self.old_master))
1718 if not rpc.call_node_stop_master(self.old_master):
1719 logger.Error("could disable the master role on the old master"
1720 " %s, please disable manually" % self.old_master)
1723 ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1724 if not rpc.call_upload_file(self.cfg.GetNodeList(),
1725 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1726 logger.Error("could not distribute the new simple store master file"
1727 " to the other nodes, please check.")
1729 if not rpc.call_node_start_master(self.new_master):
1730 logger.Error("could not start the master role on the new master"
1731 " %s, please check" % self.new_master)
1732 feedback_fn("Error in activating the master IP on the new master,"
1733 " please fix manually.")
1737 class LUQueryClusterInfo(NoHooksLU):
1738 """Query cluster configuration.
1744 def CheckPrereq(self):
1745 """No prerequsites needed for this LU.
1750 def Exec(self, feedback_fn):
1751 """Return cluster config.
1755 "name": self.sstore.GetClusterName(),
1756 "software_version": constants.RELEASE_VERSION,
1757 "protocol_version": constants.PROTOCOL_VERSION,
1758 "config_version": constants.CONFIG_VERSION,
1759 "os_api_version": constants.OS_API_VERSION,
1760 "export_version": constants.EXPORT_VERSION,
1761 "master": self.sstore.GetMasterNode(),
1762 "architecture": (platform.architecture()[0], platform.machine()),
1763 "hypervisor_type": self.sstore.GetHypervisorType(),
1769 class LUDumpClusterConfig(NoHooksLU):
1770 """Return a text-representation of the cluster-config.
1775 def CheckPrereq(self):
1776 """No prerequisites.
1781 def Exec(self, feedback_fn):
1782 """Dump a representation of the cluster config to the standard output.
1785 return self.cfg.DumpConfig()
1788 class LUActivateInstanceDisks(NoHooksLU):
1789 """Bring up an instance's disks.
1792 _OP_REQP = ["instance_name"]
1794 def CheckPrereq(self):
1795 """Check prerequisites.
1797 This checks that the instance is in the cluster.
1800 instance = self.cfg.GetInstanceInfo(
1801 self.cfg.ExpandInstanceName(self.op.instance_name))
1802 if instance is None:
1803 raise errors.OpPrereqError("Instance '%s' not known" %
1804 self.op.instance_name)
1805 self.instance = instance
1808 def Exec(self, feedback_fn):
1809 """Activate the disks.
1812 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1814 raise errors.OpExecError("Cannot activate block devices")
1819 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1820 """Prepare the block devices for an instance.
1822 This sets up the block devices on all nodes.
1825 instance: a ganeti.objects.Instance object
1826 ignore_secondaries: if true, errors on secondary nodes won't result
1827 in an error return from the function
1830 false if the operation failed
1831 list of (host, instance_visible_name, node_visible_name) if the operation
1832 suceeded with the mapping from node devices to instance devices
1836 iname = instance.name
1837 # With the two passes mechanism we try to reduce the window of
1838 # opportunity for the race condition of switching DRBD to primary
1839 # before handshaking occured, but we do not eliminate it
1841 # The proper fix would be to wait (with some limits) until the
1842 # connection has been made and drbd transitions from WFConnection
1843 # into any other network-connected state (Connected, SyncTarget,
1846 # 1st pass, assemble on all nodes in secondary mode
1847 for inst_disk in instance.disks:
1848 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1849 cfg.SetDiskID(node_disk, node)
1850 result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1852 logger.Error("could not prepare block device %s on node %s"
1853 " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1854 if not ignore_secondaries:
1857 # FIXME: race condition on drbd migration to primary
1859 # 2nd pass, do only the primary node
1860 for inst_disk in instance.disks:
1861 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1862 if node != instance.primary_node:
1864 cfg.SetDiskID(node_disk, node)
1865 result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1867 logger.Error("could not prepare block device %s on node %s"
1868 " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1870 device_info.append((instance.primary_node, inst_disk.iv_name, result))
1872 # leave the disks configured for the primary node
1873 # this is a workaround that would be fixed better by
1874 # improving the logical/physical id handling
1875 for disk in instance.disks:
1876 cfg.SetDiskID(disk, instance.primary_node)
1878 return disks_ok, device_info
1881 def _StartInstanceDisks(cfg, instance, force):
1882 """Start the disks of an instance.
1885 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1886 ignore_secondaries=force)
1888 _ShutdownInstanceDisks(instance, cfg)
1889 if force is not None and not force:
1890 logger.Error("If the message above refers to a secondary node,"
1891 " you can retry the operation using '--force'.")
1892 raise errors.OpExecError("Disk consistency error")
1895 class LUDeactivateInstanceDisks(NoHooksLU):
1896 """Shutdown an instance's disks.
1899 _OP_REQP = ["instance_name"]
1901 def CheckPrereq(self):
1902 """Check prerequisites.
1904 This checks that the instance is in the cluster.
1907 instance = self.cfg.GetInstanceInfo(
1908 self.cfg.ExpandInstanceName(self.op.instance_name))
1909 if instance is None:
1910 raise errors.OpPrereqError("Instance '%s' not known" %
1911 self.op.instance_name)
1912 self.instance = instance
1914 def Exec(self, feedback_fn):
1915 """Deactivate the disks
1918 instance = self.instance
1919 ins_l = rpc.call_instance_list([instance.primary_node])
1920 ins_l = ins_l[instance.primary_node]
1921 if not type(ins_l) is list:
1922 raise errors.OpExecError("Can't contact node '%s'" %
1923 instance.primary_node)
1925 if self.instance.name in ins_l:
1926 raise errors.OpExecError("Instance is running, can't shutdown"
1929 _ShutdownInstanceDisks(instance, self.cfg)
1932 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1933 """Shutdown block devices of an instance.
1935 This does the shutdown on all nodes of the instance.
1937 If the ignore_primary is false, errors on the primary node are
1942 for disk in instance.disks:
1943 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1944 cfg.SetDiskID(top_disk, node)
1945 if not rpc.call_blockdev_shutdown(node, top_disk):
1946 logger.Error("could not shutdown block device %s on node %s" %
1947 (disk.iv_name, node))
1948 if not ignore_primary or node != instance.primary_node:
1953 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1954 """Checks if a node has enough free memory.
1956 This function check if a given node has the needed amount of free
1957 memory. In case the node has less memory or we cannot get the
1958 information from the node, this function raise an OpPrereqError
1962 - cfg: a ConfigWriter instance
1963 - node: the node name
1964 - reason: string to use in the error message
1965 - requested: the amount of memory in MiB
1968 nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1969 if not nodeinfo or not isinstance(nodeinfo, dict):
1970 raise errors.OpPrereqError("Could not contact node %s for resource"
1971 " information" % (node,))
1973 free_mem = nodeinfo[node].get('memory_free')
1974 if not isinstance(free_mem, int):
1975 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1976 " was '%s'" % (node, free_mem))
1977 if requested > free_mem:
1978 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1979 " needed %s MiB, available %s MiB" %
1980 (node, reason, requested, free_mem))
1983 class LUStartupInstance(LogicalUnit):
1984 """Starts an instance.
1987 HPATH = "instance-start"
1988 HTYPE = constants.HTYPE_INSTANCE
1989 _OP_REQP = ["instance_name", "force"]
1991 def BuildHooksEnv(self):
1994 This runs on master, primary and secondary nodes of the instance.
1998 "FORCE": self.op.force,
2000 env.update(_BuildInstanceHookEnvByObject(self.instance))
2001 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2002 list(self.instance.secondary_nodes))
2005 def CheckPrereq(self):
2006 """Check prerequisites.
2008 This checks that the instance is in the cluster.
2011 instance = self.cfg.GetInstanceInfo(
2012 self.cfg.ExpandInstanceName(self.op.instance_name))
2013 if instance is None:
2014 raise errors.OpPrereqError("Instance '%s' not known" %
2015 self.op.instance_name)
2017 # check bridges existance
2018 _CheckInstanceBridgesExist(instance)
2020 _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2021 "starting instance %s" % instance.name,
2024 self.instance = instance
2025 self.op.instance_name = instance.name
2027 def Exec(self, feedback_fn):
2028 """Start the instance.
2031 instance = self.instance
2032 force = self.op.force
2033 extra_args = getattr(self.op, "extra_args", "")
2035 self.cfg.MarkInstanceUp(instance.name)
2037 node_current = instance.primary_node
2039 _StartInstanceDisks(self.cfg, instance, force)
2041 if not rpc.call_instance_start(node_current, instance, extra_args):
2042 _ShutdownInstanceDisks(instance, self.cfg)
2043 raise errors.OpExecError("Could not start instance")
2046 class LURebootInstance(LogicalUnit):
2047 """Reboot an instance.
2050 HPATH = "instance-reboot"
2051 HTYPE = constants.HTYPE_INSTANCE
2052 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2054 def BuildHooksEnv(self):
2057 This runs on master, primary and secondary nodes of the instance.
2061 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2063 env.update(_BuildInstanceHookEnvByObject(self.instance))
2064 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2065 list(self.instance.secondary_nodes))
2068 def CheckPrereq(self):
2069 """Check prerequisites.
2071 This checks that the instance is in the cluster.
2074 instance = self.cfg.GetInstanceInfo(
2075 self.cfg.ExpandInstanceName(self.op.instance_name))
2076 if instance is None:
2077 raise errors.OpPrereqError("Instance '%s' not known" %
2078 self.op.instance_name)
2080 # check bridges existance
2081 _CheckInstanceBridgesExist(instance)
2083 self.instance = instance
2084 self.op.instance_name = instance.name
2086 def Exec(self, feedback_fn):
2087 """Reboot the instance.
2090 instance = self.instance
2091 ignore_secondaries = self.op.ignore_secondaries
2092 reboot_type = self.op.reboot_type
2093 extra_args = getattr(self.op, "extra_args", "")
2095 node_current = instance.primary_node
2097 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2098 constants.INSTANCE_REBOOT_HARD,
2099 constants.INSTANCE_REBOOT_FULL]:
2100 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2101 (constants.INSTANCE_REBOOT_SOFT,
2102 constants.INSTANCE_REBOOT_HARD,
2103 constants.INSTANCE_REBOOT_FULL))
2105 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2106 constants.INSTANCE_REBOOT_HARD]:
2107 if not rpc.call_instance_reboot(node_current, instance,
2108 reboot_type, extra_args):
2109 raise errors.OpExecError("Could not reboot instance")
2111 if not rpc.call_instance_shutdown(node_current, instance):
2112 raise errors.OpExecError("could not shutdown instance for full reboot")
2113 _ShutdownInstanceDisks(instance, self.cfg)
2114 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2115 if not rpc.call_instance_start(node_current, instance, extra_args):
2116 _ShutdownInstanceDisks(instance, self.cfg)
2117 raise errors.OpExecError("Could not start instance for full reboot")
2119 self.cfg.MarkInstanceUp(instance.name)
2122 class LUShutdownInstance(LogicalUnit):
2123 """Shutdown an instance.
2126 HPATH = "instance-stop"
2127 HTYPE = constants.HTYPE_INSTANCE
2128 _OP_REQP = ["instance_name"]
2130 def BuildHooksEnv(self):
2133 This runs on master, primary and secondary nodes of the instance.
2136 env = _BuildInstanceHookEnvByObject(self.instance)
2137 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2138 list(self.instance.secondary_nodes))
2141 def CheckPrereq(self):
2142 """Check prerequisites.
2144 This checks that the instance is in the cluster.
2147 instance = self.cfg.GetInstanceInfo(
2148 self.cfg.ExpandInstanceName(self.op.instance_name))
2149 if instance is None:
2150 raise errors.OpPrereqError("Instance '%s' not known" %
2151 self.op.instance_name)
2152 self.instance = instance
2154 def Exec(self, feedback_fn):
2155 """Shutdown the instance.
2158 instance = self.instance
2159 node_current = instance.primary_node
2160 self.cfg.MarkInstanceDown(instance.name)
2161 if not rpc.call_instance_shutdown(node_current, instance):
2162 logger.Error("could not shutdown instance")
2164 _ShutdownInstanceDisks(instance, self.cfg)
2167 class LUReinstallInstance(LogicalUnit):
2168 """Reinstall an instance.
2171 HPATH = "instance-reinstall"
2172 HTYPE = constants.HTYPE_INSTANCE
2173 _OP_REQP = ["instance_name"]
2175 def BuildHooksEnv(self):
2178 This runs on master, primary and secondary nodes of the instance.
2181 env = _BuildInstanceHookEnvByObject(self.instance)
2182 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2183 list(self.instance.secondary_nodes))
2186 def CheckPrereq(self):
2187 """Check prerequisites.
2189 This checks that the instance is in the cluster and is not running.
2192 instance = self.cfg.GetInstanceInfo(
2193 self.cfg.ExpandInstanceName(self.op.instance_name))
2194 if instance is None:
2195 raise errors.OpPrereqError("Instance '%s' not known" %
2196 self.op.instance_name)
2197 if instance.disk_template == constants.DT_DISKLESS:
2198 raise errors.OpPrereqError("Instance '%s' has no disks" %
2199 self.op.instance_name)
2200 if instance.status != "down":
2201 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2202 self.op.instance_name)
2203 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2205 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2206 (self.op.instance_name,
2207 instance.primary_node))
2209 self.op.os_type = getattr(self.op, "os_type", None)
2210 if self.op.os_type is not None:
2212 pnode = self.cfg.GetNodeInfo(
2213 self.cfg.ExpandNodeName(instance.primary_node))
2215 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2217 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2219 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2220 " primary node" % self.op.os_type)
2222 self.instance = instance
2224 def Exec(self, feedback_fn):
2225 """Reinstall the instance.
2228 inst = self.instance
2230 if self.op.os_type is not None:
2231 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2232 inst.os = self.op.os_type
2233 self.cfg.AddInstance(inst)
2235 _StartInstanceDisks(self.cfg, inst, None)
2237 feedback_fn("Running the instance OS create scripts...")
2238 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2239 raise errors.OpExecError("Could not install OS for instance %s"
2241 (inst.name, inst.primary_node))
2243 _ShutdownInstanceDisks(inst, self.cfg)
2246 class LURenameInstance(LogicalUnit):
2247 """Rename an instance.
2250 HPATH = "instance-rename"
2251 HTYPE = constants.HTYPE_INSTANCE
2252 _OP_REQP = ["instance_name", "new_name"]
2254 def BuildHooksEnv(self):
2257 This runs on master, primary and secondary nodes of the instance.
2260 env = _BuildInstanceHookEnvByObject(self.instance)
2261 env["INSTANCE_NEW_NAME"] = self.op.new_name
2262 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2263 list(self.instance.secondary_nodes))
2266 def CheckPrereq(self):
2267 """Check prerequisites.
2269 This checks that the instance is in the cluster and is not running.
2272 instance = self.cfg.GetInstanceInfo(
2273 self.cfg.ExpandInstanceName(self.op.instance_name))
2274 if instance is None:
2275 raise errors.OpPrereqError("Instance '%s' not known" %
2276 self.op.instance_name)
2277 if instance.status != "down":
2278 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2279 self.op.instance_name)
2280 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2282 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2283 (self.op.instance_name,
2284 instance.primary_node))
2285 self.instance = instance
2287 # new name verification
2288 name_info = utils.HostInfo(self.op.new_name)
2290 self.op.new_name = new_name = name_info.name
2291 instance_list = self.cfg.GetInstanceList()
2292 if new_name in instance_list:
2293 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2296 if not getattr(self.op, "ignore_ip", False):
2297 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2298 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2299 (name_info.ip, new_name))
2302 def Exec(self, feedback_fn):
2303 """Reinstall the instance.
2306 inst = self.instance
2307 old_name = inst.name
2309 if inst.disk_template == constants.DT_FILE:
2310 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2312 self.cfg.RenameInstance(inst.name, self.op.new_name)
2314 # re-read the instance from the configuration after rename
2315 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2317 if inst.disk_template == constants.DT_FILE:
2318 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2319 result = rpc.call_file_storage_dir_rename(inst.primary_node,
2320 old_file_storage_dir,
2321 new_file_storage_dir)
2324 raise errors.OpExecError("Could not connect to node '%s' to rename"
2325 " directory '%s' to '%s' (but the instance"
2326 " has been renamed in Ganeti)" % (
2327 inst.primary_node, old_file_storage_dir,
2328 new_file_storage_dir))
2331 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2332 " (but the instance has been renamed in"
2333 " Ganeti)" % (old_file_storage_dir,
2334 new_file_storage_dir))
2336 _StartInstanceDisks(self.cfg, inst, None)
2338 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2340 msg = ("Could run OS rename script for instance %s on node %s (but the"
2341 " instance has been renamed in Ganeti)" %
2342 (inst.name, inst.primary_node))
2345 _ShutdownInstanceDisks(inst, self.cfg)
2348 class LURemoveInstance(LogicalUnit):
2349 """Remove an instance.
2352 HPATH = "instance-remove"
2353 HTYPE = constants.HTYPE_INSTANCE
2354 _OP_REQP = ["instance_name", "ignore_failures"]
2356 def BuildHooksEnv(self):
2359 This runs on master, primary and secondary nodes of the instance.
2362 env = _BuildInstanceHookEnvByObject(self.instance)
2363 nl = [self.sstore.GetMasterNode()]
2366 def CheckPrereq(self):
2367 """Check prerequisites.
2369 This checks that the instance is in the cluster.
2372 instance = self.cfg.GetInstanceInfo(
2373 self.cfg.ExpandInstanceName(self.op.instance_name))
2374 if instance is None:
2375 raise errors.OpPrereqError("Instance '%s' not known" %
2376 self.op.instance_name)
2377 self.instance = instance
2379 def Exec(self, feedback_fn):
2380 """Remove the instance.
2383 instance = self.instance
2384 logger.Info("shutting down instance %s on node %s" %
2385 (instance.name, instance.primary_node))
2387 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2388 if self.op.ignore_failures:
2389 feedback_fn("Warning: can't shutdown instance")
2391 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2392 (instance.name, instance.primary_node))
2394 logger.Info("removing block devices for instance %s" % instance.name)
2396 if not _RemoveDisks(instance, self.cfg):
2397 if self.op.ignore_failures:
2398 feedback_fn("Warning: can't remove instance's disks")
2400 raise errors.OpExecError("Can't remove instance's disks")
2402 logger.Info("removing instance %s out of cluster config" % instance.name)
2404 self.cfg.RemoveInstance(instance.name)
2405 # Remove the new instance from the Ganeti Lock Manager
2406 self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2409 class LUQueryInstances(NoHooksLU):
2410 """Logical unit for querying instances.
2413 _OP_REQP = ["output_fields", "names"]
2415 def CheckPrereq(self):
2416 """Check prerequisites.
2418 This checks that the fields required are valid output fields.
2421 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2422 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2423 "admin_state", "admin_ram",
2424 "disk_template", "ip", "mac", "bridge",
2425 "sda_size", "sdb_size", "vcpus", "tags"],
2426 dynamic=self.dynamic_fields,
2427 selected=self.op.output_fields)
2429 self.wanted = _GetWantedInstances(self, self.op.names)
2431 def Exec(self, feedback_fn):
2432 """Computes the list of nodes and their attributes.
2435 instance_names = self.wanted
2436 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2439 # begin data gathering
2441 nodes = frozenset([inst.primary_node for inst in instance_list])
2444 if self.dynamic_fields.intersection(self.op.output_fields):
2446 node_data = rpc.call_all_instances_info(nodes)
2448 result = node_data[name]
2450 live_data.update(result)
2451 elif result == False:
2452 bad_nodes.append(name)
2453 # else no instance is alive
2455 live_data = dict([(name, {}) for name in instance_names])
2457 # end data gathering
2460 for instance in instance_list:
2462 for field in self.op.output_fields:
2467 elif field == "pnode":
2468 val = instance.primary_node
2469 elif field == "snodes":
2470 val = list(instance.secondary_nodes)
2471 elif field == "admin_state":
2472 val = (instance.status != "down")
2473 elif field == "oper_state":
2474 if instance.primary_node in bad_nodes:
2477 val = bool(live_data.get(instance.name))
2478 elif field == "status":
2479 if instance.primary_node in bad_nodes:
2480 val = "ERROR_nodedown"
2482 running = bool(live_data.get(instance.name))
2484 if instance.status != "down":
2489 if instance.status != "down":
2493 elif field == "admin_ram":
2494 val = instance.memory
2495 elif field == "oper_ram":
2496 if instance.primary_node in bad_nodes:
2498 elif instance.name in live_data:
2499 val = live_data[instance.name].get("memory", "?")
2502 elif field == "disk_template":
2503 val = instance.disk_template
2505 val = instance.nics[0].ip
2506 elif field == "bridge":
2507 val = instance.nics[0].bridge
2508 elif field == "mac":
2509 val = instance.nics[0].mac
2510 elif field == "sda_size" or field == "sdb_size":
2511 disk = instance.FindDisk(field[:3])
2516 elif field == "vcpus":
2517 val = instance.vcpus
2518 elif field == "tags":
2519 val = list(instance.GetTags())
2521 raise errors.ParameterError(field)
2528 class LUFailoverInstance(LogicalUnit):
2529 """Failover an instance.
2532 HPATH = "instance-failover"
2533 HTYPE = constants.HTYPE_INSTANCE
2534 _OP_REQP = ["instance_name", "ignore_consistency"]
2536 def BuildHooksEnv(self):
2539 This runs on master, primary and secondary nodes of the instance.
2543 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2545 env.update(_BuildInstanceHookEnvByObject(self.instance))
2546 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2549 def CheckPrereq(self):
2550 """Check prerequisites.
2552 This checks that the instance is in the cluster.
2555 instance = self.cfg.GetInstanceInfo(
2556 self.cfg.ExpandInstanceName(self.op.instance_name))
2557 if instance is None:
2558 raise errors.OpPrereqError("Instance '%s' not known" %
2559 self.op.instance_name)
2561 if instance.disk_template not in constants.DTS_NET_MIRROR:
2562 raise errors.OpPrereqError("Instance's disk layout is not"
2563 " network mirrored, cannot failover.")
2565 secondary_nodes = instance.secondary_nodes
2566 if not secondary_nodes:
2567 raise errors.ProgrammerError("no secondary node but using "
2568 "a mirrored disk template")
2570 target_node = secondary_nodes[0]
2571 # check memory requirements on the secondary node
2572 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2573 instance.name, instance.memory)
2575 # check bridge existance
2576 brlist = [nic.bridge for nic in instance.nics]
2577 if not rpc.call_bridges_exist(target_node, brlist):
2578 raise errors.OpPrereqError("One or more target bridges %s does not"
2579 " exist on destination node '%s'" %
2580 (brlist, target_node))
2582 self.instance = instance
2584 def Exec(self, feedback_fn):
2585 """Failover an instance.
2587 The failover is done by shutting it down on its present node and
2588 starting it on the secondary.
2591 instance = self.instance
2593 source_node = instance.primary_node
2594 target_node = instance.secondary_nodes[0]
2596 feedback_fn("* checking disk consistency between source and target")
2597 for dev in instance.disks:
2598 # for drbd, these are drbd over lvm
2599 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2600 if instance.status == "up" and not self.op.ignore_consistency:
2601 raise errors.OpExecError("Disk %s is degraded on target node,"
2602 " aborting failover." % dev.iv_name)
2604 feedback_fn("* shutting down instance on source node")
2605 logger.Info("Shutting down instance %s on node %s" %
2606 (instance.name, source_node))
2608 if not rpc.call_instance_shutdown(source_node, instance):
2609 if self.op.ignore_consistency:
2610 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2611 " anyway. Please make sure node %s is down" %
2612 (instance.name, source_node, source_node))
2614 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2615 (instance.name, source_node))
2617 feedback_fn("* deactivating the instance's disks on source node")
2618 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2619 raise errors.OpExecError("Can't shut down the instance's disks.")
2621 instance.primary_node = target_node
2622 # distribute new instance config to the other nodes
2623 self.cfg.Update(instance)
2625 # Only start the instance if it's marked as up
2626 if instance.status == "up":
2627 feedback_fn("* activating the instance's disks on target node")
2628 logger.Info("Starting instance %s on node %s" %
2629 (instance.name, target_node))
2631 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2632 ignore_secondaries=True)
2634 _ShutdownInstanceDisks(instance, self.cfg)
2635 raise errors.OpExecError("Can't activate the instance's disks")
2637 feedback_fn("* starting the instance on the target node")
2638 if not rpc.call_instance_start(target_node, instance, None):
2639 _ShutdownInstanceDisks(instance, self.cfg)
2640 raise errors.OpExecError("Could not start instance %s on node %s." %
2641 (instance.name, target_node))
2644 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2645 """Create a tree of block devices on the primary node.
2647 This always creates all devices.
2651 for child in device.children:
2652 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2655 cfg.SetDiskID(device, node)
2656 new_id = rpc.call_blockdev_create(node, device, device.size,
2657 instance.name, True, info)
2660 if device.physical_id is None:
2661 device.physical_id = new_id
2665 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2666 """Create a tree of block devices on a secondary node.
2668 If this device type has to be created on secondaries, create it and
2671 If not, just recurse to children keeping the same 'force' value.
2674 if device.CreateOnSecondary():
2677 for child in device.children:
2678 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2679 child, force, info):
2684 cfg.SetDiskID(device, node)
2685 new_id = rpc.call_blockdev_create(node, device, device.size,
2686 instance.name, False, info)
2689 if device.physical_id is None:
2690 device.physical_id = new_id
2694 def _GenerateUniqueNames(cfg, exts):
2695 """Generate a suitable LV name.
2697 This will generate a logical volume name for the given instance.
2702 new_id = cfg.GenerateUniqueID()
2703 results.append("%s%s" % (new_id, val))
2707 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2708 """Generate a drbd8 device complete with its children.
2711 port = cfg.AllocatePort()
2712 vgname = cfg.GetVGName()
2713 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2714 logical_id=(vgname, names[0]))
2715 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2716 logical_id=(vgname, names[1]))
2717 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2718 logical_id = (primary, secondary, port),
2719 children = [dev_data, dev_meta],
2724 def _GenerateDiskTemplate(cfg, template_name,
2725 instance_name, primary_node,
2726 secondary_nodes, disk_sz, swap_sz,
2727 file_storage_dir, file_driver):
2728 """Generate the entire disk layout for a given template type.
2731 #TODO: compute space requirements
2733 vgname = cfg.GetVGName()
2734 if template_name == constants.DT_DISKLESS:
2736 elif template_name == constants.DT_PLAIN:
2737 if len(secondary_nodes) != 0:
2738 raise errors.ProgrammerError("Wrong template configuration")
2740 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2741 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2742 logical_id=(vgname, names[0]),
2744 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2745 logical_id=(vgname, names[1]),
2747 disks = [sda_dev, sdb_dev]
2748 elif template_name == constants.DT_DRBD8:
2749 if len(secondary_nodes) != 1:
2750 raise errors.ProgrammerError("Wrong template configuration")
2751 remote_node = secondary_nodes[0]
2752 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2753 ".sdb_data", ".sdb_meta"])
2754 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2755 disk_sz, names[0:2], "sda")
2756 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2757 swap_sz, names[2:4], "sdb")
2758 disks = [drbd_sda_dev, drbd_sdb_dev]
2759 elif template_name == constants.DT_FILE:
2760 if len(secondary_nodes) != 0:
2761 raise errors.ProgrammerError("Wrong template configuration")
2763 file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2764 iv_name="sda", logical_id=(file_driver,
2765 "%s/sda" % file_storage_dir))
2766 file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2767 iv_name="sdb", logical_id=(file_driver,
2768 "%s/sdb" % file_storage_dir))
2769 disks = [file_sda_dev, file_sdb_dev]
2771 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2775 def _GetInstanceInfoText(instance):
2776 """Compute that text that should be added to the disk's metadata.
2779 return "originstname+%s" % instance.name
2782 def _CreateDisks(cfg, instance):
2783 """Create all disks for an instance.
2785 This abstracts away some work from AddInstance.
2788 instance: the instance object
2791 True or False showing the success of the creation process
2794 info = _GetInstanceInfoText(instance)
2796 if instance.disk_template == constants.DT_FILE:
2797 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2798 result = rpc.call_file_storage_dir_create(instance.primary_node,
2802 logger.Error("Could not connect to node '%s'" % instance.primary_node)
2806 logger.Error("failed to create directory '%s'" % file_storage_dir)
2809 for device in instance.disks:
2810 logger.Info("creating volume %s for instance %s" %
2811 (device.iv_name, instance.name))
2813 for secondary_node in instance.secondary_nodes:
2814 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2815 device, False, info):
2816 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2817 (device.iv_name, device, secondary_node))
2820 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2821 instance, device, info):
2822 logger.Error("failed to create volume %s on primary!" %
2829 def _RemoveDisks(instance, cfg):
2830 """Remove all disks for an instance.
2832 This abstracts away some work from `AddInstance()` and
2833 `RemoveInstance()`. Note that in case some of the devices couldn't
2834 be removed, the removal will continue with the other ones (compare
2835 with `_CreateDisks()`).
2838 instance: the instance object
2841 True or False showing the success of the removal proces
2844 logger.Info("removing block devices for instance %s" % instance.name)
2847 for device in instance.disks:
2848 for node, disk in device.ComputeNodeTree(instance.primary_node):
2849 cfg.SetDiskID(disk, node)
2850 if not rpc.call_blockdev_remove(node, disk):
2851 logger.Error("could not remove block device %s on node %s,"
2852 " continuing anyway" %
2853 (device.iv_name, node))
2856 if instance.disk_template == constants.DT_FILE:
2857 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2858 if not rpc.call_file_storage_dir_remove(instance.primary_node,
2860 logger.Error("could not remove directory '%s'" % file_storage_dir)
2866 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2867 """Compute disk size requirements in the volume group
2869 This is currently hard-coded for the two-drive layout.
2872 # Required free disk space as a function of disk and swap space
2874 constants.DT_DISKLESS: None,
2875 constants.DT_PLAIN: disk_size + swap_size,
2876 # 256 MB are added for drbd metadata, 128MB for each drbd device
2877 constants.DT_DRBD8: disk_size + swap_size + 256,
2878 constants.DT_FILE: None,
2881 if disk_template not in req_size_dict:
2882 raise errors.ProgrammerError("Disk template '%s' size requirement"
2883 " is unknown" % disk_template)
2885 return req_size_dict[disk_template]
2888 class LUCreateInstance(LogicalUnit):
2889 """Create an instance.
2892 HPATH = "instance-add"
2893 HTYPE = constants.HTYPE_INSTANCE
2894 _OP_REQP = ["instance_name", "mem_size", "disk_size",
2895 "disk_template", "swap_size", "mode", "start", "vcpus",
2896 "wait_for_sync", "ip_check", "mac"]
2898 def _RunAllocator(self):
2899 """Run the allocator based on input opcode.
2902 disks = [{"size": self.op.disk_size, "mode": "w"},
2903 {"size": self.op.swap_size, "mode": "w"}]
2904 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2905 "bridge": self.op.bridge}]
2906 ial = IAllocator(self.cfg, self.sstore,
2907 mode=constants.IALLOCATOR_MODE_ALLOC,
2908 name=self.op.instance_name,
2909 disk_template=self.op.disk_template,
2912 vcpus=self.op.vcpus,
2913 mem_size=self.op.mem_size,
2918 ial.Run(self.op.iallocator)
2921 raise errors.OpPrereqError("Can't compute nodes using"
2922 " iallocator '%s': %s" % (self.op.iallocator,
2924 if len(ial.nodes) != ial.required_nodes:
2925 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2926 " of nodes (%s), required %s" %
2927 (len(ial.nodes), ial.required_nodes))
2928 self.op.pnode = ial.nodes[0]
2929 logger.ToStdout("Selected nodes for the instance: %s" %
2930 (", ".join(ial.nodes),))
2931 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2932 (self.op.instance_name, self.op.iallocator, ial.nodes))
2933 if ial.required_nodes == 2:
2934 self.op.snode = ial.nodes[1]
2936 def BuildHooksEnv(self):
2939 This runs on master, primary and secondary nodes of the instance.
2943 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2944 "INSTANCE_DISK_SIZE": self.op.disk_size,
2945 "INSTANCE_SWAP_SIZE": self.op.swap_size,
2946 "INSTANCE_ADD_MODE": self.op.mode,
2948 if self.op.mode == constants.INSTANCE_IMPORT:
2949 env["INSTANCE_SRC_NODE"] = self.op.src_node
2950 env["INSTANCE_SRC_PATH"] = self.op.src_path
2951 env["INSTANCE_SRC_IMAGE"] = self.src_image
2953 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2954 primary_node=self.op.pnode,
2955 secondary_nodes=self.secondaries,
2956 status=self.instance_status,
2957 os_type=self.op.os_type,
2958 memory=self.op.mem_size,
2959 vcpus=self.op.vcpus,
2960 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2963 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2968 def CheckPrereq(self):
2969 """Check prerequisites.
2972 # set optional parameters to none if they don't exist
2973 for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
2974 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
2975 "vnc_bind_address"]:
2976 if not hasattr(self.op, attr):
2977 setattr(self.op, attr, None)
2979 if self.op.mode not in (constants.INSTANCE_CREATE,
2980 constants.INSTANCE_IMPORT):
2981 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2984 if (not self.cfg.GetVGName() and
2985 self.op.disk_template not in constants.DTS_NOT_LVM):
2986 raise errors.OpPrereqError("Cluster does not support lvm-based"
2989 if self.op.mode == constants.INSTANCE_IMPORT:
2990 src_node = getattr(self.op, "src_node", None)
2991 src_path = getattr(self.op, "src_path", None)
2992 if src_node is None or src_path is None:
2993 raise errors.OpPrereqError("Importing an instance requires source"
2994 " node and path options")
2995 src_node_full = self.cfg.ExpandNodeName(src_node)
2996 if src_node_full is None:
2997 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2998 self.op.src_node = src_node = src_node_full
3000 if not os.path.isabs(src_path):
3001 raise errors.OpPrereqError("The source path must be absolute")
3003 export_info = rpc.call_export_info(src_node, src_path)
3006 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3008 if not export_info.has_section(constants.INISECT_EXP):
3009 raise errors.ProgrammerError("Corrupted export config")
3011 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3012 if (int(ei_version) != constants.EXPORT_VERSION):
3013 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3014 (ei_version, constants.EXPORT_VERSION))
3016 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3017 raise errors.OpPrereqError("Can't import instance with more than"
3020 # FIXME: are the old os-es, disk sizes, etc. useful?
3021 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3022 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3024 self.src_image = diskimage
3025 else: # INSTANCE_CREATE
3026 if getattr(self.op, "os_type", None) is None:
3027 raise errors.OpPrereqError("No guest OS specified")
3029 #### instance parameters check
3031 # disk template and mirror node verification
3032 if self.op.disk_template not in constants.DISK_TEMPLATES:
3033 raise errors.OpPrereqError("Invalid disk template name")
3035 # instance name verification
3036 hostname1 = utils.HostInfo(self.op.instance_name)
3038 self.op.instance_name = instance_name = hostname1.name
3039 instance_list = self.cfg.GetInstanceList()
3040 if instance_name in instance_list:
3041 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3044 # ip validity checks
3045 ip = getattr(self.op, "ip", None)
3046 if ip is None or ip.lower() == "none":
3048 elif ip.lower() == "auto":
3049 inst_ip = hostname1.ip
3051 if not utils.IsValidIP(ip):
3052 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3053 " like a valid IP" % ip)
3055 self.inst_ip = self.op.ip = inst_ip
3057 if self.op.start and not self.op.ip_check:
3058 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3059 " adding an instance in start mode")
3061 if self.op.ip_check:
3062 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3063 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3064 (hostname1.ip, instance_name))
3066 # MAC address verification
3067 if self.op.mac != "auto":
3068 if not utils.IsValidMac(self.op.mac.lower()):
3069 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3072 # bridge verification
3073 bridge = getattr(self.op, "bridge", None)
3075 self.op.bridge = self.cfg.GetDefBridge()
3077 self.op.bridge = bridge
3079 # boot order verification
3080 if self.op.hvm_boot_order is not None:
3081 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3082 raise errors.OpPrereqError("invalid boot order specified,"
3083 " must be one or more of [acdn]")
3084 # file storage checks
3085 if (self.op.file_driver and
3086 not self.op.file_driver in constants.FILE_DRIVER):
3087 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3088 self.op.file_driver)
3090 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3091 raise errors.OpPrereqError("File storage directory not a relative"
3095 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3096 raise errors.OpPrereqError("One and only one of iallocator and primary"
3097 " node must be given")
3099 if self.op.iallocator is not None:
3100 self._RunAllocator()
3102 #### node related checks
3104 # check primary node
3105 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3107 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3109 self.op.pnode = pnode.name
3111 self.secondaries = []
3113 # mirror node verification
3114 if self.op.disk_template in constants.DTS_NET_MIRROR:
3115 if getattr(self.op, "snode", None) is None:
3116 raise errors.OpPrereqError("The networked disk templates need"
3119 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3120 if snode_name is None:
3121 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3123 elif snode_name == pnode.name:
3124 raise errors.OpPrereqError("The secondary node cannot be"
3125 " the primary node.")
3126 self.secondaries.append(snode_name)
3128 req_size = _ComputeDiskSize(self.op.disk_template,
3129 self.op.disk_size, self.op.swap_size)
3131 # Check lv size requirements
3132 if req_size is not None:
3133 nodenames = [pnode.name] + self.secondaries
3134 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3135 for node in nodenames:
3136 info = nodeinfo.get(node, None)
3138 raise errors.OpPrereqError("Cannot get current information"
3139 " from node '%s'" % node)
3140 vg_free = info.get('vg_free', None)
3141 if not isinstance(vg_free, int):
3142 raise errors.OpPrereqError("Can't compute free disk space on"
3144 if req_size > info['vg_free']:
3145 raise errors.OpPrereqError("Not enough disk space on target node %s."
3146 " %d MB available, %d MB required" %
3147 (node, info['vg_free'], req_size))
3150 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3152 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3153 " primary node" % self.op.os_type)
3155 if self.op.kernel_path == constants.VALUE_NONE:
3156 raise errors.OpPrereqError("Can't set instance kernel to none")
3159 # bridge check on primary node
3160 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3161 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3162 " destination node '%s'" %
3163 (self.op.bridge, pnode.name))
3165 # memory check on primary node
3167 _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3168 "creating instance %s" % self.op.instance_name,
3171 # hvm_cdrom_image_path verification
3172 if self.op.hvm_cdrom_image_path is not None:
3173 if not os.path.isabs(self.op.hvm_cdrom_image_path):
3174 raise errors.OpPrereqError("The path to the HVM CDROM image must"
3175 " be an absolute path or None, not %s" %
3176 self.op.hvm_cdrom_image_path)
3177 if not os.path.isfile(self.op.hvm_cdrom_image_path):
3178 raise errors.OpPrereqError("The HVM CDROM image must either be a"
3179 " regular file or a symlink pointing to"
3180 " an existing regular file, not %s" %
3181 self.op.hvm_cdrom_image_path)
3183 # vnc_bind_address verification
3184 if self.op.vnc_bind_address is not None:
3185 if not utils.IsValidIP(self.op.vnc_bind_address):
3186 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3187 " like a valid IP address" %
3188 self.op.vnc_bind_address)
3191 self.instance_status = 'up'
3193 self.instance_status = 'down'
3195 def Exec(self, feedback_fn):
3196 """Create and add the instance to the cluster.
3199 instance = self.op.instance_name
3200 pnode_name = self.pnode.name
3202 if self.op.mac == "auto":
3203 mac_address = self.cfg.GenerateMAC()
3205 mac_address = self.op.mac
3207 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3208 if self.inst_ip is not None:
3209 nic.ip = self.inst_ip
3211 ht_kind = self.sstore.GetHypervisorType()
3212 if ht_kind in constants.HTS_REQ_PORT:
3213 network_port = self.cfg.AllocatePort()
3217 if self.op.vnc_bind_address is None:
3218 self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3220 # this is needed because os.path.join does not accept None arguments
3221 if self.op.file_storage_dir is None:
3222 string_file_storage_dir = ""
3224 string_file_storage_dir = self.op.file_storage_dir
3226 # build the full file storage dir path
3227 file_storage_dir = os.path.normpath(os.path.join(
3228 self.sstore.GetFileStorageDir(),
3229 string_file_storage_dir, instance))
3232 disks = _GenerateDiskTemplate(self.cfg,
3233 self.op.disk_template,
3234 instance, pnode_name,
3235 self.secondaries, self.op.disk_size,
3238 self.op.file_driver)
3240 iobj = objects.Instance(name=instance, os=self.op.os_type,
3241 primary_node=pnode_name,
3242 memory=self.op.mem_size,
3243 vcpus=self.op.vcpus,
3244 nics=[nic], disks=disks,
3245 disk_template=self.op.disk_template,
3246 status=self.instance_status,
3247 network_port=network_port,
3248 kernel_path=self.op.kernel_path,
3249 initrd_path=self.op.initrd_path,
3250 hvm_boot_order=self.op.hvm_boot_order,
3251 hvm_acpi=self.op.hvm_acpi,
3252 hvm_pae=self.op.hvm_pae,
3253 hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3254 vnc_bind_address=self.op.vnc_bind_address,
3257 feedback_fn("* creating instance disks...")
3258 if not _CreateDisks(self.cfg, iobj):
3259 _RemoveDisks(iobj, self.cfg)
3260 raise errors.OpExecError("Device creation failed, reverting...")
3262 feedback_fn("adding instance %s to cluster config" % instance)
3264 self.cfg.AddInstance(iobj)
3265 # Add the new instance to the Ganeti Lock Manager
3266 self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3268 if self.op.wait_for_sync:
3269 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3270 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3271 # make sure the disks are not degraded (still sync-ing is ok)
3273 feedback_fn("* checking mirrors status")
3274 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3279 _RemoveDisks(iobj, self.cfg)
3280 self.cfg.RemoveInstance(iobj.name)
3281 # Remove the new instance from the Ganeti Lock Manager
3282 self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3283 raise errors.OpExecError("There are some degraded disks for"
3286 feedback_fn("creating os for instance %s on node %s" %
3287 (instance, pnode_name))
3289 if iobj.disk_template != constants.DT_DISKLESS:
3290 if self.op.mode == constants.INSTANCE_CREATE:
3291 feedback_fn("* running the instance OS create scripts...")
3292 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3293 raise errors.OpExecError("could not add os for instance %s"
3295 (instance, pnode_name))
3297 elif self.op.mode == constants.INSTANCE_IMPORT:
3298 feedback_fn("* running the instance OS import scripts...")
3299 src_node = self.op.src_node
3300 src_image = self.src_image
3301 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3302 src_node, src_image):
3303 raise errors.OpExecError("Could not import os for instance"
3305 (instance, pnode_name))
3307 # also checked in the prereq part
3308 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3312 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3313 feedback_fn("* starting instance...")
3314 if not rpc.call_instance_start(pnode_name, iobj, None):
3315 raise errors.OpExecError("Could not start instance")
3318 class LUConnectConsole(NoHooksLU):
3319 """Connect to an instance's console.
3321 This is somewhat special in that it returns the command line that
3322 you need to run on the master node in order to connect to the
3326 _OP_REQP = ["instance_name"]
3328 def CheckPrereq(self):
3329 """Check prerequisites.
3331 This checks that the instance is in the cluster.
3334 instance = self.cfg.GetInstanceInfo(
3335 self.cfg.ExpandInstanceName(self.op.instance_name))
3336 if instance is None:
3337 raise errors.OpPrereqError("Instance '%s' not known" %
3338 self.op.instance_name)
3339 self.instance = instance
3341 def Exec(self, feedback_fn):
3342 """Connect to the console of an instance
3345 instance = self.instance
3346 node = instance.primary_node
3348 node_insts = rpc.call_instance_list([node])[node]
3349 if node_insts is False:
3350 raise errors.OpExecError("Can't connect to node %s." % node)
3352 if instance.name not in node_insts:
3353 raise errors.OpExecError("Instance %s is not running." % instance.name)
3355 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3357 hyper = hypervisor.GetHypervisor()
3358 console_cmd = hyper.GetShellCommandForConsole(instance)
3361 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3364 class LUReplaceDisks(LogicalUnit):
3365 """Replace the disks of an instance.
3368 HPATH = "mirrors-replace"
3369 HTYPE = constants.HTYPE_INSTANCE
3370 _OP_REQP = ["instance_name", "mode", "disks"]
3372 def _RunAllocator(self):
3373 """Compute a new secondary node using an IAllocator.
3376 ial = IAllocator(self.cfg, self.sstore,
3377 mode=constants.IALLOCATOR_MODE_RELOC,
3378 name=self.op.instance_name,
3379 relocate_from=[self.sec_node])
3381 ial.Run(self.op.iallocator)
3384 raise errors.OpPrereqError("Can't compute nodes using"
3385 " iallocator '%s': %s" % (self.op.iallocator,
3387 if len(ial.nodes) != ial.required_nodes:
3388 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3389 " of nodes (%s), required %s" %
3390 (len(ial.nodes), ial.required_nodes))
3391 self.op.remote_node = ial.nodes[0]
3392 logger.ToStdout("Selected new secondary for the instance: %s" %
3393 self.op.remote_node)
3395 def BuildHooksEnv(self):
3398 This runs on the master, the primary and all the secondaries.
3402 "MODE": self.op.mode,
3403 "NEW_SECONDARY": self.op.remote_node,
3404 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3406 env.update(_BuildInstanceHookEnvByObject(self.instance))
3408 self.sstore.GetMasterNode(),
3409 self.instance.primary_node,
3411 if self.op.remote_node is not None:
3412 nl.append(self.op.remote_node)
3415 def CheckPrereq(self):
3416 """Check prerequisites.
3418 This checks that the instance is in the cluster.
3421 if not hasattr(self.op, "remote_node"):
3422 self.op.remote_node = None
3424 instance = self.cfg.GetInstanceInfo(
3425 self.cfg.ExpandInstanceName(self.op.instance_name))
3426 if instance is None:
3427 raise errors.OpPrereqError("Instance '%s' not known" %
3428 self.op.instance_name)
3429 self.instance = instance
3430 self.op.instance_name = instance.name
3432 if instance.disk_template not in constants.DTS_NET_MIRROR:
3433 raise errors.OpPrereqError("Instance's disk layout is not"
3434 " network mirrored.")
3436 if len(instance.secondary_nodes) != 1:
3437 raise errors.OpPrereqError("The instance has a strange layout,"
3438 " expected one secondary but found %d" %
3439 len(instance.secondary_nodes))
3441 self.sec_node = instance.secondary_nodes[0]
3443 ia_name = getattr(self.op, "iallocator", None)
3444 if ia_name is not None:
3445 if self.op.remote_node is not None:
3446 raise errors.OpPrereqError("Give either the iallocator or the new"
3447 " secondary, not both")
3448 self.op.remote_node = self._RunAllocator()
3450 remote_node = self.op.remote_node
3451 if remote_node is not None:
3452 remote_node = self.cfg.ExpandNodeName(remote_node)
3453 if remote_node is None:
3454 raise errors.OpPrereqError("Node '%s' not known" %
3455 self.op.remote_node)
3456 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3458 self.remote_node_info = None
3459 if remote_node == instance.primary_node:
3460 raise errors.OpPrereqError("The specified node is the primary node of"
3462 elif remote_node == self.sec_node:
3463 if self.op.mode == constants.REPLACE_DISK_SEC:
3464 # this is for DRBD8, where we can't execute the same mode of
3465 # replacement as for drbd7 (no different port allocated)
3466 raise errors.OpPrereqError("Same secondary given, cannot execute"
3468 if instance.disk_template == constants.DT_DRBD8:
3469 if (self.op.mode == constants.REPLACE_DISK_ALL and
3470 remote_node is not None):
3471 # switch to replace secondary mode
3472 self.op.mode = constants.REPLACE_DISK_SEC
3474 if self.op.mode == constants.REPLACE_DISK_ALL:
3475 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3476 " secondary disk replacement, not"
3478 elif self.op.mode == constants.REPLACE_DISK_PRI:
3479 if remote_node is not None:
3480 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3481 " the secondary while doing a primary"
3482 " node disk replacement")
3483 self.tgt_node = instance.primary_node
3484 self.oth_node = instance.secondary_nodes[0]
3485 elif self.op.mode == constants.REPLACE_DISK_SEC:
3486 self.new_node = remote_node # this can be None, in which case
3487 # we don't change the secondary
3488 self.tgt_node = instance.secondary_nodes[0]
3489 self.oth_node = instance.primary_node
3491 raise errors.ProgrammerError("Unhandled disk replace mode")
3493 for name in self.op.disks:
3494 if instance.FindDisk(name) is None:
3495 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3496 (name, instance.name))
3497 self.op.remote_node = remote_node
3499 def _ExecD8DiskOnly(self, feedback_fn):
3500 """Replace a disk on the primary or secondary for dbrd8.
3502 The algorithm for replace is quite complicated:
3503 - for each disk to be replaced:
3504 - create new LVs on the target node with unique names
3505 - detach old LVs from the drbd device
3506 - rename old LVs to name_replaced.<time_t>
3507 - rename new LVs to old LVs
3508 - attach the new LVs (with the old names now) to the drbd device
3509 - wait for sync across all devices
3510 - for each modified disk:
3511 - remove old LVs (which have the name name_replaces.<time_t>)
3513 Failures are not very well handled.
3517 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3518 instance = self.instance
3520 vgname = self.cfg.GetVGName()
3523 tgt_node = self.tgt_node
3524 oth_node = self.oth_node
3526 # Step: check device activation
3527 self.proc.LogStep(1, steps_total, "check device existence")
3528 info("checking volume groups")
3529 my_vg = cfg.GetVGName()
3530 results = rpc.call_vg_list([oth_node, tgt_node])
3532 raise errors.OpExecError("Can't list volume groups on the nodes")
3533 for node in oth_node, tgt_node:
3534 res = results.get(node, False)
3535 if not res or my_vg not in res:
3536 raise errors.OpExecError("Volume group '%s' not found on %s" %
3538 for dev in instance.disks:
3539 if not dev.iv_name in self.op.disks:
3541 for node in tgt_node, oth_node:
3542 info("checking %s on %s" % (dev.iv_name, node))
3543 cfg.SetDiskID(dev, node)
3544 if not rpc.call_blockdev_find(node, dev):
3545 raise errors.OpExecError("Can't find device %s on node %s" %
3546 (dev.iv_name, node))
3548 # Step: check other node consistency
3549 self.proc.LogStep(2, steps_total, "check peer consistency")
3550 for dev in instance.disks:
3551 if not dev.iv_name in self.op.disks:
3553 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3554 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3555 oth_node==instance.primary_node):
3556 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3557 " to replace disks on this node (%s)" %
3558 (oth_node, tgt_node))
3560 # Step: create new storage
3561 self.proc.LogStep(3, steps_total, "allocate new storage")
3562 for dev in instance.disks:
3563 if not dev.iv_name in self.op.disks:
3566 cfg.SetDiskID(dev, tgt_node)
3567 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3568 names = _GenerateUniqueNames(cfg, lv_names)
3569 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3570 logical_id=(vgname, names[0]))
3571 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3572 logical_id=(vgname, names[1]))
3573 new_lvs = [lv_data, lv_meta]
3574 old_lvs = dev.children
3575 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3576 info("creating new local storage on %s for %s" %
3577 (tgt_node, dev.iv_name))
3578 # since we *always* want to create this LV, we use the
3579 # _Create...OnPrimary (which forces the creation), even if we
3580 # are talking about the secondary node
3581 for new_lv in new_lvs:
3582 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3583 _GetInstanceInfoText(instance)):
3584 raise errors.OpExecError("Failed to create new LV named '%s' on"
3586 (new_lv.logical_id[1], tgt_node))
3588 # Step: for each lv, detach+rename*2+attach
3589 self.proc.LogStep(4, steps_total, "change drbd configuration")
3590 for dev, old_lvs, new_lvs in iv_names.itervalues():
3591 info("detaching %s drbd from local storage" % dev.iv_name)
3592 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3593 raise errors.OpExecError("Can't detach drbd from local storage on node"
3594 " %s for device %s" % (tgt_node, dev.iv_name))
3596 #cfg.Update(instance)
3598 # ok, we created the new LVs, so now we know we have the needed
3599 # storage; as such, we proceed on the target node to rename
3600 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3601 # using the assumption that logical_id == physical_id (which in
3602 # turn is the unique_id on that node)
3604 # FIXME(iustin): use a better name for the replaced LVs
3605 temp_suffix = int(time.time())
3606 ren_fn = lambda d, suff: (d.physical_id[0],
3607 d.physical_id[1] + "_replaced-%s" % suff)
3608 # build the rename list based on what LVs exist on the node
3610 for to_ren in old_lvs:
3611 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3612 if find_res is not None: # device exists
3613 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3615 info("renaming the old LVs on the target node")
3616 if not rpc.call_blockdev_rename(tgt_node, rlist):
3617 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3618 # now we rename the new LVs to the old LVs
3619 info("renaming the new LVs on the target node")
3620 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3621 if not rpc.call_blockdev_rename(tgt_node, rlist):
3622 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3624 for old, new in zip(old_lvs, new_lvs):
3625 new.logical_id = old.logical_id
3626 cfg.SetDiskID(new, tgt_node)
3628 for disk in old_lvs:
3629 disk.logical_id = ren_fn(disk, temp_suffix)
3630 cfg.SetDiskID(disk, tgt_node)
3632 # now that the new lvs have the old name, we can add them to the device
3633 info("adding new mirror component on %s" % tgt_node)
3634 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3635 for new_lv in new_lvs:
3636 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3637 warning("Can't rollback device %s", hint="manually cleanup unused"
3639 raise errors.OpExecError("Can't add local storage to drbd")
3641 dev.children = new_lvs
3642 cfg.Update(instance)
3644 # Step: wait for sync
3646 # this can fail as the old devices are degraded and _WaitForSync
3647 # does a combined result over all disks, so we don't check its
3649 self.proc.LogStep(5, steps_total, "sync devices")
3650 _WaitForSync(cfg, instance, self.proc, unlock=True)
3652 # so check manually all the devices
3653 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3654 cfg.SetDiskID(dev, instance.primary_node)
3655 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3657 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3659 # Step: remove old storage
3660 self.proc.LogStep(6, steps_total, "removing old storage")
3661 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3662 info("remove logical volumes for %s" % name)
3664 cfg.SetDiskID(lv, tgt_node)
3665 if not rpc.call_blockdev_remove(tgt_node, lv):
3666 warning("Can't remove old LV", hint="manually remove unused LVs")
3669 def _ExecD8Secondary(self, feedback_fn):
3670 """Replace the secondary node for drbd8.
3672 The algorithm for replace is quite complicated:
3673 - for all disks of the instance:
3674 - create new LVs on the new node with same names
3675 - shutdown the drbd device on the old secondary
3676 - disconnect the drbd network on the primary
3677 - create the drbd device on the new secondary
3678 - network attach the drbd on the primary, using an artifice:
3679 the drbd code for Attach() will connect to the network if it
3680 finds a device which is connected to the good local disks but
3682 - wait for sync across all devices
3683 - remove all disks from the old secondary
3685 Failures are not very well handled.
3689 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3690 instance = self.instance
3692 vgname = self.cfg.GetVGName()
3695 old_node = self.tgt_node
3696 new_node = self.new_node
3697 pri_node = instance.primary_node
3699 # Step: check device activation
3700 self.proc.LogStep(1, steps_total, "check device existence")
3701 info("checking volume groups")
3702 my_vg = cfg.GetVGName()
3703 results = rpc.call_vg_list([pri_node, new_node])
3705 raise errors.OpExecError("Can't list volume groups on the nodes")
3706 for node in pri_node, new_node:
3707 res = results.get(node, False)
3708 if not res or my_vg not in res:
3709 raise errors.OpExecError("Volume group '%s' not found on %s" %
3711 for dev in instance.disks:
3712 if not dev.iv_name in self.op.disks:
3714 info("checking %s on %s" % (dev.iv_name, pri_node))
3715 cfg.SetDiskID(dev, pri_node)
3716 if not rpc.call_blockdev_find(pri_node, dev):
3717 raise errors.OpExecError("Can't find device %s on node %s" %
3718 (dev.iv_name, pri_node))
3720 # Step: check other node consistency
3721 self.proc.LogStep(2, steps_total, "check peer consistency")
3722 for dev in instance.disks:
3723 if not dev.iv_name in self.op.disks:
3725 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3726 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3727 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3728 " unsafe to replace the secondary" %
3731 # Step: create new storage
3732 self.proc.LogStep(3, steps_total, "allocate new storage")
3733 for dev in instance.disks:
3735 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3736 # since we *always* want to create this LV, we use the
3737 # _Create...OnPrimary (which forces the creation), even if we
3738 # are talking about the secondary node
3739 for new_lv in dev.children:
3740 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3741 _GetInstanceInfoText(instance)):
3742 raise errors.OpExecError("Failed to create new LV named '%s' on"
3744 (new_lv.logical_id[1], new_node))
3746 iv_names[dev.iv_name] = (dev, dev.children)
3748 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3749 for dev in instance.disks:
3751 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3752 # create new devices on new_node
3753 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3754 logical_id=(pri_node, new_node,
3756 children=dev.children)
3757 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3759 _GetInstanceInfoText(instance)):
3760 raise errors.OpExecError("Failed to create new DRBD on"
3761 " node '%s'" % new_node)
3763 for dev in instance.disks:
3764 # we have new devices, shutdown the drbd on the old secondary
3765 info("shutting down drbd for %s on old node" % dev.iv_name)
3766 cfg.SetDiskID(dev, old_node)
3767 if not rpc.call_blockdev_shutdown(old_node, dev):
3768 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3769 hint="Please cleanup this device manually as soon as possible")
3771 info("detaching primary drbds from the network (=> standalone)")
3773 for dev in instance.disks:
3774 cfg.SetDiskID(dev, pri_node)
3775 # set the physical (unique in bdev terms) id to None, meaning
3776 # detach from network
3777 dev.physical_id = (None,) * len(dev.physical_id)
3778 # and 'find' the device, which will 'fix' it to match the
3780 if rpc.call_blockdev_find(pri_node, dev):
3783 warning("Failed to detach drbd %s from network, unusual case" %
3787 # no detaches succeeded (very unlikely)
3788 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3790 # if we managed to detach at least one, we update all the disks of
3791 # the instance to point to the new secondary
3792 info("updating instance configuration")
3793 for dev in instance.disks:
3794 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3795 cfg.SetDiskID(dev, pri_node)
3796 cfg.Update(instance)
3798 # and now perform the drbd attach
3799 info("attaching primary drbds to new secondary (standalone => connected)")
3801 for dev in instance.disks:
3802 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3803 # since the attach is smart, it's enough to 'find' the device,
3804 # it will automatically activate the network, if the physical_id
3806 cfg.SetDiskID(dev, pri_node)
3807 if not rpc.call_blockdev_find(pri_node, dev):
3808 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3809 "please do a gnt-instance info to see the status of disks")
3811 # this can fail as the old devices are degraded and _WaitForSync
3812 # does a combined result over all disks, so we don't check its
3814 self.proc.LogStep(5, steps_total, "sync devices")
3815 _WaitForSync(cfg, instance, self.proc, unlock=True)
3817 # so check manually all the devices
3818 for name, (dev, old_lvs) in iv_names.iteritems():
3819 cfg.SetDiskID(dev, pri_node)
3820 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3822 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3824 self.proc.LogStep(6, steps_total, "removing old storage")
3825 for name, (dev, old_lvs) in iv_names.iteritems():
3826 info("remove logical volumes for %s" % name)
3828 cfg.SetDiskID(lv, old_node)
3829 if not rpc.call_blockdev_remove(old_node, lv):
3830 warning("Can't remove LV on old secondary",
3831 hint="Cleanup stale volumes by hand")
3833 def Exec(self, feedback_fn):
3834 """Execute disk replacement.
3836 This dispatches the disk replacement to the appropriate handler.
3839 instance = self.instance
3841 # Activate the instance disks if we're replacing them on a down instance
3842 if instance.status == "down":
3843 op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3844 self.proc.ChainOpCode(op)
3846 if instance.disk_template == constants.DT_DRBD8:
3847 if self.op.remote_node is None:
3848 fn = self._ExecD8DiskOnly
3850 fn = self._ExecD8Secondary
3852 raise errors.ProgrammerError("Unhandled disk replacement case")
3854 ret = fn(feedback_fn)
3856 # Deactivate the instance disks if we're replacing them on a down instance
3857 if instance.status == "down":
3858 op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3859 self.proc.ChainOpCode(op)
3864 class LUGrowDisk(LogicalUnit):
3865 """Grow a disk of an instance.
3869 HTYPE = constants.HTYPE_INSTANCE
3870 _OP_REQP = ["instance_name", "disk", "amount"]
3872 def BuildHooksEnv(self):
3875 This runs on the master, the primary and all the secondaries.
3879 "DISK": self.op.disk,
3880 "AMOUNT": self.op.amount,
3882 env.update(_BuildInstanceHookEnvByObject(self.instance))
3884 self.sstore.GetMasterNode(),
3885 self.instance.primary_node,
3889 def CheckPrereq(self):
3890 """Check prerequisites.
3892 This checks that the instance is in the cluster.
3895 instance = self.cfg.GetInstanceInfo(
3896 self.cfg.ExpandInstanceName(self.op.instance_name))
3897 if instance is None:
3898 raise errors.OpPrereqError("Instance '%s' not known" %
3899 self.op.instance_name)
3900 self.instance = instance
3901 self.op.instance_name = instance.name
3903 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3904 raise errors.OpPrereqError("Instance's disk layout does not support"
3907 if instance.FindDisk(self.op.disk) is None:
3908 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3909 (self.op.disk, instance.name))
3911 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3912 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3913 for node in nodenames:
3914 info = nodeinfo.get(node, None)
3916 raise errors.OpPrereqError("Cannot get current information"
3917 " from node '%s'" % node)
3918 vg_free = info.get('vg_free', None)
3919 if not isinstance(vg_free, int):
3920 raise errors.OpPrereqError("Can't compute free disk space on"
3922 if self.op.amount > info['vg_free']:
3923 raise errors.OpPrereqError("Not enough disk space on target node %s:"
3924 " %d MiB available, %d MiB required" %
3925 (node, info['vg_free'], self.op.amount))
3927 def Exec(self, feedback_fn):
3928 """Execute disk grow.
3931 instance = self.instance
3932 disk = instance.FindDisk(self.op.disk)
3933 for node in (instance.secondary_nodes + (instance.primary_node,)):
3934 self.cfg.SetDiskID(disk, node)
3935 result = rpc.call_blockdev_grow(node, disk, self.op.amount)
3936 if not result or not isinstance(result, tuple) or len(result) != 2:
3937 raise errors.OpExecError("grow request failed to node %s" % node)
3939 raise errors.OpExecError("grow request failed to node %s: %s" %
3941 disk.RecordGrow(self.op.amount)
3942 self.cfg.Update(instance)
3946 class LUQueryInstanceData(NoHooksLU):
3947 """Query runtime instance data.
3950 _OP_REQP = ["instances"]
3952 def CheckPrereq(self):
3953 """Check prerequisites.
3955 This only checks the optional instance list against the existing names.
3958 if not isinstance(self.op.instances, list):
3959 raise errors.OpPrereqError("Invalid argument type 'instances'")
3960 if self.op.instances:
3961 self.wanted_instances = []
3962 names = self.op.instances
3964 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3965 if instance is None:
3966 raise errors.OpPrereqError("No such instance name '%s'" % name)
3967 self.wanted_instances.append(instance)
3969 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3970 in self.cfg.GetInstanceList()]
3974 def _ComputeDiskStatus(self, instance, snode, dev):
3975 """Compute block device status.
3978 self.cfg.SetDiskID(dev, instance.primary_node)
3979 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3980 if dev.dev_type in constants.LDS_DRBD:
3981 # we change the snode then (otherwise we use the one passed in)
3982 if dev.logical_id[0] == instance.primary_node:
3983 snode = dev.logical_id[1]
3985 snode = dev.logical_id[0]
3988 self.cfg.SetDiskID(dev, snode)
3989 dev_sstatus = rpc.call_blockdev_find(snode, dev)
3994 dev_children = [self._ComputeDiskStatus(instance, snode, child)
3995 for child in dev.children]
4000 "iv_name": dev.iv_name,
4001 "dev_type": dev.dev_type,
4002 "logical_id": dev.logical_id,
4003 "physical_id": dev.physical_id,
4004 "pstatus": dev_pstatus,
4005 "sstatus": dev_sstatus,
4006 "children": dev_children,
4011 def Exec(self, feedback_fn):
4012 """Gather and return data"""
4014 for instance in self.wanted_instances:
4015 remote_info = rpc.call_instance_info(instance.primary_node,
4017 if remote_info and "state" in remote_info:
4020 remote_state = "down"
4021 if instance.status == "down":
4022 config_state = "down"
4026 disks = [self._ComputeDiskStatus(instance, None, device)
4027 for device in instance.disks]
4030 "name": instance.name,
4031 "config_state": config_state,
4032 "run_state": remote_state,
4033 "pnode": instance.primary_node,
4034 "snodes": instance.secondary_nodes,
4036 "memory": instance.memory,
4037 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4039 "vcpus": instance.vcpus,
4042 htkind = self.sstore.GetHypervisorType()
4043 if htkind == constants.HT_XEN_PVM30:
4044 idict["kernel_path"] = instance.kernel_path
4045 idict["initrd_path"] = instance.initrd_path
4047 if htkind == constants.HT_XEN_HVM31:
4048 idict["hvm_boot_order"] = instance.hvm_boot_order
4049 idict["hvm_acpi"] = instance.hvm_acpi
4050 idict["hvm_pae"] = instance.hvm_pae
4051 idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4053 if htkind in constants.HTS_REQ_PORT:
4054 idict["vnc_bind_address"] = instance.vnc_bind_address
4055 idict["network_port"] = instance.network_port
4057 result[instance.name] = idict
4062 class LUSetInstanceParams(LogicalUnit):
4063 """Modifies an instances's parameters.
4066 HPATH = "instance-modify"
4067 HTYPE = constants.HTYPE_INSTANCE
4068 _OP_REQP = ["instance_name"]
4070 def BuildHooksEnv(self):
4073 This runs on the master, primary and secondaries.
4078 args['memory'] = self.mem
4080 args['vcpus'] = self.vcpus
4081 if self.do_ip or self.do_bridge or self.mac:
4085 ip = self.instance.nics[0].ip
4087 bridge = self.bridge
4089 bridge = self.instance.nics[0].bridge
4093 mac = self.instance.nics[0].mac
4094 args['nics'] = [(ip, bridge, mac)]
4095 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4096 nl = [self.sstore.GetMasterNode(),
4097 self.instance.primary_node] + list(self.instance.secondary_nodes)
4100 def CheckPrereq(self):
4101 """Check prerequisites.
4103 This only checks the instance list against the existing names.
4106 self.mem = getattr(self.op, "mem", None)
4107 self.vcpus = getattr(self.op, "vcpus", None)
4108 self.ip = getattr(self.op, "ip", None)
4109 self.mac = getattr(self.op, "mac", None)
4110 self.bridge = getattr(self.op, "bridge", None)
4111 self.kernel_path = getattr(self.op, "kernel_path", None)
4112 self.initrd_path = getattr(self.op, "initrd_path", None)
4113 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4114 self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4115 self.hvm_pae = getattr(self.op, "hvm_pae", None)
4116 self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4117 self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4118 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4119 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4120 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4121 self.vnc_bind_address]
4122 if all_parms.count(None) == len(all_parms):
4123 raise errors.OpPrereqError("No changes submitted")
4124 if self.mem is not None:
4126 self.mem = int(self.mem)
4127 except ValueError, err:
4128 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4129 if self.vcpus is not None:
4131 self.vcpus = int(self.vcpus)
4132 except ValueError, err:
4133 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4134 if self.ip is not None:
4136 if self.ip.lower() == "none":
4139 if not utils.IsValidIP(self.ip):
4140 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4143 self.do_bridge = (self.bridge is not None)
4144 if self.mac is not None:
4145 if self.cfg.IsMacInUse(self.mac):
4146 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4148 if not utils.IsValidMac(self.mac):
4149 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4151 if self.kernel_path is not None:
4152 self.do_kernel_path = True
4153 if self.kernel_path == constants.VALUE_NONE:
4154 raise errors.OpPrereqError("Can't set instance to no kernel")
4156 if self.kernel_path != constants.VALUE_DEFAULT:
4157 if not os.path.isabs(self.kernel_path):
4158 raise errors.OpPrereqError("The kernel path must be an absolute"
4161 self.do_kernel_path = False
4163 if self.initrd_path is not None:
4164 self.do_initrd_path = True
4165 if self.initrd_path not in (constants.VALUE_NONE,
4166 constants.VALUE_DEFAULT):
4167 if not os.path.isabs(self.initrd_path):
4168 raise errors.OpPrereqError("The initrd path must be an absolute"
4171 self.do_initrd_path = False
4173 # boot order verification
4174 if self.hvm_boot_order is not None:
4175 if self.hvm_boot_order != constants.VALUE_DEFAULT:
4176 if len(self.hvm_boot_order.strip("acdn")) != 0:
4177 raise errors.OpPrereqError("invalid boot order specified,"
4178 " must be one or more of [acdn]"
4181 # hvm_cdrom_image_path verification
4182 if self.op.hvm_cdrom_image_path is not None:
4183 if not os.path.isabs(self.op.hvm_cdrom_image_path):
4184 raise errors.OpPrereqError("The path to the HVM CDROM image must"
4185 " be an absolute path or None, not %s" %
4186 self.op.hvm_cdrom_image_path)
4187 if not os.path.isfile(self.op.hvm_cdrom_image_path):
4188 raise errors.OpPrereqError("The HVM CDROM image must either be a"
4189 " regular file or a symlink pointing to"
4190 " an existing regular file, not %s" %
4191 self.op.hvm_cdrom_image_path)
4193 # vnc_bind_address verification
4194 if self.op.vnc_bind_address is not None:
4195 if not utils.IsValidIP(self.op.vnc_bind_address):
4196 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4197 " like a valid IP address" %
4198 self.op.vnc_bind_address)
4200 instance = self.cfg.GetInstanceInfo(
4201 self.cfg.ExpandInstanceName(self.op.instance_name))
4202 if instance is None:
4203 raise errors.OpPrereqError("No such instance name '%s'" %
4204 self.op.instance_name)
4205 self.op.instance_name = instance.name
4206 self.instance = instance
4209 def Exec(self, feedback_fn):
4210 """Modifies an instance.
4212 All parameters take effect only at the next restart of the instance.
4215 instance = self.instance
4217 instance.memory = self.mem
4218 result.append(("mem", self.mem))
4220 instance.vcpus = self.vcpus
4221 result.append(("vcpus", self.vcpus))
4223 instance.nics[0].ip = self.ip
4224 result.append(("ip", self.ip))
4226 instance.nics[0].bridge = self.bridge
4227 result.append(("bridge", self.bridge))
4229 instance.nics[0].mac = self.mac
4230 result.append(("mac", self.mac))
4231 if self.do_kernel_path:
4232 instance.kernel_path = self.kernel_path
4233 result.append(("kernel_path", self.kernel_path))
4234 if self.do_initrd_path:
4235 instance.initrd_path = self.initrd_path
4236 result.append(("initrd_path", self.initrd_path))
4237 if self.hvm_boot_order:
4238 if self.hvm_boot_order == constants.VALUE_DEFAULT:
4239 instance.hvm_boot_order = None
4241 instance.hvm_boot_order = self.hvm_boot_order
4242 result.append(("hvm_boot_order", self.hvm_boot_order))
4244 instance.hvm_acpi = self.hvm_acpi
4245 result.append(("hvm_acpi", self.hvm_acpi))
4247 instance.hvm_pae = self.hvm_pae
4248 result.append(("hvm_pae", self.hvm_pae))
4249 if self.hvm_cdrom_image_path:
4250 instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4251 result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4252 if self.vnc_bind_address:
4253 instance.vnc_bind_address = self.vnc_bind_address
4254 result.append(("vnc_bind_address", self.vnc_bind_address))
4256 self.cfg.AddInstance(instance)
4261 class LUQueryExports(NoHooksLU):
4262 """Query the exports list
4267 def CheckPrereq(self):
4268 """Check that the nodelist contains only existing nodes.
4271 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4273 def Exec(self, feedback_fn):
4274 """Compute the list of all the exported system images.
4277 a dictionary with the structure node->(export-list)
4278 where export-list is a list of the instances exported on
4282 return rpc.call_export_list(self.nodes)
4285 class LUExportInstance(LogicalUnit):
4286 """Export an instance to an image in the cluster.
4289 HPATH = "instance-export"
4290 HTYPE = constants.HTYPE_INSTANCE
4291 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4293 def BuildHooksEnv(self):
4296 This will run on the master, primary node and target node.
4300 "EXPORT_NODE": self.op.target_node,
4301 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4303 env.update(_BuildInstanceHookEnvByObject(self.instance))
4304 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4305 self.op.target_node]
4308 def CheckPrereq(self):
4309 """Check prerequisites.
4311 This checks that the instance and node names are valid.
4314 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4315 self.instance = self.cfg.GetInstanceInfo(instance_name)
4316 if self.instance is None:
4317 raise errors.OpPrereqError("Instance '%s' not found" %
4318 self.op.instance_name)
4321 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4322 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4324 if self.dst_node is None:
4325 raise errors.OpPrereqError("Destination node '%s' is unknown." %
4326 self.op.target_node)
4327 self.op.target_node = self.dst_node.name
4329 # instance disk type verification
4330 for disk in self.instance.disks:
4331 if disk.dev_type == constants.LD_FILE:
4332 raise errors.OpPrereqError("Export not supported for instances with"
4333 " file-based disks")
4335 def Exec(self, feedback_fn):
4336 """Export an instance to an image in the cluster.
4339 instance = self.instance
4340 dst_node = self.dst_node
4341 src_node = instance.primary_node
4342 if self.op.shutdown:
4343 # shutdown the instance, but not the disks
4344 if not rpc.call_instance_shutdown(src_node, instance):
4345 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4346 (instance.name, src_node))
4348 vgname = self.cfg.GetVGName()
4353 for disk in instance.disks:
4354 if disk.iv_name == "sda":
4355 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4356 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4358 if not new_dev_name:
4359 logger.Error("could not snapshot block device %s on node %s" %
4360 (disk.logical_id[1], src_node))
4362 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4363 logical_id=(vgname, new_dev_name),
4364 physical_id=(vgname, new_dev_name),
4365 iv_name=disk.iv_name)
4366 snap_disks.append(new_dev)
4369 if self.op.shutdown and instance.status == "up":
4370 if not rpc.call_instance_start(src_node, instance, None):
4371 _ShutdownInstanceDisks(instance, self.cfg)
4372 raise errors.OpExecError("Could not start instance")
4374 # TODO: check for size
4376 for dev in snap_disks:
4377 if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4378 logger.Error("could not export block device %s from node %s to node %s"
4379 % (dev.logical_id[1], src_node, dst_node.name))
4380 if not rpc.call_blockdev_remove(src_node, dev):
4381 logger.Error("could not remove snapshot block device %s from node %s" %
4382 (dev.logical_id[1], src_node))
4384 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4385 logger.Error("could not finalize export for instance %s on node %s" %
4386 (instance.name, dst_node.name))
4388 nodelist = self.cfg.GetNodeList()
4389 nodelist.remove(dst_node.name)
4391 # on one-node clusters nodelist will be empty after the removal
4392 # if we proceed the backup would be removed because OpQueryExports
4393 # substitutes an empty list with the full cluster node list.
4395 op = opcodes.OpQueryExports(nodes=nodelist)
4396 exportlist = self.proc.ChainOpCode(op)
4397 for node in exportlist:
4398 if instance.name in exportlist[node]:
4399 if not rpc.call_export_remove(node, instance.name):
4400 logger.Error("could not remove older export for instance %s"
4401 " on node %s" % (instance.name, node))
4404 class LURemoveExport(NoHooksLU):
4405 """Remove exports related to the named instance.
4408 _OP_REQP = ["instance_name"]
4410 def CheckPrereq(self):
4411 """Check prerequisites.
4415 def Exec(self, feedback_fn):
4416 """Remove any export.
4419 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4420 # If the instance was not found we'll try with the name that was passed in.
4421 # This will only work if it was an FQDN, though.
4423 if not instance_name:
4425 instance_name = self.op.instance_name
4427 op = opcodes.OpQueryExports(nodes=[])
4428 exportlist = self.proc.ChainOpCode(op)
4430 for node in exportlist:
4431 if instance_name in exportlist[node]:
4433 if not rpc.call_export_remove(node, instance_name):
4434 logger.Error("could not remove export for instance %s"
4435 " on node %s" % (instance_name, node))
4437 if fqdn_warn and not found:
4438 feedback_fn("Export not found. If trying to remove an export belonging"
4439 " to a deleted instance please use its Fully Qualified"
4443 class TagsLU(NoHooksLU):
4446 This is an abstract class which is the parent of all the other tags LUs.
4449 def CheckPrereq(self):
4450 """Check prerequisites.
4453 if self.op.kind == constants.TAG_CLUSTER:
4454 self.target = self.cfg.GetClusterInfo()
4455 elif self.op.kind == constants.TAG_NODE:
4456 name = self.cfg.ExpandNodeName(self.op.name)
4458 raise errors.OpPrereqError("Invalid node name (%s)" %
4461 self.target = self.cfg.GetNodeInfo(name)
4462 elif self.op.kind == constants.TAG_INSTANCE:
4463 name = self.cfg.ExpandInstanceName(self.op.name)
4465 raise errors.OpPrereqError("Invalid instance name (%s)" %
4468 self.target = self.cfg.GetInstanceInfo(name)
4470 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4474 class LUGetTags(TagsLU):
4475 """Returns the tags of a given object.
4478 _OP_REQP = ["kind", "name"]
4480 def Exec(self, feedback_fn):
4481 """Returns the tag list.
4484 return self.target.GetTags()
4487 class LUSearchTags(NoHooksLU):
4488 """Searches the tags for a given pattern.
4491 _OP_REQP = ["pattern"]
4493 def CheckPrereq(self):
4494 """Check prerequisites.
4496 This checks the pattern passed for validity by compiling it.
4500 self.re = re.compile(self.op.pattern)
4501 except re.error, err:
4502 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4503 (self.op.pattern, err))
4505 def Exec(self, feedback_fn):
4506 """Returns the tag list.
4510 tgts = [("/cluster", cfg.GetClusterInfo())]
4511 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4512 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4513 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4514 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4516 for path, target in tgts:
4517 for tag in target.GetTags():
4518 if self.re.search(tag):
4519 results.append((path, tag))
4523 class LUAddTags(TagsLU):
4524 """Sets a tag on a given object.
4527 _OP_REQP = ["kind", "name", "tags"]
4529 def CheckPrereq(self):
4530 """Check prerequisites.
4532 This checks the type and length of the tag name and value.
4535 TagsLU.CheckPrereq(self)
4536 for tag in self.op.tags:
4537 objects.TaggableObject.ValidateTag(tag)
4539 def Exec(self, feedback_fn):
4544 for tag in self.op.tags:
4545 self.target.AddTag(tag)
4546 except errors.TagError, err:
4547 raise errors.OpExecError("Error while setting tag: %s" % str(err))
4549 self.cfg.Update(self.target)
4550 except errors.ConfigurationError:
4551 raise errors.OpRetryError("There has been a modification to the"
4552 " config file and the operation has been"
4553 " aborted. Please retry.")
4556 class LUDelTags(TagsLU):
4557 """Delete a list of tags from a given object.
4560 _OP_REQP = ["kind", "name", "tags"]
4562 def CheckPrereq(self):
4563 """Check prerequisites.
4565 This checks that we have the given tag.
4568 TagsLU.CheckPrereq(self)
4569 for tag in self.op.tags:
4570 objects.TaggableObject.ValidateTag(tag)
4571 del_tags = frozenset(self.op.tags)
4572 cur_tags = self.target.GetTags()
4573 if not del_tags <= cur_tags:
4574 diff_tags = del_tags - cur_tags
4575 diff_names = ["'%s'" % tag for tag in diff_tags]
4577 raise errors.OpPrereqError("Tag(s) %s not found" %
4578 (",".join(diff_names)))
4580 def Exec(self, feedback_fn):
4581 """Remove the tag from the object.
4584 for tag in self.op.tags:
4585 self.target.RemoveTag(tag)
4587 self.cfg.Update(self.target)
4588 except errors.ConfigurationError:
4589 raise errors.OpRetryError("There has been a modification to the"
4590 " config file and the operation has been"
4591 " aborted. Please retry.")
4593 class LUTestDelay(NoHooksLU):
4594 """Sleep for a specified amount of time.
4596 This LU sleeps on the master and/or nodes for a specified amount of
4600 _OP_REQP = ["duration", "on_master", "on_nodes"]
4603 def ExpandNames(self):
4604 """Expand names and set required locks.
4606 This expands the node list, if any.
4609 self.needed_locks = {}
4610 if self.op.on_nodes:
4611 # _GetWantedNodes can be used here, but is not always appropriate to use
4612 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
4614 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4615 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
4617 def CheckPrereq(self):
4618 """Check prerequisites.
4622 def Exec(self, feedback_fn):
4623 """Do the actual sleep.
4626 if self.op.on_master:
4627 if not utils.TestDelay(self.op.duration):
4628 raise errors.OpExecError("Error during master delay test")
4629 if self.op.on_nodes:
4630 result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4632 raise errors.OpExecError("Complete failure from rpc call")
4633 for node, node_result in result.items():
4635 raise errors.OpExecError("Failure during rpc call to node %s,"
4636 " result: %s" % (node, node_result))
4639 class IAllocator(object):
4640 """IAllocator framework.
4642 An IAllocator instance has three sets of attributes:
4643 - cfg/sstore that are needed to query the cluster
4644 - input data (all members of the _KEYS class attribute are required)
4645 - four buffer attributes (in|out_data|text), that represent the
4646 input (to the external script) in text and data structure format,
4647 and the output from it, again in two formats
4648 - the result variables from the script (success, info, nodes) for
4653 "mem_size", "disks", "disk_template",
4654 "os", "tags", "nics", "vcpus",
4660 def __init__(self, cfg, sstore, mode, name, **kwargs):
4662 self.sstore = sstore
4663 # init buffer variables
4664 self.in_text = self.out_text = self.in_data = self.out_data = None
4665 # init all input fields so that pylint is happy
4668 self.mem_size = self.disks = self.disk_template = None
4669 self.os = self.tags = self.nics = self.vcpus = None
4670 self.relocate_from = None
4672 self.required_nodes = None
4673 # init result fields
4674 self.success = self.info = self.nodes = None
4675 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4676 keyset = self._ALLO_KEYS
4677 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4678 keyset = self._RELO_KEYS
4680 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4681 " IAllocator" % self.mode)
4683 if key not in keyset:
4684 raise errors.ProgrammerError("Invalid input parameter '%s' to"
4685 " IAllocator" % key)
4686 setattr(self, key, kwargs[key])
4688 if key not in kwargs:
4689 raise errors.ProgrammerError("Missing input parameter '%s' to"
4690 " IAllocator" % key)
4691 self._BuildInputData()
4693 def _ComputeClusterData(self):
4694 """Compute the generic allocator input data.
4696 This is the data that is independent of the actual operation.
4703 "cluster_name": self.sstore.GetClusterName(),
4704 "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4705 "hypervisor_type": self.sstore.GetHypervisorType(),
4706 # we don't have job IDs
4709 i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4713 node_list = cfg.GetNodeList()
4714 node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4715 for nname in node_list:
4716 ninfo = cfg.GetNodeInfo(nname)
4717 if nname not in node_data or not isinstance(node_data[nname], dict):
4718 raise errors.OpExecError("Can't get data for node %s" % nname)
4719 remote_info = node_data[nname]
4720 for attr in ['memory_total', 'memory_free', 'memory_dom0',
4721 'vg_size', 'vg_free', 'cpu_total']:
4722 if attr not in remote_info:
4723 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4726 remote_info[attr] = int(remote_info[attr])
4727 except ValueError, err:
4728 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4729 " %s" % (nname, attr, str(err)))
4730 # compute memory used by primary instances
4731 i_p_mem = i_p_up_mem = 0
4732 for iinfo in i_list:
4733 if iinfo.primary_node == nname:
4734 i_p_mem += iinfo.memory
4735 if iinfo.status == "up":
4736 i_p_up_mem += iinfo.memory
4738 # compute memory used by instances
4740 "tags": list(ninfo.GetTags()),
4741 "total_memory": remote_info['memory_total'],
4742 "reserved_memory": remote_info['memory_dom0'],
4743 "free_memory": remote_info['memory_free'],
4744 "i_pri_memory": i_p_mem,
4745 "i_pri_up_memory": i_p_up_mem,
4746 "total_disk": remote_info['vg_size'],
4747 "free_disk": remote_info['vg_free'],
4748 "primary_ip": ninfo.primary_ip,
4749 "secondary_ip": ninfo.secondary_ip,
4750 "total_cpus": remote_info['cpu_total'],
4752 node_results[nname] = pnr
4753 data["nodes"] = node_results
4757 for iinfo in i_list:
4758 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4759 for n in iinfo.nics]
4761 "tags": list(iinfo.GetTags()),
4762 "should_run": iinfo.status == "up",
4763 "vcpus": iinfo.vcpus,
4764 "memory": iinfo.memory,
4766 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4768 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4769 "disk_template": iinfo.disk_template,
4771 instance_data[iinfo.name] = pir
4773 data["instances"] = instance_data
4777 def _AddNewInstance(self):
4778 """Add new instance data to allocator structure.
4780 This in combination with _AllocatorGetClusterData will create the
4781 correct structure needed as input for the allocator.
4783 The checks for the completeness of the opcode must have already been
4788 if len(self.disks) != 2:
4789 raise errors.OpExecError("Only two-disk configurations supported")
4791 disk_space = _ComputeDiskSize(self.disk_template,
4792 self.disks[0]["size"], self.disks[1]["size"])
4794 if self.disk_template in constants.DTS_NET_MIRROR:
4795 self.required_nodes = 2
4797 self.required_nodes = 1
4801 "disk_template": self.disk_template,
4804 "vcpus": self.vcpus,
4805 "memory": self.mem_size,
4806 "disks": self.disks,
4807 "disk_space_total": disk_space,
4809 "required_nodes": self.required_nodes,
4811 data["request"] = request
4813 def _AddRelocateInstance(self):
4814 """Add relocate instance data to allocator structure.
4816 This in combination with _IAllocatorGetClusterData will create the
4817 correct structure needed as input for the allocator.
4819 The checks for the completeness of the opcode must have already been
4823 instance = self.cfg.GetInstanceInfo(self.name)
4824 if instance is None:
4825 raise errors.ProgrammerError("Unknown instance '%s' passed to"
4826 " IAllocator" % self.name)
4828 if instance.disk_template not in constants.DTS_NET_MIRROR:
4829 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
4831 if len(instance.secondary_nodes) != 1:
4832 raise errors.OpPrereqError("Instance has not exactly one secondary node")
4834 self.required_nodes = 1
4836 disk_space = _ComputeDiskSize(instance.disk_template,
4837 instance.disks[0].size,
4838 instance.disks[1].size)
4843 "disk_space_total": disk_space,
4844 "required_nodes": self.required_nodes,
4845 "relocate_from": self.relocate_from,
4847 self.in_data["request"] = request
4849 def _BuildInputData(self):
4850 """Build input data structures.
4853 self._ComputeClusterData()
4855 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4856 self._AddNewInstance()
4858 self._AddRelocateInstance()
4860 self.in_text = serializer.Dump(self.in_data)
4862 def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
4863 """Run an instance allocator and return the results.
4868 result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
4870 if not isinstance(result, tuple) or len(result) != 4:
4871 raise errors.OpExecError("Invalid result from master iallocator runner")
4873 rcode, stdout, stderr, fail = result
4875 if rcode == constants.IARUN_NOTFOUND:
4876 raise errors.OpExecError("Can't find allocator '%s'" % name)
4877 elif rcode == constants.IARUN_FAILURE:
4878 raise errors.OpExecError("Instance allocator call failed: %s,"
4880 (fail, stdout+stderr))
4881 self.out_text = stdout
4883 self._ValidateResult()
4885 def _ValidateResult(self):
4886 """Process the allocator results.
4888 This will process and if successful save the result in
4889 self.out_data and the other parameters.
4893 rdict = serializer.Load(self.out_text)
4894 except Exception, err:
4895 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
4897 if not isinstance(rdict, dict):
4898 raise errors.OpExecError("Can't parse iallocator results: not a dict")
4900 for key in "success", "info", "nodes":
4901 if key not in rdict:
4902 raise errors.OpExecError("Can't parse iallocator results:"
4903 " missing key '%s'" % key)
4904 setattr(self, key, rdict[key])
4906 if not isinstance(rdict["nodes"], list):
4907 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
4909 self.out_data = rdict
4912 class LUTestAllocator(NoHooksLU):
4913 """Run allocator tests.
4915 This LU runs the allocator tests
4918 _OP_REQP = ["direction", "mode", "name"]
4920 def CheckPrereq(self):
4921 """Check prerequisites.
4923 This checks the opcode parameters depending on the director and mode test.
4926 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4927 for attr in ["name", "mem_size", "disks", "disk_template",
4928 "os", "tags", "nics", "vcpus"]:
4929 if not hasattr(self.op, attr):
4930 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
4932 iname = self.cfg.ExpandInstanceName(self.op.name)
4933 if iname is not None:
4934 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
4936 if not isinstance(self.op.nics, list):
4937 raise errors.OpPrereqError("Invalid parameter 'nics'")
4938 for row in self.op.nics:
4939 if (not isinstance(row, dict) or
4942 "bridge" not in row):
4943 raise errors.OpPrereqError("Invalid contents of the"
4944 " 'nics' parameter")
4945 if not isinstance(self.op.disks, list):
4946 raise errors.OpPrereqError("Invalid parameter 'disks'")
4947 if len(self.op.disks) != 2:
4948 raise errors.OpPrereqError("Only two-disk configurations supported")
4949 for row in self.op.disks:
4950 if (not isinstance(row, dict) or
4951 "size" not in row or
4952 not isinstance(row["size"], int) or
4953 "mode" not in row or
4954 row["mode"] not in ['r', 'w']):
4955 raise errors.OpPrereqError("Invalid contents of the"
4956 " 'disks' parameter")
4957 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
4958 if not hasattr(self.op, "name"):
4959 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
4960 fname = self.cfg.ExpandInstanceName(self.op.name)
4962 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
4964 self.op.name = fname
4965 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
4967 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
4970 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
4971 if not hasattr(self.op, "allocator") or self.op.allocator is None:
4972 raise errors.OpPrereqError("Missing allocator name")
4973 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
4974 raise errors.OpPrereqError("Wrong allocator test '%s'" %
4977 def Exec(self, feedback_fn):
4978 """Run the allocator test.
4981 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4982 ial = IAllocator(self.cfg, self.sstore,
4985 mem_size=self.op.mem_size,
4986 disks=self.op.disks,
4987 disk_template=self.op.disk_template,
4991 vcpus=self.op.vcpus,
4994 ial = IAllocator(self.cfg, self.sstore,
4997 relocate_from=list(self.relocate_from),
5000 if self.op.direction == constants.IALLOCATOR_DIR_IN:
5001 result = ial.in_text
5003 ial.Run(self.op.allocator, validate=False)
5004 result = ial.out_text