# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""
# pylint: disable-msg=W0613,W0201

import re
import time
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf
from ganeti import serializer

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False
  REQ_BGL = True
  def __init__(self, processor, op, context, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.sstore = sstore
    self.context = context
    # the SshRunner object, built lazily on first use
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object, initialized on first use.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this
    prefix is added by the hooks runner. Also note that additional keys
    will be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []
    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)
  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)
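
# Usage sketch (illustrative; it mirrors LUQueryNodeVolumes.CheckPrereq
# below): a LU funnels user-supplied names through this helper so that short
# names are canonicalized before Exec runs. The expanded name is hypothetical:
#
#   self.nodes = _GetWantedNodes(self, self.op.nodes)
#   # e.g. ["node1"] -> ["node1.example.com"]; [] -> all cluster nodes, sorted
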
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
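
# Usage sketch (illustrative; the field names here are hypothetical): callers
# pass the selection straight from the opcode and rely on the raised
# OpPrereqError for reporting:
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "mfree"])   # passes silently
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["bogus"])           # raises OpPrereqError
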
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings

  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
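
# The override dict lets a hook builder patch single fields without rebuilding
# the whole argument list; e.g. a LU that is about to stop an instance could
# do (illustrative):
#
#   env = _BuildInstanceHookEnvByObject(self.instance,
#                                       override={'status': 'down'})
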
def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)
class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G
    bad = False
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    if 'filelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    # checks ssh to other nodes
    if 'nodelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure makes
    the output be logged in the verify output and the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes
  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non-redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if not isinstance(nodeinstance, list):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
          }
      except ValueError:
        feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue
    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn(" - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn(" - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn(" - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn(" - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn(" Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn(" Node %s:" % node_name)
                show_node_header = False
              feedback_fn(" ERROR: Script %s failed, output:" % script)
              output = indent_re.sub(' ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      lvs = node_lvs[node]
      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
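
# Sketch of the recursion (illustrative; the Disk constructor arguments shown
# here are assumptions, not taken from this module): a mirrored disk whose
# children are LVM logical volumes reports True via the children, while a
# tree without any LD_LV leaf reports False.
#
#   lv_a = objects.Disk(dev_type=constants.LD_LV, size=1024)
#   lv_b = objects.Disk(dev_type=constants.LD_LV, size=1024)
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, children=[lv_a, lv_b])
#   _RecursiveCheckIfLVMBased(drbd)  # -> True (found in the children)
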
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["vg_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name is not None, check the given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disks to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      raise errors.RemoteError("Can't contact node %s for mirror data,"
                               " aborting." % node)
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      #utils.Unlock('cmd')
      pass

    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
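
# Typical call site sketch (illustrative): creation-style LUs block until the
# mirrors have caught up, while oneshot=True gives a single status pass:
#
#   disk_abort = not _WaitForSync(self.cfg, instance, self.proc)
#   if disk_abort:
#     ...  # degraded mirrors: warn the caller or abort the operation
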
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6  # position of the ldisk status in the blockdev_find result
  else:
    idx = 5  # position of the is_degraded status

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
  return result
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

    Args:
      node_list: a list with the names of all nodes
      rlist: a map with node names as keys and OS objects as values

    Returns:
      map: a map with osnames as keys and as value another map, with
        node names as keys and lists of OS objects as values,
        e.g. {"debian-etch": {"node1": [<object>,...],
                              "node2": [<object>,]}
             }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data is False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
      - the node exists in the configuration
      - it does not have primary or secondary instances
      - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    utils.RemoveHostFromEtcHosts(node.name)
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset([
      "mtotal", "mnode", "mfree",
      "dtotal", "dfree",
      "ctotal", "bootid",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                break
            else:
              inst = None
            val = inst and inst.name
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
      - the new node is not already in the config
      - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)

    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    keyarray = []
    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])
    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
      }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if not self.op.readd:
      logger.Info("adding node %s to cluster.conf" % node)
      self.cfg.AddNode(new_node)
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become the new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
    succeeded with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
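
# Usage sketch: LUActivateInstanceDisks above is the minimal consumer; the
# device_info triples can be handed straight back to the caller:
#
#   disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   return disks_info
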
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not isinstance(ins_l, list):
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " its disks.")

    _ShutdownInstanceDisks(instance, self.cfg)
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If the ignore_primary is false, errors on the primary node are
  taken into account and the function returns False.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
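
# Usage sketch (mirroring LUStartupInstance.CheckPrereq below): callers name
# the action in `reason` so the OpPrereqError message reads naturally:
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)
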
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)
2131 class LUReinstallInstance(LogicalUnit):
2132 """Reinstall an instance.
2135 HPATH = "instance-reinstall"
2136 HTYPE = constants.HTYPE_INSTANCE
2137 _OP_REQP = ["instance_name"]
2139 def BuildHooksEnv(self):
2142 This runs on master, primary and secondary nodes of the instance.
2145 env = _BuildInstanceHookEnvByObject(self.instance)
2146 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2147 list(self.instance.secondary_nodes))
2148 return env, nl, nl
2150 def CheckPrereq(self):
2151 """Check prerequisites.
2153 This checks that the instance is in the cluster and is not running.
2156 instance = self.cfg.GetInstanceInfo(
2157 self.cfg.ExpandInstanceName(self.op.instance_name))
2158 if instance is None:
2159 raise errors.OpPrereqError("Instance '%s' not known" %
2160 self.op.instance_name)
2161 if instance.disk_template == constants.DT_DISKLESS:
2162 raise errors.OpPrereqError("Instance '%s' has no disks" %
2163 self.op.instance_name)
2164 if instance.status != "down":
2165 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2166 self.op.instance_name)
2167 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2168 if remote_info:
2169 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2170 (self.op.instance_name,
2171 instance.primary_node))
2173 self.op.os_type = getattr(self.op, "os_type", None)
2174 if self.op.os_type is not None:
2176 pnode = self.cfg.GetNodeInfo(
2177 self.cfg.ExpandNodeName(instance.primary_node))
2178 if pnode is None:
2179 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2180 instance.primary_node)
2181 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2182 if not os_obj:
2183 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2184 " primary node" % self.op.os_type)
2186 self.instance = instance
2188 def Exec(self, feedback_fn):
2189 """Reinstall the instance.
2192 inst = self.instance
2194 if self.op.os_type is not None:
2195 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2196 inst.os = self.op.os_type
2197 self.cfg.AddInstance(inst)
2199 _StartInstanceDisks(self.cfg, inst, None)
2200 try:
2201 feedback_fn("Running the instance OS create scripts...")
2202 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2203 raise errors.OpExecError("Could not install OS for instance %s"
2204 " on node %s" %
2205 (inst.name, inst.primary_node))
2206 finally:
2207 _ShutdownInstanceDisks(inst, self.cfg)
2210 class LURenameInstance(LogicalUnit):
2211 """Rename an instance.
2214 HPATH = "instance-rename"
2215 HTYPE = constants.HTYPE_INSTANCE
2216 _OP_REQP = ["instance_name", "new_name"]
2218 def BuildHooksEnv(self):
2221 This runs on master, primary and secondary nodes of the instance.
2224 env = _BuildInstanceHookEnvByObject(self.instance)
2225 env["INSTANCE_NEW_NAME"] = self.op.new_name
2226 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2227 list(self.instance.secondary_nodes))
2228 return env, nl, nl
2230 def CheckPrereq(self):
2231 """Check prerequisites.
2233 This checks that the instance is in the cluster and is not running.
2236 instance = self.cfg.GetInstanceInfo(
2237 self.cfg.ExpandInstanceName(self.op.instance_name))
2238 if instance is None:
2239 raise errors.OpPrereqError("Instance '%s' not known" %
2240 self.op.instance_name)
2241 if instance.status != "down":
2242 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2243 self.op.instance_name)
2244 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2245 if remote_info:
2246 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2247 (self.op.instance_name,
2248 instance.primary_node))
2249 self.instance = instance
2251 # new name verification
2252 name_info = utils.HostInfo(self.op.new_name)
2254 self.op.new_name = new_name = name_info.name
2255 instance_list = self.cfg.GetInstanceList()
2256 if new_name in instance_list:
2257 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2258 new_name)
2260 if not getattr(self.op, "ignore_ip", False):
2261 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2262 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2263 (name_info.ip, new_name))
2266 def Exec(self, feedback_fn):
2267 """Rename the instance.
2270 inst = self.instance
2271 old_name = inst.name
2273 if inst.disk_template == constants.DT_FILE:
2274 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2276 self.cfg.RenameInstance(inst.name, self.op.new_name)
2278 # re-read the instance from the configuration after rename
2279 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2281 if inst.disk_template == constants.DT_FILE:
2282 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2283 result = rpc.call_file_storage_dir_rename(inst.primary_node,
2284 old_file_storage_dir,
2285 new_file_storage_dir)
2287 if not result:
2288 raise errors.OpExecError("Could not connect to node '%s' to rename"
2289 " directory '%s' to '%s' (but the instance"
2290 " has been renamed in Ganeti)" % (
2291 inst.primary_node, old_file_storage_dir,
2292 new_file_storage_dir))
2294 if not result[0]:
2295 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2296 " (but the instance has been renamed in"
2297 " Ganeti)" % (old_file_storage_dir,
2298 new_file_storage_dir))
2300 _StartInstanceDisks(self.cfg, inst, None)
2301 try:
2302 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2303 "sda", "sdb"):
2304 msg = ("Could not run OS rename script for instance %s on node %s (but the"
2305 " instance has been renamed in Ganeti)" %
2306 (inst.name, inst.primary_node))
2307 logger.Error(msg)
2308 finally:
2309 _ShutdownInstanceDisks(inst, self.cfg)
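# Editorial note (sketch): for file-based instances the storage directory
# is derived from the first disk's logical_id, e.g.:
#
#   >>> import os.path
#   >>> os.path.dirname("/srv/ganeti/file-storage/old-name.example.com/sda")
#   '/srv/ganeti/file-storage/old-name.example.com'
#
# (hypothetical path), which is why the rename above must also move that
# directory on the primary node via the call_file_storage_dir_rename RPC.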
2312 class LURemoveInstance(LogicalUnit):
2313 """Remove an instance.
2316 HPATH = "instance-remove"
2317 HTYPE = constants.HTYPE_INSTANCE
2318 _OP_REQP = ["instance_name", "ignore_failures"]
2320 def BuildHooksEnv(self):
2323 This runs on master, primary and secondary nodes of the instance.
2326 env = _BuildInstanceHookEnvByObject(self.instance)
2327 nl = [self.sstore.GetMasterNode()]
2328 return env, nl, nl
2330 def CheckPrereq(self):
2331 """Check prerequisites.
2333 This checks that the instance is in the cluster.
2336 instance = self.cfg.GetInstanceInfo(
2337 self.cfg.ExpandInstanceName(self.op.instance_name))
2338 if instance is None:
2339 raise errors.OpPrereqError("Instance '%s' not known" %
2340 self.op.instance_name)
2341 self.instance = instance
2343 def Exec(self, feedback_fn):
2344 """Remove the instance.
2347 instance = self.instance
2348 logger.Info("shutting down instance %s on node %s" %
2349 (instance.name, instance.primary_node))
2351 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2352 if self.op.ignore_failures:
2353 feedback_fn("Warning: can't shutdown instance")
2354 else:
2355 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2356 (instance.name, instance.primary_node))
2358 logger.Info("removing block devices for instance %s" % instance.name)
2360 if not _RemoveDisks(instance, self.cfg):
2361 if self.op.ignore_failures:
2362 feedback_fn("Warning: can't remove instance's disks")
2363 else:
2364 raise errors.OpExecError("Can't remove instance's disks")
2366 logger.Info("removing instance %s out of cluster config" % instance.name)
2368 self.cfg.RemoveInstance(instance.name)
2371 class LUQueryInstances(NoHooksLU):
2372 """Logical unit for querying instances.
2375 _OP_REQP = ["output_fields", "names"]
2377 def CheckPrereq(self):
2378 """Check prerequisites.
2380 This checks that the fields required are valid output fields.
2383 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2384 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2385 "admin_state", "admin_ram",
2386 "disk_template", "ip", "mac", "bridge",
2387 "sda_size", "sdb_size", "vcpus", "tags"],
2388 dynamic=self.dynamic_fields,
2389 selected=self.op.output_fields)
2391 self.wanted = _GetWantedInstances(self, self.op.names)
2393 def Exec(self, feedback_fn):
2394 """Computes the list of instances and their attributes.
2397 instance_names = self.wanted
2398 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2399 in instance_names]
2401 # begin data gathering
2403 nodes = frozenset([inst.primary_node for inst in instance_list])
2405 bad_nodes = []
2406 if self.dynamic_fields.intersection(self.op.output_fields):
2407 live_data = {}
2408 node_data = rpc.call_all_instances_info(nodes)
2409 for name in nodes:
2410 result = node_data[name]
2411 if result:
2412 live_data.update(result)
2413 elif result == False:
2414 bad_nodes.append(name)
2415 # else no instance is alive
2416 else:
2417 live_data = dict([(name, {}) for name in instance_names])
2419 # end data gathering
2421 output = []
2422 for instance in instance_list:
2423 iout = []
2424 for field in self.op.output_fields:
2425 if field == "name":
2426 val = instance.name
2427 elif field == "os":
2428 val = instance.os
2429 elif field == "pnode":
2430 val = instance.primary_node
2431 elif field == "snodes":
2432 val = list(instance.secondary_nodes)
2433 elif field == "admin_state":
2434 val = (instance.status != "down")
2435 elif field == "oper_state":
2436 if instance.primary_node in bad_nodes:
2437 val = None
2438 else:
2439 val = bool(live_data.get(instance.name))
2440 elif field == "status":
2441 if instance.primary_node in bad_nodes:
2442 val = "ERROR_nodedown"
2443 else:
2444 running = bool(live_data.get(instance.name))
2445 if running:
2446 if instance.status != "down":
2447 val = "running"
2448 else:
2449 val = "ERROR_up"
2450 else:
2451 if instance.status != "down":
2452 val = "ERROR_down"
2453 else:
2454 val = "ADMIN_down"
2455 elif field == "admin_ram":
2456 val = instance.memory
2457 elif field == "oper_ram":
2458 if instance.primary_node in bad_nodes:
2459 val = None
2460 elif instance.name in live_data:
2461 val = live_data[instance.name].get("memory", "?")
2462 else:
2463 val = "-"
2464 elif field == "disk_template":
2465 val = instance.disk_template
2466 elif field == "ip":
2467 val = instance.nics[0].ip
2468 elif field == "bridge":
2469 val = instance.nics[0].bridge
2470 elif field == "mac":
2471 val = instance.nics[0].mac
2472 elif field == "sda_size" or field == "sdb_size":
2473 disk = instance.FindDisk(field[:3])
2474 if disk is None:
2475 val = None
2476 else:
2477 val = disk.size
2478 elif field == "vcpus":
2479 val = instance.vcpus
2480 elif field == "tags":
2481 val = list(instance.GetTags())
2482 else:
2483 raise errors.ParameterError(field)
2484 iout.append(val)
2485 output.append(iout)
2487 return output
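# Editorial sketch (not part of the module): the static half of the field
# ladder above could equivalently be table-driven; a hypothetical mapping:
#
#   _SIMPLE_FIELDS = {
#     "name": lambda inst: inst.name,
#     "os": lambda inst: inst.os,
#     "pnode": lambda inst: inst.primary_node,
#     "snodes": lambda inst: list(inst.secondary_nodes),
#     "vcpus": lambda inst: inst.vcpus,
#   }
#   ...
#   if field in _SIMPLE_FIELDS:
#     val = _SIMPLE_FIELDS[field](instance)
#
# Only the dynamic fields (oper_state, oper_ram, status) need the
# live_data/bad_nodes logic.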
2490 class LUFailoverInstance(LogicalUnit):
2491 """Failover an instance.
2494 HPATH = "instance-failover"
2495 HTYPE = constants.HTYPE_INSTANCE
2496 _OP_REQP = ["instance_name", "ignore_consistency"]
2498 def BuildHooksEnv(self):
2501 This runs on master, primary and secondary nodes of the instance.
2504 env = {
2505 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2506 }
2507 env.update(_BuildInstanceHookEnvByObject(self.instance))
2508 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2509 return env, nl, nl
2511 def CheckPrereq(self):
2512 """Check prerequisites.
2514 This checks that the instance is in the cluster.
2517 instance = self.cfg.GetInstanceInfo(
2518 self.cfg.ExpandInstanceName(self.op.instance_name))
2519 if instance is None:
2520 raise errors.OpPrereqError("Instance '%s' not known" %
2521 self.op.instance_name)
2523 if instance.disk_template not in constants.DTS_NET_MIRROR:
2524 raise errors.OpPrereqError("Instance's disk layout is not"
2525 " network mirrored, cannot failover.")
2527 secondary_nodes = instance.secondary_nodes
2528 if not secondary_nodes:
2529 raise errors.ProgrammerError("no secondary node but using "
2530 "a mirrored disk template")
2532 target_node = secondary_nodes[0]
2533 # check memory requirements on the secondary node
2534 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2535 instance.name, instance.memory)
2537 # check bridge existence
2538 brlist = [nic.bridge for nic in instance.nics]
2539 if not rpc.call_bridges_exist(target_node, brlist):
2540 raise errors.OpPrereqError("One or more target bridges %s do not"
2541 " exist on destination node '%s'" %
2542 (brlist, target_node))
2544 self.instance = instance
2546 def Exec(self, feedback_fn):
2547 """Failover an instance.
2549 The failover is done by shutting it down on its present node and
2550 starting it on the secondary.
2553 instance = self.instance
2555 source_node = instance.primary_node
2556 target_node = instance.secondary_nodes[0]
2558 feedback_fn("* checking disk consistency between source and target")
2559 for dev in instance.disks:
2560 # for drbd, these are drbd over lvm
2561 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2562 if instance.status == "up" and not self.op.ignore_consistency:
2563 raise errors.OpExecError("Disk %s is degraded on target node,"
2564 " aborting failover." % dev.iv_name)
2566 feedback_fn("* shutting down instance on source node")
2567 logger.Info("Shutting down instance %s on node %s" %
2568 (instance.name, source_node))
2570 if not rpc.call_instance_shutdown(source_node, instance):
2571 if self.op.ignore_consistency:
2572 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2573 " anyway. Please make sure node %s is down" %
2574 (instance.name, source_node, source_node))
2576 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2577 (instance.name, source_node))
2579 feedback_fn("* deactivating the instance's disks on source node")
2580 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2581 raise errors.OpExecError("Can't shut down the instance's disks.")
2583 instance.primary_node = target_node
2584 # distribute new instance config to the other nodes
2585 self.cfg.Update(instance)
2587 # Only start the instance if it's marked as up
2588 if instance.status == "up":
2589 feedback_fn("* activating the instance's disks on target node")
2590 logger.Info("Starting instance %s on node %s" %
2591 (instance.name, target_node))
2593 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2594 ignore_secondaries=True)
2595 if not disks_ok:
2596 _ShutdownInstanceDisks(instance, self.cfg)
2597 raise errors.OpExecError("Can't activate the instance's disks")
2599 feedback_fn("* starting the instance on the target node")
2600 if not rpc.call_instance_start(target_node, instance, None):
2601 _ShutdownInstanceDisks(instance, self.cfg)
2602 raise errors.OpExecError("Could not start instance %s on node %s." %
2603 (instance.name, target_node))
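# Editorial example (sketch): a failover is driven by a single opcode;
# assuming a mcpu.Processor instance `proc`:
#
#   op = opcodes.OpFailoverInstance(instance_name="db1.example.com",
#                                   ignore_consistency=False)
#   proc.ExecOpCode(op)
#
# With ignore_consistency=True the degraded-disk and shutdown failures
# above become warnings, which is only safe when the old primary node is
# really dead.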
2606 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2607 """Create a tree of block devices on the primary node.
2609 This always creates all devices.
2612 if device.children:
2613 for child in device.children:
2614 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2615 return False
2617 cfg.SetDiskID(device, node)
2618 new_id = rpc.call_blockdev_create(node, device, device.size,
2619 instance.name, True, info)
2620 if not new_id:
2621 return False
2622 if device.physical_id is None:
2623 device.physical_id = new_id
2624 return True
2627 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2628 """Create a tree of block devices on a secondary node.
2630 If this device type has to be created on secondaries, create it and
2631 all its children.
2633 If not, just recurse to children keeping the same 'force' value.
2636 if device.CreateOnSecondary():
2637 force = True
2638 if device.children:
2639 for child in device.children:
2640 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2641 child, force, info):
2642 return False
2644 if not force:
2645 return True
2646 cfg.SetDiskID(device, node)
2647 new_id = rpc.call_blockdev_create(node, device, device.size,
2648 instance.name, False, info)
2649 if not new_id:
2650 return False
2651 if device.physical_id is None:
2652 device.physical_id = new_id
2653 return True
2656 def _GenerateUniqueNames(cfg, exts):
2657 """Generate suitable LV names.
2659 This generates one unique logical volume name for each given suffix.
2662 results = []
2663 for val in exts:
2664 new_id = cfg.GenerateUniqueID()
2665 results.append("%s%s" % (new_id, val))
2666 return results
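# Editorial example (sketch): callers pass suffixes and get back one
# cluster-unique LV name per suffix, each prefixed with its own freshly
# generated unique id:
#
#   names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
#   # -> [<id1> + ".sda", <id2> + ".sdb"]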
2669 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2670 """Generate a drbd8 device complete with its children.
2673 port = cfg.AllocatePort()
2674 vgname = cfg.GetVGName()
2675 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2676 logical_id=(vgname, names[0]))
2677 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2678 logical_id=(vgname, names[1]))
2679 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2680 logical_id = (primary, secondary, port),
2681 children = [dev_data, dev_meta],
2682 iv_name=iv_name)
2683 return drbd_dev
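# Editorial sketch: the returned object is a DRBD8 disk whose children are
# the data LV and a fixed 128 MB metadata LV, roughly:
#
#   Disk(LD_DRBD8, size=size, logical_id=(primary, secondary, port),
#        children=[Disk(LD_LV, size=size, logical_id=(vgname, names[0])),
#                  Disk(LD_LV, size=128,  logical_id=(vgname, names[1]))])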
2686 def _GenerateDiskTemplate(cfg, template_name,
2687 instance_name, primary_node,
2688 secondary_nodes, disk_sz, swap_sz,
2689 file_storage_dir, file_driver):
2690 """Generate the entire disk layout for a given template type.
2693 #TODO: compute space requirements
2695 vgname = cfg.GetVGName()
2696 if template_name == constants.DT_DISKLESS:
2697 disks = []
2698 elif template_name == constants.DT_PLAIN:
2699 if len(secondary_nodes) != 0:
2700 raise errors.ProgrammerError("Wrong template configuration")
2702 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2703 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2704 logical_id=(vgname, names[0]),
2705 iv_name = "sda")
2706 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2707 logical_id=(vgname, names[1]),
2708 iv_name = "sdb")
2709 disks = [sda_dev, sdb_dev]
2710 elif template_name == constants.DT_DRBD8:
2711 if len(secondary_nodes) != 1:
2712 raise errors.ProgrammerError("Wrong template configuration")
2713 remote_node = secondary_nodes[0]
2714 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2715 ".sdb_data", ".sdb_meta"])
2716 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2717 disk_sz, names[0:2], "sda")
2718 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2719 swap_sz, names[2:4], "sdb")
2720 disks = [drbd_sda_dev, drbd_sdb_dev]
2721 elif template_name == constants.DT_FILE:
2722 if len(secondary_nodes) != 0:
2723 raise errors.ProgrammerError("Wrong template configuration")
2725 file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2726 iv_name="sda", logical_id=(file_driver,
2727 "%s/sda" % file_storage_dir))
2728 file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2729 iv_name="sdb", logical_id=(file_driver,
2730 "%s/sdb" % file_storage_dir))
2731 disks = [file_sda_dev, file_sdb_dev]
2732 else:
2733 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2735 return disks
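# Editorial example (sketch): for DT_DRBD8 this builds the two mirrored
# devices, e.g. (hypothetical nodes):
#
#   disks = _GenerateDiskTemplate(cfg, constants.DT_DRBD8,
#                                 "web1.example.com", "node1", ["node2"],
#                                 10240, 4096, None, None)
#   # -> [drbd8 "sda" (10240 MB), drbd8 "sdb" (4096 MB)], each carrying
#   #    data+meta LV children replicated between the two nodes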
2737 def _GetInstanceInfoText(instance):
2738 """Compute the text that should be added to the disk's metadata.
2741 return "originstname+%s" % instance.name
2744 def _CreateDisks(cfg, instance):
2745 """Create all disks for an instance.
2747 This abstracts away some work from AddInstance.
2750 instance: the instance object
2753 True or False showing the success of the creation process
2756 info = _GetInstanceInfoText(instance)
2758 if instance.disk_template == constants.DT_FILE:
2759 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2760 result = rpc.call_file_storage_dir_create(instance.primary_node,
2764 logger.Error("Could not connect to node '%s'" % instance.primary_node)
2768 logger.Error("failed to create directory '%s'" % file_storage_dir)
2771 for device in instance.disks:
2772 logger.Info("creating volume %s for instance %s" %
2773 (device.iv_name, instance.name))
2775 for secondary_node in instance.secondary_nodes:
2776 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2777 device, False, info):
2778 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2779 (device.iv_name, device, secondary_node))
2782 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2783 instance, device, info):
2784 logger.Error("failed to create volume %s on primary!" %
2791 def _RemoveDisks(instance, cfg):
2792 """Remove all disks for an instance.
2794 This abstracts away some work from `AddInstance()` and
2795 `RemoveInstance()`. Note that in case some of the devices couldn't
2796 be removed, the removal will continue with the other ones (compare
2797 with `_CreateDisks()`).
2800 instance: the instance object
2803 True or False showing the success of the removal process
2806 logger.Info("removing block devices for instance %s" % instance.name)
2809 for device in instance.disks:
2810 for node, disk in device.ComputeNodeTree(instance.primary_node):
2811 cfg.SetDiskID(disk, node)
2812 if not rpc.call_blockdev_remove(node, disk):
2813 logger.Error("could not remove block device %s on node %s,"
2814 " continuing anyway" %
2815 (device.iv_name, node))
2818 if instance.disk_template == constants.DT_FILE:
2819 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2820 if not rpc.call_file_storage_dir_remove(instance.primary_node,
2822 logger.Error("could not remove directory '%s'" % file_storage_dir)
2828 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2829 """Compute disk size requirements in the volume group
2831 This is currently hard-coded for the two-drive layout.
2834 # Required free disk space as a function of disk and swap space
2836 constants.DT_DISKLESS: None,
2837 constants.DT_PLAIN: disk_size + swap_size,
2838 # 256 MB are added for DRBD metadata, 128 MB for each DRBD device
2839 constants.DT_DRBD8: disk_size + swap_size + 256,
2840 constants.DT_FILE: None,
2843 if disk_template not in req_size_dict:
2844 raise errors.ProgrammerError("Disk template '%s' size requirement"
2845 " is unknown" % disk_template)
2847 return req_size_dict[disk_template]
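# Editorial example: for DT_DRBD8 with disk_size=10240 and swap_size=4096
# this returns 10240 + 4096 + 256 = 14592 (MiB of free VG space needed per
# node: 128 MiB of DRBD metadata for each of the two devices).
# DT_DISKLESS and DT_FILE return None since they allocate nothing in the
# volume group.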
2850 class LUCreateInstance(LogicalUnit):
2851 """Create an instance.
2854 HPATH = "instance-add"
2855 HTYPE = constants.HTYPE_INSTANCE
2856 _OP_REQP = ["instance_name", "mem_size", "disk_size",
2857 "disk_template", "swap_size", "mode", "start", "vcpus",
2858 "wait_for_sync", "ip_check", "mac"]
2860 def _RunAllocator(self):
2861 """Run the allocator based on input opcode.
2864 disks = [{"size": self.op.disk_size, "mode": "w"},
2865 {"size": self.op.swap_size, "mode": "w"}]
2866 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2867 "bridge": self.op.bridge}]
2868 ial = IAllocator(self.cfg, self.sstore,
2869 mode=constants.IALLOCATOR_MODE_ALLOC,
2870 name=self.op.instance_name,
2871 disk_template=self.op.disk_template,
2874 vcpus=self.op.vcpus,
2875 mem_size=self.op.mem_size,
2876 disks=disks,
2877 nics=nics,
2878 )
2880 ial.Run(self.op.iallocator)
2882 if not ial.success:
2883 raise errors.OpPrereqError("Can't compute nodes using"
2884 " iallocator '%s': %s" % (self.op.iallocator,
2885 ial.info))
2886 if len(ial.nodes) != ial.required_nodes:
2887 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2888 " of nodes (%s), required %s" %
2889 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
2890 self.op.pnode = ial.nodes[0]
2891 logger.ToStdout("Selected nodes for the instance: %s" %
2892 (", ".join(ial.nodes),))
2893 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2894 (self.op.instance_name, self.op.iallocator, ial.nodes))
2895 if ial.required_nodes == 2:
2896 self.op.snode = ial.nodes[1]
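# Editorial note (sketch): the allocator receives the instance spec built
# above (disks, nics, memory, vcpus) and must return exactly
# ial.required_nodes names: one node for non-mirrored templates, two
# (primary first, then the secondary used above) for DRBD.  E.g., with a
# hypothetical allocator script name:
#
#   ial.Run("dumb")
#   # on success: ial.nodes == ["node1.example.com", "node2.example.com"]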
2898 def BuildHooksEnv(self):
2901 This runs on master, primary and secondary nodes of the instance.
2904 env = {
2905 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2906 "INSTANCE_DISK_SIZE": self.op.disk_size,
2907 "INSTANCE_SWAP_SIZE": self.op.swap_size,
2908 "INSTANCE_ADD_MODE": self.op.mode,
2909 }
2910 if self.op.mode == constants.INSTANCE_IMPORT:
2911 env["INSTANCE_SRC_NODE"] = self.op.src_node
2912 env["INSTANCE_SRC_PATH"] = self.op.src_path
2913 env["INSTANCE_SRC_IMAGE"] = self.src_image
2915 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2916 primary_node=self.op.pnode,
2917 secondary_nodes=self.secondaries,
2918 status=self.instance_status,
2919 os_type=self.op.os_type,
2920 memory=self.op.mem_size,
2921 vcpus=self.op.vcpus,
2922 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2925 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2926 self.secondaries)
2927 return env, nl, nl
2930 def CheckPrereq(self):
2931 """Check prerequisites.
2934 # set optional parameters to none if they don't exist
2935 for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
2936 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
2937 "vnc_bind_address"]:
2938 if not hasattr(self.op, attr):
2939 setattr(self.op, attr, None)
2941 if self.op.mode not in (constants.INSTANCE_CREATE,
2942 constants.INSTANCE_IMPORT):
2943 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2944 self.op.mode)
2946 if (not self.cfg.GetVGName() and
2947 self.op.disk_template not in constants.DTS_NOT_LVM):
2948 raise errors.OpPrereqError("Cluster does not support lvm-based"
2949 " instances")
2951 if self.op.mode == constants.INSTANCE_IMPORT:
2952 src_node = getattr(self.op, "src_node", None)
2953 src_path = getattr(self.op, "src_path", None)
2954 if src_node is None or src_path is None:
2955 raise errors.OpPrereqError("Importing an instance requires source"
2956 " node and path options")
2957 src_node_full = self.cfg.ExpandNodeName(src_node)
2958 if src_node_full is None:
2959 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2960 self.op.src_node = src_node = src_node_full
2962 if not os.path.isabs(src_path):
2963 raise errors.OpPrereqError("The source path must be absolute")
2965 export_info = rpc.call_export_info(src_node, src_path)
2967 if not export_info:
2968 raise errors.OpPrereqError("No export found in dir %s" % src_path)
2970 if not export_info.has_section(constants.INISECT_EXP):
2971 raise errors.ProgrammerError("Corrupted export config")
2973 ei_version = export_info.get(constants.INISECT_EXP, 'version')
2974 if (int(ei_version) != constants.EXPORT_VERSION):
2975 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2976 (ei_version, constants.EXPORT_VERSION))
2978 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2979 raise errors.OpPrereqError("Can't import instance with more than"
2980 " one data disk")
2982 # FIXME: are the old OSes, disk sizes, etc. useful?
2983 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2984 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2986 self.src_image = diskimage
2987 else: # INSTANCE_CREATE
2988 if getattr(self.op, "os_type", None) is None:
2989 raise errors.OpPrereqError("No guest OS specified")
2991 #### instance parameters check
2993 # disk template and mirror node verification
2994 if self.op.disk_template not in constants.DISK_TEMPLATES:
2995 raise errors.OpPrereqError("Invalid disk template name")
2997 # instance name verification
2998 hostname1 = utils.HostInfo(self.op.instance_name)
3000 self.op.instance_name = instance_name = hostname1.name
3001 instance_list = self.cfg.GetInstanceList()
3002 if instance_name in instance_list:
3003 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3004 instance_name)
3006 # ip validity checks
3007 ip = getattr(self.op, "ip", None)
3008 if ip is None or ip.lower() == "none":
3009 inst_ip = None
3010 elif ip.lower() == "auto":
3011 inst_ip = hostname1.ip
3012 else:
3013 if not utils.IsValidIP(ip):
3014 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3015 " like a valid IP" % ip)
3016 inst_ip = ip
3017 self.inst_ip = self.op.ip = inst_ip
3019 if self.op.start and not self.op.ip_check:
3020 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3021 " adding an instance in start mode")
3023 if self.op.ip_check:
3024 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3025 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3026 (hostname1.ip, instance_name))
3028 # MAC address verification
3029 if self.op.mac != "auto":
3030 if not utils.IsValidMac(self.op.mac.lower()):
3031 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3032 self.op.mac)
3034 # bridge verification
3035 bridge = getattr(self.op, "bridge", None)
3036 if bridge is None:
3037 self.op.bridge = self.cfg.GetDefBridge()
3038 else:
3039 self.op.bridge = bridge
3041 # boot order verification
3042 if self.op.hvm_boot_order is not None:
3043 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3044 raise errors.OpPrereqError("invalid boot order specified,"
3045 " must be one or more of [acdn]")
3046 # file storage checks
3047 if (self.op.file_driver and
3048 not self.op.file_driver in constants.FILE_DRIVER):
3049 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3050 self.op.file_driver)
3052 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3053 raise errors.OpPrereqError("File storage directory not a relative"
3054 " path")
3057 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3058 raise errors.OpPrereqError("One and only one of iallocator and primary"
3059 " node must be given")
3061 if self.op.iallocator is not None:
3062 self._RunAllocator()
3064 #### node related checks
3066 # check primary node
3067 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3068 if pnode is None:
3069 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3070 self.op.pnode)
3071 self.op.pnode = pnode.name
3073 self.secondaries = []
3075 # mirror node verification
3076 if self.op.disk_template in constants.DTS_NET_MIRROR:
3077 if getattr(self.op, "snode", None) is None:
3078 raise errors.OpPrereqError("The networked disk templates need"
3079 " a mirror node")
3081 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3082 if snode_name is None:
3083 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3084 self.op.snode)
3085 elif snode_name == pnode.name:
3086 raise errors.OpPrereqError("The secondary node cannot be"
3087 " the primary node.")
3088 self.secondaries.append(snode_name)
3090 req_size = _ComputeDiskSize(self.op.disk_template,
3091 self.op.disk_size, self.op.swap_size)
3093 # Check lv size requirements
3094 if req_size is not None:
3095 nodenames = [pnode.name] + self.secondaries
3096 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3097 for node in nodenames:
3098 info = nodeinfo.get(node, None)
3099 if not info:
3100 raise errors.OpPrereqError("Cannot get current information"
3101 " from node '%s'" % node)
3102 vg_free = info.get('vg_free', None)
3103 if not isinstance(vg_free, int):
3104 raise errors.OpPrereqError("Can't compute free disk space on"
3105 " node %s" % node)
3106 if req_size > info['vg_free']:
3107 raise errors.OpPrereqError("Not enough disk space on target node %s."
3108 " %d MB available, %d MB required" %
3109 (node, info['vg_free'], req_size))
3112 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3113 if not os_obj:
3114 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3115 " primary node" % self.op.os_type)
3117 if self.op.kernel_path == constants.VALUE_NONE:
3118 raise errors.OpPrereqError("Can't set instance kernel to none")
3120 self.pnode = pnode
3121 # bridge check on primary node
3122 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3123 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3124 " destination node '%s'" %
3125 (self.op.bridge, pnode.name))
3127 # memory check on primary node
3129 _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3130 "creating instance %s" % self.op.instance_name,
3131 self.op.mem_size)
3133 # hvm_cdrom_image_path verification
3134 if self.op.hvm_cdrom_image_path is not None:
3135 if not os.path.isabs(self.op.hvm_cdrom_image_path):
3136 raise errors.OpPrereqError("The path to the HVM CDROM image must"
3137 " be an absolute path or None, not %s" %
3138 self.op.hvm_cdrom_image_path)
3139 if not os.path.isfile(self.op.hvm_cdrom_image_path):
3140 raise errors.OpPrereqError("The HVM CDROM image must either be a"
3141 " regular file or a symlink pointing to"
3142 " an existing regular file, not %s" %
3143 self.op.hvm_cdrom_image_path)
3145 # vnc_bind_address verification
3146 if self.op.vnc_bind_address is not None:
3147 if not utils.IsValidIP(self.op.vnc_bind_address):
3148 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3149 " like a valid IP address" %
3150 self.op.vnc_bind_address)
3152 if self.op.start:
3153 self.instance_status = 'up'
3154 else:
3155 self.instance_status = 'down'
3157 def Exec(self, feedback_fn):
3158 """Create and add the instance to the cluster.
3161 instance = self.op.instance_name
3162 pnode_name = self.pnode.name
3164 if self.op.mac == "auto":
3165 mac_address = self.cfg.GenerateMAC()
3167 mac_address = self.op.mac
3169 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3170 if self.inst_ip is not None:
3171 nic.ip = self.inst_ip
3173 ht_kind = self.sstore.GetHypervisorType()
3174 if ht_kind in constants.HTS_REQ_PORT:
3175 network_port = self.cfg.AllocatePort()
3176 else:
3177 network_port = None
3179 if self.op.vnc_bind_address is None:
3180 self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3182 # this is needed because os.path.join does not accept None arguments
3183 if self.op.file_storage_dir is None:
3184 string_file_storage_dir = ""
3186 string_file_storage_dir = self.op.file_storage_dir
3188 # build the full file storage dir path
3189 file_storage_dir = os.path.normpath(os.path.join(
3190 self.sstore.GetFileStorageDir(),
3191 string_file_storage_dir, instance))
3194 disks = _GenerateDiskTemplate(self.cfg,
3195 self.op.disk_template,
3196 instance, pnode_name,
3197 self.secondaries, self.op.disk_size,
3198 self.op.swap_size,
3199 file_storage_dir,
3200 self.op.file_driver)
3202 iobj = objects.Instance(name=instance, os=self.op.os_type,
3203 primary_node=pnode_name,
3204 memory=self.op.mem_size,
3205 vcpus=self.op.vcpus,
3206 nics=[nic], disks=disks,
3207 disk_template=self.op.disk_template,
3208 status=self.instance_status,
3209 network_port=network_port,
3210 kernel_path=self.op.kernel_path,
3211 initrd_path=self.op.initrd_path,
3212 hvm_boot_order=self.op.hvm_boot_order,
3213 hvm_acpi=self.op.hvm_acpi,
3214 hvm_pae=self.op.hvm_pae,
3215 hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3216 vnc_bind_address=self.op.vnc_bind_address,
3217 )
3219 feedback_fn("* creating instance disks...")
3220 if not _CreateDisks(self.cfg, iobj):
3221 _RemoveDisks(iobj, self.cfg)
3222 raise errors.OpExecError("Device creation failed, reverting...")
3224 feedback_fn("adding instance %s to cluster config" % instance)
3226 self.cfg.AddInstance(iobj)
3228 if self.op.wait_for_sync:
3229 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3230 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3231 # make sure the disks are not degraded (still sync-ing is ok)
3233 feedback_fn("* checking mirrors status")
3234 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3235 else:
3236 disk_abort = False
3238 if disk_abort:
3239 _RemoveDisks(iobj, self.cfg)
3240 self.cfg.RemoveInstance(iobj.name)
3241 raise errors.OpExecError("There are some degraded disks for"
3242 " this instance")
3244 feedback_fn("creating os for instance %s on node %s" %
3245 (instance, pnode_name))
3247 if iobj.disk_template != constants.DT_DISKLESS:
3248 if self.op.mode == constants.INSTANCE_CREATE:
3249 feedback_fn("* running the instance OS create scripts...")
3250 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3251 raise errors.OpExecError("could not add os for instance %s"
3252 " on node %s" %
3253 (instance, pnode_name))
3255 elif self.op.mode == constants.INSTANCE_IMPORT:
3256 feedback_fn("* running the instance OS import scripts...")
3257 src_node = self.op.src_node
3258 src_image = self.src_image
3259 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3260 src_node, src_image):
3261 raise errors.OpExecError("Could not import os for instance"
3262 " %s on node %s" %
3263 (instance, pnode_name))
3264 else:
3265 # also checked in the prereq part
3266 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3267 % self.op.mode)
3269 if self.op.start:
3270 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3271 feedback_fn("* starting instance...")
3272 if not rpc.call_instance_start(pnode_name, iobj, None):
3273 raise errors.OpExecError("Could not start instance")
3276 class LUConnectConsole(NoHooksLU):
3277 """Connect to an instance's console.
3279 This is somewhat special in that it returns the command line that
3280 you need to run on the master node in order to connect to the
3281 console.
3284 _OP_REQP = ["instance_name"]
3286 def CheckPrereq(self):
3287 """Check prerequisites.
3289 This checks that the instance is in the cluster.
3292 instance = self.cfg.GetInstanceInfo(
3293 self.cfg.ExpandInstanceName(self.op.instance_name))
3294 if instance is None:
3295 raise errors.OpPrereqError("Instance '%s' not known" %
3296 self.op.instance_name)
3297 self.instance = instance
3299 def Exec(self, feedback_fn):
3300 """Connect to the console of an instance
3303 instance = self.instance
3304 node = instance.primary_node
3306 node_insts = rpc.call_instance_list([node])[node]
3307 if node_insts is False:
3308 raise errors.OpExecError("Can't connect to node %s." % node)
3310 if instance.name not in node_insts:
3311 raise errors.OpExecError("Instance %s is not running." % instance.name)
3313 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3315 hyper = hypervisor.GetHypervisor()
3316 console_cmd = hyper.GetShellCommandForConsole(instance)
3319 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
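# Editorial example (sketch): for a Xen instance the returned value is an
# SSH command line to be executed on the master, conceptually:
#
#   ssh -t root@<primary-node> 'xm console <instance-name>'
#
# The exact arguments come from SshRunner.BuildCmd and the hypervisor's
# GetShellCommandForConsole; the line above is only an illustration.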
3322 class LUReplaceDisks(LogicalUnit):
3323 """Replace the disks of an instance.
3326 HPATH = "mirrors-replace"
3327 HTYPE = constants.HTYPE_INSTANCE
3328 _OP_REQP = ["instance_name", "mode", "disks"]
3330 def _RunAllocator(self):
3331 """Compute a new secondary node using an IAllocator.
3334 ial = IAllocator(self.cfg, self.sstore,
3335 mode=constants.IALLOCATOR_MODE_RELOC,
3336 name=self.op.instance_name,
3337 relocate_from=[self.sec_node])
3339 ial.Run(self.op.iallocator)
3341 if not ial.success:
3342 raise errors.OpPrereqError("Can't compute nodes using"
3343 " iallocator '%s': %s" % (self.op.iallocator,
3344 ial.info))
3345 if len(ial.nodes) != ial.required_nodes:
3346 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3347 " of nodes (%s), required %s" %
3348 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
3349 self.op.remote_node = ial.nodes[0]
3350 logger.ToStdout("Selected new secondary for the instance: %s" %
3351 self.op.remote_node)
3353 def BuildHooksEnv(self):
3356 This runs on the master, the primary and all the secondaries.
3359 env = {
3360 "MODE": self.op.mode,
3361 "NEW_SECONDARY": self.op.remote_node,
3362 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3363 }
3364 env.update(_BuildInstanceHookEnvByObject(self.instance))
3365 nl = [
3366 self.sstore.GetMasterNode(),
3367 self.instance.primary_node,
3368 ]
3369 if self.op.remote_node is not None:
3370 nl.append(self.op.remote_node)
3371 return env, nl, nl
3373 def CheckPrereq(self):
3374 """Check prerequisites.
3376 This checks that the instance is in the cluster.
3379 if not hasattr(self.op, "remote_node"):
3380 self.op.remote_node = None
3382 instance = self.cfg.GetInstanceInfo(
3383 self.cfg.ExpandInstanceName(self.op.instance_name))
3384 if instance is None:
3385 raise errors.OpPrereqError("Instance '%s' not known" %
3386 self.op.instance_name)
3387 self.instance = instance
3388 self.op.instance_name = instance.name
3390 if instance.disk_template not in constants.DTS_NET_MIRROR:
3391 raise errors.OpPrereqError("Instance's disk layout is not"
3392 " network mirrored.")
3394 if len(instance.secondary_nodes) != 1:
3395 raise errors.OpPrereqError("The instance has a strange layout,"
3396 " expected one secondary but found %d" %
3397 len(instance.secondary_nodes))
3399 self.sec_node = instance.secondary_nodes[0]
3401 ia_name = getattr(self.op, "iallocator", None)
3402 if ia_name is not None:
3403 if self.op.remote_node is not None:
3404 raise errors.OpPrereqError("Give either the iallocator or the new"
3405 " secondary, not both")
3406 self.op.remote_node = self._RunAllocator()
3408 remote_node = self.op.remote_node
3409 if remote_node is not None:
3410 remote_node = self.cfg.ExpandNodeName(remote_node)
3411 if remote_node is None:
3412 raise errors.OpPrereqError("Node '%s' not known" %
3413 self.op.remote_node)
3414 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3416 self.remote_node_info = None
3417 if remote_node == instance.primary_node:
3418 raise errors.OpPrereqError("The specified node is the primary node of"
3419 " the instance.")
3420 elif remote_node == self.sec_node:
3421 if self.op.mode == constants.REPLACE_DISK_SEC:
3422 # this is for DRBD8, where we can't execute the same mode of
3423 # replacement as for drbd7 (no different port allocated)
3424 raise errors.OpPrereqError("Same secondary given, cannot execute"
3426 if instance.disk_template == constants.DT_DRBD8:
3427 if (self.op.mode == constants.REPLACE_DISK_ALL and
3428 remote_node is not None):
3429 # switch to replace secondary mode
3430 self.op.mode = constants.REPLACE_DISK_SEC
3432 if self.op.mode == constants.REPLACE_DISK_ALL:
3433 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3434 " secondary disk replacement, not"
3436 elif self.op.mode == constants.REPLACE_DISK_PRI:
3437 if remote_node is not None:
3438 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3439 " the secondary while doing a primary"
3440 " node disk replacement")
3441 self.tgt_node = instance.primary_node
3442 self.oth_node = instance.secondary_nodes[0]
3443 elif self.op.mode == constants.REPLACE_DISK_SEC:
3444 self.new_node = remote_node # this can be None, in which case
3445 # we don't change the secondary
3446 self.tgt_node = instance.secondary_nodes[0]
3447 self.oth_node = instance.primary_node
3449 raise errors.ProgrammerError("Unhandled disk replace mode")
3451 for name in self.op.disks:
3452 if instance.FindDisk(name) is None:
3453 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3454 (name, instance.name))
3455 self.op.remote_node = remote_node
3457 def _ExecD8DiskOnly(self, feedback_fn):
3458 """Replace a disk on the primary or secondary for drbd8.
3460 The algorithm for replace is quite complicated:
3461 - for each disk to be replaced:
3462 - create new LVs on the target node with unique names
3463 - detach old LVs from the drbd device
3464 - rename old LVs to name_replaced.<time_t>
3465 - rename new LVs to old LVs
3466 - attach the new LVs (with the old names now) to the drbd device
3467 - wait for sync across all devices
3468 - for each modified disk:
3469 - remove old LVs (which have the name name_replaced.<time_t>)
3471 Failures are not very well handled.
3474 steps_total = 6
3475 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3476 instance = self.instance
3477 iv_names = {}
3478 vgname = self.cfg.GetVGName()
3480 cfg = self.cfg
3481 tgt_node = self.tgt_node
3482 oth_node = self.oth_node
3484 # Step: check device activation
3485 self.proc.LogStep(1, steps_total, "check device existence")
3486 info("checking volume groups")
3487 my_vg = cfg.GetVGName()
3488 results = rpc.call_vg_list([oth_node, tgt_node])
3489 if not results:
3490 raise errors.OpExecError("Can't list volume groups on the nodes")
3491 for node in oth_node, tgt_node:
3492 res = results.get(node, False)
3493 if not res or my_vg not in res:
3494 raise errors.OpExecError("Volume group '%s' not found on %s" %
3495 (my_vg, node))
3496 for dev in instance.disks:
3497 if not dev.iv_name in self.op.disks:
3498 continue
3499 for node in tgt_node, oth_node:
3500 info("checking %s on %s" % (dev.iv_name, node))
3501 cfg.SetDiskID(dev, node)
3502 if not rpc.call_blockdev_find(node, dev):
3503 raise errors.OpExecError("Can't find device %s on node %s" %
3504 (dev.iv_name, node))
3506 # Step: check other node consistency
3507 self.proc.LogStep(2, steps_total, "check peer consistency")
3508 for dev in instance.disks:
3509 if not dev.iv_name in self.op.disks:
3510 continue
3511 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3512 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3513 oth_node==instance.primary_node):
3514 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3515 " to replace disks on this node (%s)" %
3516 (oth_node, tgt_node))
3518 # Step: create new storage
3519 self.proc.LogStep(3, steps_total, "allocate new storage")
3520 for dev in instance.disks:
3521 if not dev.iv_name in self.op.disks:
3522 continue
3523 size = dev.size
3524 cfg.SetDiskID(dev, tgt_node)
3525 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3526 names = _GenerateUniqueNames(cfg, lv_names)
3527 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3528 logical_id=(vgname, names[0]))
3529 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3530 logical_id=(vgname, names[1]))
3531 new_lvs = [lv_data, lv_meta]
3532 old_lvs = dev.children
3533 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3534 info("creating new local storage on %s for %s" %
3535 (tgt_node, dev.iv_name))
3536 # since we *always* want to create this LV, we use the
3537 # _Create...OnPrimary (which forces the creation), even if we
3538 # are talking about the secondary node
3539 for new_lv in new_lvs:
3540 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3541 _GetInstanceInfoText(instance)):
3542 raise errors.OpExecError("Failed to create new LV named '%s' on"
3543 " node '%s'" %
3544 (new_lv.logical_id[1], tgt_node))
3546 # Step: for each lv, detach+rename*2+attach
3547 self.proc.LogStep(4, steps_total, "change drbd configuration")
3548 for dev, old_lvs, new_lvs in iv_names.itervalues():
3549 info("detaching %s drbd from local storage" % dev.iv_name)
3550 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3551 raise errors.OpExecError("Can't detach drbd from local storage on node"
3552 " %s for device %s" % (tgt_node, dev.iv_name))
3554 #cfg.Update(instance)
3556 # ok, we created the new LVs, so now we know we have the needed
3557 # storage; as such, we proceed on the target node to rename
3558 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3559 # using the assumption that logical_id == physical_id (which in
3560 # turn is the unique_id on that node)
3562 # FIXME(iustin): use a better name for the replaced LVs
3563 temp_suffix = int(time.time())
3564 ren_fn = lambda d, suff: (d.physical_id[0],
3565 d.physical_id[1] + "_replaced-%s" % suff)
3566 # build the rename list based on what LVs exist on the node
3567 rlist = []
3568 for to_ren in old_lvs:
3569 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3570 if find_res is not None: # device exists
3571 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3573 info("renaming the old LVs on the target node")
3574 if not rpc.call_blockdev_rename(tgt_node, rlist):
3575 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3576 # now we rename the new LVs to the old LVs
3577 info("renaming the new LVs on the target node")
3578 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3579 if not rpc.call_blockdev_rename(tgt_node, rlist):
3580 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3582 for old, new in zip(old_lvs, new_lvs):
3583 new.logical_id = old.logical_id
3584 cfg.SetDiskID(new, tgt_node)
3586 for disk in old_lvs:
3587 disk.logical_id = ren_fn(disk, temp_suffix)
3588 cfg.SetDiskID(disk, tgt_node)
3590 # now that the new lvs have the old name, we can add them to the device
3591 info("adding new mirror component on %s" % tgt_node)
3592 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3593 for new_lv in new_lvs:
3594 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3595 warning("Can't rollback device %s", hint="manually cleanup unused"
3596 " logical volumes")
3597 raise errors.OpExecError("Can't add local storage to drbd")
3599 dev.children = new_lvs
3600 cfg.Update(instance)
3602 # Step: wait for sync
3604 # this can fail as the old devices are degraded and _WaitForSync
3605 # does a combined result over all disks, so we don't check its
3606 # return value
3607 self.proc.LogStep(5, steps_total, "sync devices")
3608 _WaitForSync(cfg, instance, self.proc, unlock=True)
3610 # so check manually all the devices
3611 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3612 cfg.SetDiskID(dev, instance.primary_node)
3613 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3614 if is_degr:
3615 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3617 # Step: remove old storage
3618 self.proc.LogStep(6, steps_total, "removing old storage")
3619 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3620 info("remove logical volumes for %s" % name)
3621 for lv in old_lvs:
3622 cfg.SetDiskID(lv, tgt_node)
3623 if not rpc.call_blockdev_remove(tgt_node, lv):
3624 warning("Can't remove old LV", hint="manually remove unused LVs")
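# Editorial sketch of the step-4 rename dance for one disk, with
# hypothetical LV names:
#
#   old data LV  xenvg/<id1>.sda_data  -> xenvg/<id1>.sda_data_replaced-<ts>
#   new data LV  xenvg/<id2>.sda_data  -> xenvg/<id1>.sda_data
#
# so the DRBD device reattaches children under the unchanged names, and
# the stale "_replaced" LVs are deleted in step 6.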
3627 def _ExecD8Secondary(self, feedback_fn):
3628 """Replace the secondary node for drbd8.
3630 The algorithm for replace is quite complicated:
3631 - for all disks of the instance:
3632 - create new LVs on the new node with same names
3633 - shutdown the drbd device on the old secondary
3634 - disconnect the drbd network on the primary
3635 - create the drbd device on the new secondary
3636 - network attach the drbd on the primary, using an artifice:
3637 the drbd code for Attach() will connect to the network if it
3638 finds a device which is connected to the good local disks but
3639 not network enabled
3640 - wait for sync across all devices
3641 - remove all disks from the old secondary
3643 Failures are not very well handled.
3646 steps_total = 6
3647 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3648 instance = self.instance
3649 iv_names = {}
3650 vgname = self.cfg.GetVGName()
3652 cfg = self.cfg
3653 old_node = self.tgt_node
3654 new_node = self.new_node
3655 pri_node = instance.primary_node
3657 # Step: check device activation
3658 self.proc.LogStep(1, steps_total, "check device existence")
3659 info("checking volume groups")
3660 my_vg = cfg.GetVGName()
3661 results = rpc.call_vg_list([pri_node, new_node])
3662 if not results:
3663 raise errors.OpExecError("Can't list volume groups on the nodes")
3664 for node in pri_node, new_node:
3665 res = results.get(node, False)
3666 if not res or my_vg not in res:
3667 raise errors.OpExecError("Volume group '%s' not found on %s" %
3668 (my_vg, node))
3669 for dev in instance.disks:
3670 if not dev.iv_name in self.op.disks:
3671 continue
3672 info("checking %s on %s" % (dev.iv_name, pri_node))
3673 cfg.SetDiskID(dev, pri_node)
3674 if not rpc.call_blockdev_find(pri_node, dev):
3675 raise errors.OpExecError("Can't find device %s on node %s" %
3676 (dev.iv_name, pri_node))
3678 # Step: check other node consistency
3679 self.proc.LogStep(2, steps_total, "check peer consistency")
3680 for dev in instance.disks:
3681 if not dev.iv_name in self.op.disks:
3682 continue
3683 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3684 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3685 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3686 " unsafe to replace the secondary" %
3689 # Step: create new storage
3690 self.proc.LogStep(3, steps_total, "allocate new storage")
3691 for dev in instance.disks:
3692 size = dev.size
3693 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3694 # since we *always* want to create this LV, we use the
3695 # _Create...OnPrimary (which forces the creation), even if we
3696 # are talking about the secondary node
3697 for new_lv in dev.children:
3698 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3699 _GetInstanceInfoText(instance)):
3700 raise errors.OpExecError("Failed to create new LV named '%s' on"
3701 " node '%s'" %
3702 (new_lv.logical_id[1], new_node))
3704 iv_names[dev.iv_name] = (dev, dev.children)
3706 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3707 for dev in instance.disks:
3709 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3710 # create new devices on new_node
3711 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3712 logical_id=(pri_node, new_node,
3713 dev.logical_id[2]),
3714 children=dev.children)
3715 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3716 new_drbd, False,
3717 _GetInstanceInfoText(instance)):
3718 raise errors.OpExecError("Failed to create new DRBD on"
3719 " node '%s'" % new_node)
3721 for dev in instance.disks:
3722 # we have new devices, shutdown the drbd on the old secondary
3723 info("shutting down drbd for %s on old node" % dev.iv_name)
3724 cfg.SetDiskID(dev, old_node)
3725 if not rpc.call_blockdev_shutdown(old_node, dev):
3726 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3727 hint="Please cleanup this device manually as soon as possible")
3729 info("detaching primary drbds from the network (=> standalone)")
3730 done = 0
3731 for dev in instance.disks:
3732 cfg.SetDiskID(dev, pri_node)
3733 # set the physical (unique in bdev terms) id to None, meaning
3734 # detach from network
3735 dev.physical_id = (None,) * len(dev.physical_id)
3736 # and 'find' the device, which will 'fix' it to match the
3738 if rpc.call_blockdev_find(pri_node, dev):
3739 done += 1
3740 else:
3741 warning("Failed to detach drbd %s from network, unusual case" %
3742 dev.iv_name)
3744 if not done:
3745 # no detaches succeeded (very unlikely)
3746 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3748 # if we managed to detach at least one, we update all the disks of
3749 # the instance to point to the new secondary
3750 info("updating instance configuration")
3751 for dev in instance.disks:
3752 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3753 cfg.SetDiskID(dev, pri_node)
3754 cfg.Update(instance)
3756 # and now perform the drbd attach
3757 info("attaching primary drbds to new secondary (standalone => connected)")
3759 for dev in instance.disks:
3760 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3761 # since the attach is smart, it's enough to 'find' the device,
3762 # it will automatically activate the network, if the physical_id
3764 cfg.SetDiskID(dev, pri_node)
3765 if not rpc.call_blockdev_find(pri_node, dev):
3766 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3767 "please do a gnt-instance info to see the status of disks")
3769 # this can fail as the old devices are degraded and _WaitForSync
3770 # does a combined result over all disks, so we don't check its
3771 # return value
3772 self.proc.LogStep(5, steps_total, "sync devices")
3773 _WaitForSync(cfg, instance, self.proc, unlock=True)
3775 # so check manually all the devices
3776 for name, (dev, old_lvs) in iv_names.iteritems():
3777 cfg.SetDiskID(dev, pri_node)
3778 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3779 if is_degr:
3780 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3782 self.proc.LogStep(6, steps_total, "removing old storage")
3783 for name, (dev, old_lvs) in iv_names.iteritems():
3784 info("remove logical volumes for %s" % name)
3785 for lv in old_lvs:
3786 cfg.SetDiskID(lv, old_node)
3787 if not rpc.call_blockdev_remove(old_node, lv):
3788 warning("Can't remove LV on old secondary",
3789 hint="Cleanup stale volumes by hand")
3791 def Exec(self, feedback_fn):
3792 """Execute disk replacement.
3794 This dispatches the disk replacement to the appropriate handler.
3797 instance = self.instance
3799 # Activate the instance disks if we're replacing them on a down instance
3800 if instance.status == "down":
3801 op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3802 self.proc.ChainOpCode(op)
3804 if instance.disk_template == constants.DT_DRBD8:
3805 if self.op.remote_node is None:
3806 fn = self._ExecD8DiskOnly
3808 fn = self._ExecD8Secondary
3810 raise errors.ProgrammerError("Unhandled disk replacement case")
3812 ret = fn(feedback_fn)
3814 # Deactivate the instance disks if we're replacing them on a down instance
3815 if instance.status == "down":
3816 op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3817 self.proc.ChainOpCode(op)
3819 return ret
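# Editorial example (sketch): replacing the secondary node of a DRBD
# instance; assumes a mcpu.Processor instance `proc`:
#
#   op = opcodes.OpReplaceDisks(instance_name="db1.example.com",
#                               mode=constants.REPLACE_DISK_SEC,
#                               disks=["sda", "sdb"],
#                               remote_node="node3.example.com")
#   proc.ExecOpCode(op)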
3822 class LUGrowDisk(LogicalUnit):
3823 """Grow a disk of an instance.
3826 HPATH = "disk-grow"
3827 HTYPE = constants.HTYPE_INSTANCE
3828 _OP_REQP = ["instance_name", "disk", "amount"]
3830 def BuildHooksEnv(self):
3833 This runs on the master, the primary and all the secondaries.
3836 env = {
3837 "DISK": self.op.disk,
3838 "AMOUNT": self.op.amount,
3839 }
3840 env.update(_BuildInstanceHookEnvByObject(self.instance))
3841 nl = [
3842 self.sstore.GetMasterNode(),
3843 self.instance.primary_node,
3844 ]
3845 return env, nl, nl
3847 def CheckPrereq(self):
3848 """Check prerequisites.
3850 This checks that the instance is in the cluster.
3853 instance = self.cfg.GetInstanceInfo(
3854 self.cfg.ExpandInstanceName(self.op.instance_name))
3855 if instance is None:
3856 raise errors.OpPrereqError("Instance '%s' not known" %
3857 self.op.instance_name)
3858 self.instance = instance
3859 self.op.instance_name = instance.name
3861 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3862 raise errors.OpPrereqError("Instance's disk layout does not support"
3863 " growing.")
3865 if instance.FindDisk(self.op.disk) is None:
3866 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3867 (self.op.disk, instance.name))
3869 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3870 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3871 for node in nodenames:
3872 info = nodeinfo.get(node, None)
3873 if not info:
3874 raise errors.OpPrereqError("Cannot get current information"
3875 " from node '%s'" % node)
3876 vg_free = info.get('vg_free', None)
3877 if not isinstance(vg_free, int):
3878 raise errors.OpPrereqError("Can't compute free disk space on"
3879 " node %s" % node)
3880 if self.op.amount > info['vg_free']:
3881 raise errors.OpPrereqError("Not enough disk space on target node %s:"
3882 " %d MiB available, %d MiB required" %
3883 (node, info['vg_free'], self.op.amount))
3885 def Exec(self, feedback_fn):
3886 """Execute disk grow.
3889 instance = self.instance
3890 disk = instance.FindDisk(self.op.disk)
3891 for node in (instance.secondary_nodes + (instance.primary_node,)):
3892 self.cfg.SetDiskID(disk, node)
3893 result = rpc.call_blockdev_grow(node, disk, self.op.amount)
3894 if not result or not isinstance(result, tuple) or len(result) != 2:
3895 raise errors.OpExecError("Grow request failed on node %s" % node)
3896 elif not result[0]:
3897 raise errors.OpExecError("Grow request failed on node %s: %s" %
3898 (node, result[1]))
3899 disk.RecordGrow(self.op.amount)
3900 self.cfg.Update(instance)
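# Editorial example (sketch): growing sda by 2 GiB; assumes a
# mcpu.Processor instance `proc`:
#
#   op = opcodes.OpGrowDisk(instance_name="db1.example.com",
#                           disk="sda", amount=2048)
#   proc.ExecOpCode(op)
#
# The amount is in MiB and is checked against vg_free on every node of
# the instance, since both DRBD peers must grow their backing LV.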
3904 class LUQueryInstanceData(NoHooksLU):
3905 """Query runtime instance data.
3908 _OP_REQP = ["instances"]
3910 def CheckPrereq(self):
3911 """Check prerequisites.
3913 This only checks the optional instance list against the existing names.
3916 if not isinstance(self.op.instances, list):
3917 raise errors.OpPrereqError("Invalid argument type 'instances'")
3918 if self.op.instances:
3919 self.wanted_instances = []
3920 names = self.op.instances
3921 for name in names:
3922 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3923 if instance is None:
3924 raise errors.OpPrereqError("No such instance name '%s'" % name)
3925 self.wanted_instances.append(instance)
3926 else:
3927 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3928 in self.cfg.GetInstanceList()]
3932 def _ComputeDiskStatus(self, instance, snode, dev):
3933 """Compute block device status.
3936 self.cfg.SetDiskID(dev, instance.primary_node)
3937 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3938 if dev.dev_type in constants.LDS_DRBD:
3939 # we change the snode then (otherwise we use the one passed in)
3940 if dev.logical_id[0] == instance.primary_node:
3941 snode = dev.logical_id[1]
3943 snode = dev.logical_id[0]
3946 self.cfg.SetDiskID(dev, snode)
3947 dev_sstatus = rpc.call_blockdev_find(snode, dev)
3952 dev_children = [self._ComputeDiskStatus(instance, snode, child)
3953 for child in dev.children]
3958 "iv_name": dev.iv_name,
3959 "dev_type": dev.dev_type,
3960 "logical_id": dev.logical_id,
3961 "physical_id": dev.physical_id,
3962 "pstatus": dev_pstatus,
3963 "sstatus": dev_sstatus,
3964 "children": dev_children,
3969 def Exec(self, feedback_fn):
3970 """Gather and return data"""
3972 for instance in self.wanted_instances:
3973 remote_info = rpc.call_instance_info(instance.primary_node,
3975 if remote_info and "state" in remote_info:
3978 remote_state = "down"
3979 if instance.status == "down":
3980 config_state = "down"
3984 disks = [self._ComputeDiskStatus(instance, None, device)
3985 for device in instance.disks]
3988 "name": instance.name,
3989 "config_state": config_state,
3990 "run_state": remote_state,
3991 "pnode": instance.primary_node,
3992 "snodes": instance.secondary_nodes,
3994 "memory": instance.memory,
3995 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3997 "vcpus": instance.vcpus,
4000 htkind = self.sstore.GetHypervisorType()
4001 if htkind == constants.HT_XEN_PVM30:
4002 idict["kernel_path"] = instance.kernel_path
4003 idict["initrd_path"] = instance.initrd_path
4005 if htkind == constants.HT_XEN_HVM31:
4006 idict["hvm_boot_order"] = instance.hvm_boot_order
4007 idict["hvm_acpi"] = instance.hvm_acpi
4008 idict["hvm_pae"] = instance.hvm_pae
4009 idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4011 if htkind in constants.HTS_REQ_PORT:
4012 idict["vnc_bind_address"] = instance.vnc_bind_address
4013 idict["network_port"] = instance.network_port
4015 result[instance.name] = idict
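
# Illustrative output sketch, not part of the original module: for each
# instance, Exec() above produces a dict roughly of the form (values are
# made up for the example):
#
#   {"name": "inst1.example.com", "config_state": "up", "run_state": "down",
#    "pnode": "node1", "snodes": ["node2"], "os": "debian-etch",
#    "memory": 512, "vcpus": 1,
#    "nics": [("aa:00:00:11:22:33", "192.0.2.10", "xen-br0")],
#    "disks": [...]}  # per-disk dicts as built by _ComputeDiskStatus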


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_cdrom_image_path:
      instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.AddInstance(instance)

    return result
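
# Illustrative usage sketch, not part of the original module (assumption:
# opcodes.OpSetInstanceParams is the opcode class backing this LU): changing
# the memory and VCPU count would be submitted roughly as
#
#   op = opcodes.OpSetInstanceParams(instance_name="inst1.example.com",
#                                    mem=1024, vcpus=2)
#
# and Exec() would return [("mem", 1024), ("vcpus", 2)], i.e. the list of
# (parameter, new value) pairs that were actually applied.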


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns a dictionary with the structure node->(export-list), where
    export-list is a list of the instances exported on that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
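
# Illustrative output sketch, not part of the original module: Exec() above
# returns one (path, tag) pair per matching tag, e.g. searching for "prod"
# might yield
#
#   [("/cluster", "production"), ("/instances/inst1.example.com", "prod-web")]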


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """
    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _AllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _IAllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)
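
  # Illustrative input sketch, not part of the original module (shape derived
  # from the methods above, values made up): for an allocation request,
  # self.in_text serializes to JSON along the lines of
  #
  #   {"version": 1, "cluster_name": "cluster.example.com",
  #    "cluster_tags": [], "hypervisor_type": "xen-3.0",
  #    "nodes": {"node1": {"total_memory": 4096, "free_memory": 3072, ...}},
  #    "instances": {"inst1": {"memory": 512, "should_run": true, ...}},
  #    "request": {"type": "allocate", "name": "inst2.example.com",
  #                "memory": 512, "required_nodes": 2, ...}}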
  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" %
                               (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
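
# Illustrative reply sketch, not part of the original module (shape derived
# from _ValidateResult above): a well-formed allocator answer is a JSON
# object carrying at least the three mandatory keys, e.g.
#
#   {"success": true,
#    "info": "allocation successful",
#    "nodes": ["node1.example.com", "node2.example.com"]}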


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result