# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201
import os
import os.path
import sha
import time
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)
  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError
  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a
    dict containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in them,
    as it will be added by the hooks runner in case this LU requires a
    cluster to run on (otherwise we don't have a node list). "No
    nodes" should be expressed as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
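
# Illustrative sketch (not part of the original module): a minimal LU
# subclass wired up according to the contract above. The opcode field
# "node_name" and the hook path "node-example" are hypothetical
# placeholder values, kept as comments so module behavior is unchanged.
#
#   class LUExampleNode(LogicalUnit):
#     HPATH = "node-example"                  # hypothetical hook path
#     HTYPE = constants.HTYPE_NODE
#     _OP_REQP = ["node_name"]                # checked by __init__
#
#     def BuildHooksEnv(self):
#       env = {"OP_TARGET": self.op.node_name}
#       # no pre-hooks, post-hooks on the target node only
#       return env, [], [self.op.node_name]
#
#     def CheckPrereq(self):
#       # canonicalize the opcode parameters, as required above
#       self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Would act on %s" % self.op.node_name)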
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return {}, [], []


def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])


def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)
  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: The selected output fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
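
# Usage sketch (illustrative, not from the original source): a query LU
# validates its requested columns like this; "mfree" stands in for any
# dynamic (live-data) field.
#
#   _CheckOutputFields(static=["name", "pip"],
#                      dynamic=["mfree"],
#                      selected=self.op.output_fields)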
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings

  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
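
# The override dict replaces individual keyword arguments before they
# are passed down; e.g. a caller that must report a forced status could
# do (illustrative only):
#
#   env = _BuildInstanceHookEnvByObject(instance, override={"status": "up"})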
def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None
def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()
def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))
def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))
class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev", "file_storage_dir"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]
  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
                         source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
                     source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=constants.LOCALHOST_IP_ADDRESS))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)

    if not os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory you have is"
                                 " not an absolute path.")

    if not os.path.exists(self.op.file_storage_dir):
      try:
        os.makedirs(self.op.file_storage_dir, 0750)
      except OSError, err:
        raise errors.OpPrereqError("Cannot create file storage directory"
                                   " '%s': %s" %
                                   (self.op.file_storage_dir, err))

    if not os.path.isdir(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory '%s' is not"
                                 " a directory." % self.op.file_storage_dir)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)
  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)
    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)

    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)
class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G
    bad = False
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    if 'filelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn(" - ERROR: communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
      bad = True
    return bad
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn(" - ERROR: instance %s not in instance list %s" %
                  (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if not instance in node_instance[node_current]:
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad
  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result = self._VerifyInstance(instance, node_volume, node_instance,
                                    feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name
  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result
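
# Sketch of the call_blockdev_find result layout assumed by the index
# choice above (positions 5 and 6 holding is_degraded and ldisk); this
# tuple layout is an assumption documented here for readability:
#
#   rstats = rpc.call_blockdev_find(node, dev)
#   is_degraded = rstats[5]   # overall mirror status
#   ldisk_status = rstats[6]  # local storage status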
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    return node_data
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)
  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"
                                 % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)
  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = self.ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not self.ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result
class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not self.ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    data = []
    for node in self.nodes:
      result = self.ssh.Run(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
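
# Callers unpack the (status, mapping) pair; a representative use
# (mirroring LUActivateInstanceDisks above):
#
#   disks_ok, disks_info = _AssembleInstanceDisks(instance, cfg)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")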
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If the ignore_primary is false, errors on the primary node are not
  ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
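
# Representative call (mirroring LUStartupInstance below): reserve the
# instance's memory on its primary node before starting it.
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)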
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)
2146 class LUReinstallInstance(LogicalUnit):
2147 """Reinstall an instance.
2150 HPATH = "instance-reinstall"
2151 HTYPE = constants.HTYPE_INSTANCE
2152 _OP_REQP = ["instance_name"]
2154 def BuildHooksEnv(self):
2157 This runs on master, primary and secondary nodes of the instance.
2160 env = _BuildInstanceHookEnvByObject(self.instance)
2161 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2162 list(self.instance.secondary_nodes))
2165 def CheckPrereq(self):
2166 """Check prerequisites.
2168 This checks that the instance is in the cluster and is not running.
2171 instance = self.cfg.GetInstanceInfo(
2172 self.cfg.ExpandInstanceName(self.op.instance_name))
2173 if instance is None:
2174 raise errors.OpPrereqError("Instance '%s' not known" %
2175 self.op.instance_name)
2176 if instance.disk_template == constants.DT_DISKLESS:
2177 raise errors.OpPrereqError("Instance '%s' has no disks" %
2178 self.op.instance_name)
2179 if instance.status != "down":
2180 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2181 self.op.instance_name)
2182 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2183 if remote_info:
2184 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2185 (self.op.instance_name,
2186 instance.primary_node))
2188 self.op.os_type = getattr(self.op, "os_type", None)
2189 if self.op.os_type is not None:
2191 pnode = self.cfg.GetNodeInfo(
2192 self.cfg.ExpandNodeName(instance.primary_node))
2193 if pnode is None:
2194 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2195 instance.primary_node)
2196 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2197 if not os_obj:
2198 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2199 " primary node" % self.op.os_type)
2201 self.instance = instance
2203 def Exec(self, feedback_fn):
2204 """Reinstall the instance.
2207 inst = self.instance
2209 if self.op.os_type is not None:
2210 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2211 inst.os = self.op.os_type
2212 self.cfg.AddInstance(inst)
2214 _StartInstanceDisks(self.cfg, inst, None)
2216 feedback_fn("Running the instance OS create scripts...")
2217 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2218 raise errors.OpExecError("Could not install OS for instance %s"
2219 " on node %s" %
2220 (inst.name, inst.primary_node))
2222 _ShutdownInstanceDisks(inst, self.cfg)
2225 class LURenameInstance(LogicalUnit):
2226 """Rename an instance.
2229 HPATH = "instance-rename"
2230 HTYPE = constants.HTYPE_INSTANCE
2231 _OP_REQP = ["instance_name", "new_name"]
2233 def BuildHooksEnv(self):
2236 This runs on master, primary and secondary nodes of the instance.
2239 env = _BuildInstanceHookEnvByObject(self.instance)
2240 env["INSTANCE_NEW_NAME"] = self.op.new_name
2241 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2242 list(self.instance.secondary_nodes))
2245 def CheckPrereq(self):
2246 """Check prerequisites.
2248 This checks that the instance is in the cluster and is not running.
2251 instance = self.cfg.GetInstanceInfo(
2252 self.cfg.ExpandInstanceName(self.op.instance_name))
2253 if instance is None:
2254 raise errors.OpPrereqError("Instance '%s' not known" %
2255 self.op.instance_name)
2256 if instance.status != "down":
2257 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2258 self.op.instance_name)
2259 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2260 if remote_info:
2261 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2262 (self.op.instance_name,
2263 instance.primary_node))
2264 self.instance = instance
2266 # new name verification
2267 name_info = utils.HostInfo(self.op.new_name)
2269 self.op.new_name = new_name = name_info.name
2270 instance_list = self.cfg.GetInstanceList()
2271 if new_name in instance_list:
2272 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2273 new_name)
2275 if not getattr(self.op, "ignore_ip", False):
2276 command = ["fping", "-q", name_info.ip]
2277 result = utils.RunCmd(command)
2278 if not result.failed:
2279 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2280 (name_info.ip, new_name))
2283 def Exec(self, feedback_fn):
2284 """Reinstall the instance.
2287 inst = self.instance
2288 old_name = inst.name
2290 self.cfg.RenameInstance(inst.name, self.op.new_name)
2292 # re-read the instance from the configuration after rename
2293 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2295 _StartInstanceDisks(self.cfg, inst, None)
2297 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2298 "sda", "sdb"):
2299 msg = ("Could not run OS rename script for instance %s on node %s (but the"
2300 " instance has been renamed in Ganeti)" %
2301 (inst.name, inst.primary_node))
2302 logger.Error(msg)
2304 _ShutdownInstanceDisks(inst, self.cfg)
2307 class LURemoveInstance(LogicalUnit):
2308 """Remove an instance.
2311 HPATH = "instance-remove"
2312 HTYPE = constants.HTYPE_INSTANCE
2313 _OP_REQP = ["instance_name"]
2315 def BuildHooksEnv(self):
2318 This runs on master, primary and secondary nodes of the instance.
2321 env = _BuildInstanceHookEnvByObject(self.instance)
2322 nl = [self.sstore.GetMasterNode()]
2325 def CheckPrereq(self):
2326 """Check prerequisites.
2328 This checks that the instance is in the cluster.
2331 instance = self.cfg.GetInstanceInfo(
2332 self.cfg.ExpandInstanceName(self.op.instance_name))
2333 if instance is None:
2334 raise errors.OpPrereqError("Instance '%s' not known" %
2335 self.op.instance_name)
2336 self.instance = instance
2338 def Exec(self, feedback_fn):
2339 """Remove the instance.
2342 instance = self.instance
2343 logger.Info("shutting down instance %s on node %s" %
2344 (instance.name, instance.primary_node))
2346 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2347 if self.op.ignore_failures:
2348 feedback_fn("Warning: can't shutdown instance")
2349 else:
2350 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2351 (instance.name, instance.primary_node))
2353 logger.Info("removing block devices for instance %s" % instance.name)
2355 if not _RemoveDisks(instance, self.cfg):
2356 if self.op.ignore_failures:
2357 feedback_fn("Warning: can't remove instance's disks")
2358 else:
2359 raise errors.OpExecError("Can't remove instance's disks")
2361 logger.Info("removing instance %s out of cluster config" % instance.name)
2363 self.cfg.RemoveInstance(instance.name)
2366 class LUQueryInstances(NoHooksLU):
2367 """Logical unit for querying instances.
2370 _OP_REQP = ["output_fields", "names"]
2372 def CheckPrereq(self):
2373 """Check prerequisites.
2375 This checks that the fields required are valid output fields.
2378 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2379 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2380 "admin_state", "admin_ram",
2381 "disk_template", "ip", "mac", "bridge",
2382 "sda_size", "sdb_size", "vcpus"],
2383 dynamic=self.dynamic_fields,
2384 selected=self.op.output_fields)
2386 self.wanted = _GetWantedInstances(self, self.op.names)
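# Illustrative field selections (using the names defined above): a purely
# static query is answered from the configuration alone, while any dynamic
# field makes Exec issue the call_all_instances_info RPC:
#
#   output_fields = ["name", "pnode", "ip"]           # static: config only
#   output_fields = ["name", "oper_state", "status"]  # dynamic: needs RPC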
2388 def Exec(self, feedback_fn):
2389 """Computes the list of nodes and their attributes.
2392 instance_names = self.wanted
2393 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2394 in instance_names]
2396 # begin data gathering
2398 nodes = frozenset([inst.primary_node for inst in instance_list])
2400 bad_nodes = []
2401 if self.dynamic_fields.intersection(self.op.output_fields):
2402 live_data = {}
2403 node_data = rpc.call_all_instances_info(nodes)
2404 for name in nodes:
2405 result = node_data[name]
2406 if result:
2407 live_data.update(result)
2408 elif result == False:
2409 bad_nodes.append(name)
2410 # else no instance is alive
2411 else:
2412 live_data = dict([(name, {}) for name in instance_names])
2414 # end data gathering
2416 output = []
2417 for instance in instance_list:
2418 iout = []
2419 for field in self.op.output_fields:
2420 if field == "name":
2421 val = instance.name
2422 elif field == "os":
2423 val = instance.os
2424 elif field == "pnode":
2425 val = instance.primary_node
2426 elif field == "snodes":
2427 val = list(instance.secondary_nodes)
2428 elif field == "admin_state":
2429 val = (instance.status != "down")
2430 elif field == "oper_state":
2431 if instance.primary_node in bad_nodes:
2432 val = None
2433 else:
2434 val = bool(live_data.get(instance.name))
2435 elif field == "status":
2436 if instance.primary_node in bad_nodes:
2437 val = "ERROR_nodedown"
2438 else:
2439 running = bool(live_data.get(instance.name))
2440 if running:
2441 if instance.status != "down":
2442 val = "running"
2443 else:
2444 val = "ERROR_up"
2445 else:
2446 if instance.status != "down":
2447 val = "ERROR_down"
2448 else:
2449 val = "ADMIN_down"
2450 elif field == "admin_ram":
2451 val = instance.memory
2452 elif field == "oper_ram":
2453 if instance.primary_node in bad_nodes:
2454 val = None
2455 elif instance.name in live_data:
2456 val = live_data[instance.name].get("memory", "?")
2457 else:
2458 val = "-"
2459 elif field == "disk_template":
2460 val = instance.disk_template
2461 elif field == "ip":
2462 val = instance.nics[0].ip
2463 elif field == "bridge":
2464 val = instance.nics[0].bridge
2465 elif field == "mac":
2466 val = instance.nics[0].mac
2467 elif field == "sda_size" or field == "sdb_size":
2468 disk = instance.FindDisk(field[:3])
2469 if disk is None:
2470 val = None
2471 else:
2472 val = disk.size
2473 elif field == "vcpus":
2474 val = instance.vcpus
2475 else:
2476 raise errors.ParameterError(field)
2477 iout.append(str(val))
2478 output.append(iout)
2480 return output
2483 class LUFailoverInstance(LogicalUnit):
2484 """Failover an instance.
2487 HPATH = "instance-failover"
2488 HTYPE = constants.HTYPE_INSTANCE
2489 _OP_REQP = ["instance_name", "ignore_consistency"]
2491 def BuildHooksEnv(self):
2494 This runs on master, primary and secondary nodes of the instance.
2498 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2500 env.update(_BuildInstanceHookEnvByObject(self.instance))
2501 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2504 def CheckPrereq(self):
2505 """Check prerequisites.
2507 This checks that the instance is in the cluster.
2510 instance = self.cfg.GetInstanceInfo(
2511 self.cfg.ExpandInstanceName(self.op.instance_name))
2512 if instance is None:
2513 raise errors.OpPrereqError("Instance '%s' not known" %
2514 self.op.instance_name)
2516 if instance.disk_template not in constants.DTS_NET_MIRROR:
2517 raise errors.OpPrereqError("Instance's disk layout is not"
2518 " network mirrored, cannot failover.")
2520 secondary_nodes = instance.secondary_nodes
2521 if not secondary_nodes:
2522 raise errors.ProgrammerError("no secondary node but using "
2523 "DT_REMOTE_RAID1 template")
2525 target_node = secondary_nodes[0]
2526 # check memory requirements on the secondary node
2527 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2528 instance.name, instance.memory)
2530 # check bridge existance
2531 brlist = [nic.bridge for nic in instance.nics]
2532 if not rpc.call_bridges_exist(target_node, brlist):
2533 raise errors.OpPrereqError("One or more target bridges %s do not"
2534 " exist on destination node '%s'" %
2535 (brlist, target_node))
2537 self.instance = instance
2539 def Exec(self, feedback_fn):
2540 """Failover an instance.
2542 The failover is done by shutting it down on its present node and
2543 starting it on the secondary.
2546 instance = self.instance
2548 source_node = instance.primary_node
2549 target_node = instance.secondary_nodes[0]
2551 feedback_fn("* checking disk consistency between source and target")
2552 for dev in instance.disks:
2553 # for remote_raid1, these are md over drbd
2554 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2555 if not self.op.ignore_consistency:
2556 raise errors.OpExecError("Disk %s is degraded on target node,"
2557 " aborting failover." % dev.iv_name)
2559 feedback_fn("* shutting down instance on source node")
2560 logger.Info("Shutting down instance %s on node %s" %
2561 (instance.name, source_node))
2563 if not rpc.call_instance_shutdown(source_node, instance):
2564 if self.op.ignore_consistency:
2565 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2566 " anyway. Please make sure node %s is down" %
2567 (instance.name, source_node, source_node))
2569 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2570 (instance.name, source_node))
2572 feedback_fn("* deactivating the instance's disks on source node")
2573 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2574 raise errors.OpExecError("Can't shut down the instance's disks.")
2576 instance.primary_node = target_node
2577 # distribute new instance config to the other nodes
2578 self.cfg.AddInstance(instance)
2580 feedback_fn("* activating the instance's disks on target node")
2581 logger.Info("Starting instance %s on node %s" %
2582 (instance.name, target_node))
2584 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2585 ignore_secondaries=True)
2587 _ShutdownInstanceDisks(instance, self.cfg)
2588 raise errors.OpExecError("Can't activate the instance's disks")
2590 feedback_fn("* starting the instance on the target node")
2591 if not rpc.call_instance_start(target_node, instance, None):
2592 _ShutdownInstanceDisks(instance, self.cfg)
2593 raise errors.OpExecError("Could not start instance %s on node %s." %
2594 (instance.name, target_node))
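# Usage sketch (illustrative, not from the original source): assuming an
# opcodes.OpFailoverInstance class whose fields mirror _OP_REQP above, a
# failover that tolerates a degraded or dead primary would look like:
#
#   op = opcodes.OpFailoverInstance(instance_name="inst1.example.com",
#                                   ignore_consistency=True)
#
# ignore_consistency both skips the disk consistency check and lets the
# shutdown on the (possibly unreachable) source node fail non-fatally.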
2597 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2598 """Create a tree of block devices on the primary node.
2600 This always creates all devices.
2603 if device.children:
2604 for child in device.children:
2605 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2606 return False
2608 cfg.SetDiskID(device, node)
2609 new_id = rpc.call_blockdev_create(node, device, device.size,
2610 instance.name, True, info)
2611 if not new_id:
2612 return False
2613 if device.physical_id is None:
2614 device.physical_id = new_id
2615 return True
2618 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2619 """Create a tree of block devices on a secondary node.
2621 If this device type has to be created on secondaries, create it and
2624 If not, just recurse to children keeping the same 'force' value.
2627 if device.CreateOnSecondary():
2628 force = True
2629 if device.children:
2630 for child in device.children:
2631 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2632 child, force, info):
2633 return False
2635 if not force:
2636 return True
2637 cfg.SetDiskID(device, node)
2638 new_id = rpc.call_blockdev_create(node, device, device.size,
2639 instance.name, False, info)
2640 if not new_id:
2641 return False
2642 if device.physical_id is None:
2643 device.physical_id = new_id
2644 return True
2647 def _GenerateUniqueNames(cfg, exts):
2648 """Generate a suitable LV name.
2650 This will generate logical volume names for each of the given extensions.
2653 results = []
2654 for val in exts:
2655 new_id = cfg.GenerateUniqueID()
2656 results.append("%s%s" % (new_id, val))
2657 return results
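# Example (hypothetical values): each extension gets its own fresh unique
# ID, so a call such as
#
#   names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
#
# returns something like ["<uuid1>.sda", "<uuid2>.sdb"], which the disk
# generators below use as LV names inside the cluster volume group.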
2660 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2661 """Generate a drbd device complete with its children.
2664 port = cfg.AllocatePort()
2665 vgname = cfg.GetVGName()
2666 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2667 logical_id=(vgname, names[0]))
2668 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2669 logical_id=(vgname, names[1]))
2670 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2671 logical_id = (primary, secondary, port),
2672 children = [dev_data, dev_meta])
2673 return drbd_dev
2676 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2677 """Generate a drbd8 device complete with its children.
2680 port = cfg.AllocatePort()
2681 vgname = cfg.GetVGName()
2682 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2683 logical_id=(vgname, names[0]))
2684 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2685 logical_id=(vgname, names[1]))
2686 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2687 logical_id = (primary, secondary, port),
2688 children = [dev_data, dev_meta],
2689 iv_name=iv_name)
2690 return drbd_dev
2693 def _GenerateDiskTemplate(cfg, template_name,
2694 instance_name, primary_node,
2695 secondary_nodes, disk_sz, swap_sz):
2696 """Generate the entire disk layout for a given template type.
2699 #TODO: compute space requirements
2701 vgname = cfg.GetVGName()
2702 if template_name == constants.DT_DISKLESS:
2703 disks = []
2704 elif template_name == constants.DT_PLAIN:
2705 if len(secondary_nodes) != 0:
2706 raise errors.ProgrammerError("Wrong template configuration")
2708 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2709 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2710 logical_id=(vgname, names[0]),
2711 iv_name="sda")
2712 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2713 logical_id=(vgname, names[1]),
2714 iv_name="sdb")
2715 disks = [sda_dev, sdb_dev]
2716 elif template_name == constants.DT_DRBD8:
2717 if len(secondary_nodes) != 1:
2718 raise errors.ProgrammerError("Wrong template configuration")
2719 remote_node = secondary_nodes[0]
2720 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2721 ".sdb_data", ".sdb_meta"])
2722 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2723 disk_sz, names[0:2], "sda")
2724 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2725 swap_sz, names[2:4], "sdb")
2726 disks = [drbd_sda_dev, drbd_sdb_dev]
2727 else:
2728 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2730 return disks
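# Illustrative call (made-up sizes): the layout for a plain LVM-backed
# instance with a 10 GB data disk and 4 GB swap would be generated as
#
#   disks = _GenerateDiskTemplate(cfg, constants.DT_PLAIN,
#                                 "inst1.example.com", "node1.example.com",
#                                 [], 10240, 4096)
#
# For DT_DRBD8 exactly one secondary node must be passed and each disk
# becomes a DRBD8 device backed by a data LV and a 128 MB metadata LV.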
2732 def _GetInstanceInfoText(instance):
2733 """Compute that text that should be added to the disk's metadata.
2736 return "originstname+%s" % instance.name
2739 def _CreateDisks(cfg, instance):
2740 """Create all disks for an instance.
2742 This abstracts away some work from AddInstance.
2745 instance: the instance object
2748 True or False showing the success of the creation process
2751 info = _GetInstanceInfoText(instance)
2753 for device in instance.disks:
2754 logger.Info("creating volume %s for instance %s" %
2755 (device.iv_name, instance.name))
2757 for secondary_node in instance.secondary_nodes:
2758 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2759 device, False, info):
2760 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2761 (device.iv_name, device, secondary_node))
2762 return False
2764 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2765 instance, device, info):
2766 logger.Error("failed to create volume %s on primary!" %
2767 device.iv_name)
2768 return False
2770 return True
2772 def _RemoveDisks(instance, cfg):
2773 """Remove all disks for an instance.
2775 This abstracts away some work from `AddInstance()` and
2776 `RemoveInstance()`. Note that in case some of the devices couldn't
2777 be removed, the removal will continue with the other ones (compare
2778 with `_CreateDisks()`).
2781 instance: the instance object
2784 True or False showing the success of the removal process
2787 logger.Info("removing block devices for instance %s" % instance.name)
2789 result = True
2790 for device in instance.disks:
2791 for node, disk in device.ComputeNodeTree(instance.primary_node):
2792 cfg.SetDiskID(disk, node)
2793 if not rpc.call_blockdev_remove(node, disk):
2794 logger.Error("could not remove block device %s on node %s,"
2795 " continuing anyway" %
2796 (device.iv_name, node))
2797 result = False
2799 return result
2801 class LUCreateInstance(LogicalUnit):
2802 """Create an instance.
2805 HPATH = "instance-add"
2806 HTYPE = constants.HTYPE_INSTANCE
2807 _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2808 "disk_template", "swap_size", "mode", "start", "vcpus",
2809 "wait_for_sync", "ip_check", "mac"]
2811 def BuildHooksEnv(self):
2814 This runs on master, primary and secondary nodes of the instance.
2818 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2819 "INSTANCE_DISK_SIZE": self.op.disk_size,
2820 "INSTANCE_SWAP_SIZE": self.op.swap_size,
2821 "INSTANCE_ADD_MODE": self.op.mode,
2823 if self.op.mode == constants.INSTANCE_IMPORT:
2824 env["INSTANCE_SRC_NODE"] = self.op.src_node
2825 env["INSTANCE_SRC_PATH"] = self.op.src_path
2826 env["INSTANCE_SRC_IMAGE"] = self.src_image
2828 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2829 primary_node=self.op.pnode,
2830 secondary_nodes=self.secondaries,
2831 status=self.instance_status,
2832 os_type=self.op.os_type,
2833 memory=self.op.mem_size,
2834 vcpus=self.op.vcpus,
2835 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2838 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2843 def CheckPrereq(self):
2844 """Check prerequisites.
2847 for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2848 if not hasattr(self.op, attr):
2849 setattr(self.op, attr, None)
2851 if self.op.mode not in (constants.INSTANCE_CREATE,
2852 constants.INSTANCE_IMPORT):
2853 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2854 self.op.mode)
2856 if self.op.mode == constants.INSTANCE_IMPORT:
2857 src_node = getattr(self.op, "src_node", None)
2858 src_path = getattr(self.op, "src_path", None)
2859 if src_node is None or src_path is None:
2860 raise errors.OpPrereqError("Importing an instance requires source"
2861 " node and path options")
2862 src_node_full = self.cfg.ExpandNodeName(src_node)
2863 if src_node_full is None:
2864 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2865 self.op.src_node = src_node = src_node_full
2867 if not os.path.isabs(src_path):
2868 raise errors.OpPrereqError("The source path must be absolute")
2870 export_info = rpc.call_export_info(src_node, src_path)
2872 if not export_info:
2873 raise errors.OpPrereqError("No export found in dir %s" % src_path)
2875 if not export_info.has_section(constants.INISECT_EXP):
2876 raise errors.ProgrammerError("Corrupted export config")
2878 ei_version = export_info.get(constants.INISECT_EXP, 'version')
2879 if (int(ei_version) != constants.EXPORT_VERSION):
2880 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2881 (ei_version, constants.EXPORT_VERSION))
2883 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2884 raise errors.OpPrereqError("Can't import instance with more than"
2885 " one data disk")
2887 # FIXME: are the old OSes, disk sizes, etc. useful?
2888 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2889 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2891 self.src_image = diskimage
2892 else: # INSTANCE_CREATE
2893 if getattr(self.op, "os_type", None) is None:
2894 raise errors.OpPrereqError("No guest OS specified")
2896 # check primary node
2897 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2898 if pnode is None:
2899 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2900 self.op.pnode)
2901 self.op.pnode = pnode.name
2902 self.pnode = pnode
2903 self.secondaries = []
2904 # disk template and mirror node verification
2905 if self.op.disk_template not in constants.DISK_TEMPLATES:
2906 raise errors.OpPrereqError("Invalid disk template name")
2908 if self.op.disk_template in constants.DTS_NET_MIRROR:
2909 if getattr(self.op, "snode", None) is None:
2910 raise errors.OpPrereqError("The networked disk templates need"
2911 " a mirror node")
2913 snode_name = self.cfg.ExpandNodeName(self.op.snode)
2914 if snode_name is None:
2915 raise errors.OpPrereqError("Unknown secondary node '%s'" %
2916 self.op.snode)
2917 elif snode_name == pnode.name:
2918 raise errors.OpPrereqError("The secondary node cannot be"
2919 " the primary node.")
2920 self.secondaries.append(snode_name)
2922 # Required free disk space as a function of disk and swap space
2923 req_size_dict = {
2924 constants.DT_DISKLESS: None,
2925 constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2926 # 256 MB are added for drbd metadata, 128 MB for each drbd device
2927 constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2928 }
2930 if self.op.disk_template not in req_size_dict:
2931 raise errors.ProgrammerError("Disk template '%s' size requirement"
2932 " is unknown" % self.op.disk_template)
2934 req_size = req_size_dict[self.op.disk_template]
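# Worked example (made-up sizes): with disk_size=10240 MB and swap_size=4096
# MB, DT_PLAIN needs 10240 + 4096 = 14336 MB of free VG space, while DT_DRBD8
# needs 10240 + 4096 + 256 = 14592 MB, the extra 256 MB covering the two
# 128 MB DRBD metadata volumes.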
2936 # Check lv size requirements
2937 if req_size is not None:
2938 nodenames = [pnode.name] + self.secondaries
2939 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2940 for node in nodenames:
2941 info = nodeinfo.get(node, None)
2942 if not info:
2943 raise errors.OpPrereqError("Cannot get current information"
2944 " from node '%s'" % node)
2945 vg_free = info.get('vg_free', None)
2946 if not isinstance(vg_free, int):
2947 raise errors.OpPrereqError("Can't compute free disk space on"
2948 " node %s" % node)
2949 if req_size > info['vg_free']:
2950 raise errors.OpPrereqError("Not enough disk space on target node %s."
2951 " %d MB available, %d MB required" %
2952 (node, info['vg_free'], req_size))
2955 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2956 if not os_obj:
2957 raise errors.OpPrereqError("OS '%s' not in supported os list for"
2958 " primary node" % self.op.os_type)
2960 if self.op.kernel_path == constants.VALUE_NONE:
2961 raise errors.OpPrereqError("Can't set instance kernel to none")
2963 # instance verification
2964 hostname1 = utils.HostInfo(self.op.instance_name)
2966 self.op.instance_name = instance_name = hostname1.name
2967 instance_list = self.cfg.GetInstanceList()
2968 if instance_name in instance_list:
2969 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2970 instance_name)
2972 ip = getattr(self.op, "ip", None)
2973 if ip is None or ip.lower() == "none":
2974 inst_ip = None
2975 elif ip.lower() == "auto":
2976 inst_ip = hostname1.ip
2977 else:
2978 if not utils.IsValidIP(ip):
2979 raise errors.OpPrereqError("given IP address '%s' doesn't look"
2980 " like a valid IP" % ip)
2981 inst_ip = ip
2982 self.inst_ip = inst_ip
2984 if self.op.start and not self.op.ip_check:
2985 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2986 " adding an instance in start mode")
2988 if self.op.ip_check:
2989 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
2990 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2991 (hostname1.ip, instance_name))
2993 # MAC address verification
2994 if self.op.mac != "auto":
2995 if not utils.IsValidMac(self.op.mac.lower()):
2996 raise errors.OpPrereqError("invalid MAC address specified: %s" %
2997 self.op.mac)
2999 # bridge verification
3000 bridge = getattr(self.op, "bridge", None)
3001 if bridge is None:
3002 self.op.bridge = self.cfg.GetDefBridge()
3003 else:
3004 self.op.bridge = bridge
3006 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3007 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3008 " destination node '%s'" %
3009 (self.op.bridge, pnode.name))
3011 # boot order verification
3012 if self.op.hvm_boot_order is not None:
3013 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3014 raise errors.OpPrereqError("invalid boot order specified,"
3015 " must be one or more of [acdn]")
3017 if self.op.start:
3018 self.instance_status = 'up'
3019 else:
3020 self.instance_status = 'down'
3022 def Exec(self, feedback_fn):
3023 """Create and add the instance to the cluster.
3026 instance = self.op.instance_name
3027 pnode_name = self.pnode.name
3029 if self.op.mac == "auto":
3030 mac_address = self.cfg.GenerateMAC()
3031 else:
3032 mac_address = self.op.mac
3034 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3035 if self.inst_ip is not None:
3036 nic.ip = self.inst_ip
3038 ht_kind = self.sstore.GetHypervisorType()
3039 if ht_kind in constants.HTS_REQ_PORT:
3040 network_port = self.cfg.AllocatePort()
3041 else:
3042 network_port = None
3044 disks = _GenerateDiskTemplate(self.cfg,
3045 self.op.disk_template,
3046 instance, pnode_name,
3047 self.secondaries, self.op.disk_size,
3048 self.op.swap_size)
3050 iobj = objects.Instance(name=instance, os=self.op.os_type,
3051 primary_node=pnode_name,
3052 memory=self.op.mem_size,
3053 vcpus=self.op.vcpus,
3054 nics=[nic], disks=disks,
3055 disk_template=self.op.disk_template,
3056 status=self.instance_status,
3057 network_port=network_port,
3058 kernel_path=self.op.kernel_path,
3059 initrd_path=self.op.initrd_path,
3060 hvm_boot_order=self.op.hvm_boot_order,
3063 feedback_fn("* creating instance disks...")
3064 if not _CreateDisks(self.cfg, iobj):
3065 _RemoveDisks(iobj, self.cfg)
3066 raise errors.OpExecError("Device creation failed, reverting...")
3068 feedback_fn("adding instance %s to cluster config" % instance)
3070 self.cfg.AddInstance(iobj)
3072 if self.op.wait_for_sync:
3073 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3074 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3075 # make sure the disks are not degraded (still sync-ing is ok)
3077 feedback_fn("* checking mirrors status")
3078 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3079 else:
3080 disk_abort = False
3082 if disk_abort:
3083 _RemoveDisks(iobj, self.cfg)
3084 self.cfg.RemoveInstance(iobj.name)
3085 raise errors.OpExecError("There are some degraded disks for"
3086 " this instance")
3088 feedback_fn("creating os for instance %s on node %s" %
3089 (instance, pnode_name))
3091 if iobj.disk_template != constants.DT_DISKLESS:
3092 if self.op.mode == constants.INSTANCE_CREATE:
3093 feedback_fn("* running the instance OS create scripts...")
3094 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3095 raise errors.OpExecError("could not add os for instance %s"
3096 " on node %s" %
3097 (instance, pnode_name))
3099 elif self.op.mode == constants.INSTANCE_IMPORT:
3100 feedback_fn("* running the instance OS import scripts...")
3101 src_node = self.op.src_node
3102 src_image = self.src_image
3103 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3104 src_node, src_image):
3105 raise errors.OpExecError("Could not import os for instance"
3106 " %s on node %s" %
3107 (instance, pnode_name))
3108 else:
3109 # also checked in the prereq part
3110 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3111 % self.op.mode)
3113 if self.op.start:
3114 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3115 feedback_fn("* starting instance...")
3116 if not rpc.call_instance_start(pnode_name, iobj, None):
3117 raise errors.OpExecError("Could not start instance")
3120 class LUConnectConsole(NoHooksLU):
3121 """Connect to an instance's console.
3123 This is somewhat special in that it returns the command line that
3124 you need to run on the master node in order to connect to the
3125 console.
3128 _OP_REQP = ["instance_name"]
3130 def CheckPrereq(self):
3131 """Check prerequisites.
3133 This checks that the instance is in the cluster.
3136 instance = self.cfg.GetInstanceInfo(
3137 self.cfg.ExpandInstanceName(self.op.instance_name))
3138 if instance is None:
3139 raise errors.OpPrereqError("Instance '%s' not known" %
3140 self.op.instance_name)
3141 self.instance = instance
3143 def Exec(self, feedback_fn):
3144 """Connect to the console of an instance
3147 instance = self.instance
3148 node = instance.primary_node
3150 node_insts = rpc.call_instance_list([node])[node]
3151 if node_insts is False:
3152 raise errors.OpExecError("Can't connect to node %s." % node)
3154 if instance.name not in node_insts:
3155 raise errors.OpExecError("Instance %s is not running." % instance.name)
3157 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3159 hyper = hypervisor.GetHypervisor()
3160 console_cmd = hyper.GetShellCommandForConsole(instance)
3163 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
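# Illustrative result (the exact command depends on the hypervisor and on
# SshRunner.BuildCmd): for a Xen-based cluster the returned command line
# might resemble
#
#   ssh -t root@node1.example.com 'xm console inst1.example.com'
#
# which the command-line client is expected to exec on the master node.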
3166 class LUReplaceDisks(LogicalUnit):
3167 """Replace the disks of an instance.
3170 HPATH = "mirrors-replace"
3171 HTYPE = constants.HTYPE_INSTANCE
3172 _OP_REQP = ["instance_name", "mode", "disks"]
3174 def BuildHooksEnv(self):
3177 This runs on the master, the primary and all the secondaries.
3181 "MODE": self.op.mode,
3182 "NEW_SECONDARY": self.op.remote_node,
3183 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3185 env.update(_BuildInstanceHookEnvByObject(self.instance))
3187 self.sstore.GetMasterNode(),
3188 self.instance.primary_node,
3190 if self.op.remote_node is not None:
3191 nl.append(self.op.remote_node)
3194 def CheckPrereq(self):
3195 """Check prerequisites.
3197 This checks that the instance is in the cluster.
3200 instance = self.cfg.GetInstanceInfo(
3201 self.cfg.ExpandInstanceName(self.op.instance_name))
3202 if instance is None:
3203 raise errors.OpPrereqError("Instance '%s' not known" %
3204 self.op.instance_name)
3205 self.instance = instance
3206 self.op.instance_name = instance.name
3208 if instance.disk_template not in constants.DTS_NET_MIRROR:
3209 raise errors.OpPrereqError("Instance's disk layout is not"
3210 " network mirrored.")
3212 if len(instance.secondary_nodes) != 1:
3213 raise errors.OpPrereqError("The instance has a strange layout,"
3214 " expected one secondary but found %d" %
3215 len(instance.secondary_nodes))
3217 self.sec_node = instance.secondary_nodes[0]
3219 remote_node = getattr(self.op, "remote_node", None)
3220 if remote_node is not None:
3221 remote_node = self.cfg.ExpandNodeName(remote_node)
3222 if remote_node is None:
3223 raise errors.OpPrereqError("Node '%s' not known" %
3224 self.op.remote_node)
3225 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3227 self.remote_node_info = None
3228 if remote_node == instance.primary_node:
3229 raise errors.OpPrereqError("The specified node is the primary node of"
3231 elif remote_node == self.sec_node:
3232 if self.op.mode == constants.REPLACE_DISK_SEC:
3233 # this is for DRBD8, where we can't execute the same mode of
3234 # replacement as for drbd7 (no different port allocated)
3235 raise errors.OpPrereqError("Same secondary given, cannot execute"
3236 " replacement")
3237 # the user gave the current secondary, switch to
3238 # 'no-replace-secondary' mode for drbd7
3240 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3241 self.op.mode != constants.REPLACE_DISK_ALL):
3242 raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3243 " disks replacement, not individual ones")
3244 if instance.disk_template == constants.DT_DRBD8:
3245 if (self.op.mode == constants.REPLACE_DISK_ALL and
3246 remote_node is not None):
3247 # switch to replace secondary mode
3248 self.op.mode = constants.REPLACE_DISK_SEC
3250 if self.op.mode == constants.REPLACE_DISK_ALL:
3251 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3252 " secondary disk replacement, not"
3254 elif self.op.mode == constants.REPLACE_DISK_PRI:
3255 if remote_node is not None:
3256 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3257 " the secondary while doing a primary"
3258 " node disk replacement")
3259 self.tgt_node = instance.primary_node
3260 self.oth_node = instance.secondary_nodes[0]
3261 elif self.op.mode == constants.REPLACE_DISK_SEC:
3262 self.new_node = remote_node # this can be None, in which case
3263 # we don't change the secondary
3264 self.tgt_node = instance.secondary_nodes[0]
3265 self.oth_node = instance.primary_node
3267 raise errors.ProgrammerError("Unhandled disk replace mode")
3269 for name in self.op.disks:
3270 if instance.FindDisk(name) is None:
3271 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3272 (name, instance.name))
3273 self.op.remote_node = remote_node
3275 def _ExecRR1(self, feedback_fn):
3276 """Replace the disks of an instance.
3279 instance = self.instance
3282 if self.op.remote_node is None:
3283 remote_node = self.sec_node
3285 remote_node = self.op.remote_node
3287 for dev in instance.disks:
3289 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3290 names = _GenerateUniqueNames(cfg, lv_names)
3291 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3292 remote_node, size, names)
3293 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3294 logger.Info("adding new mirror component on secondary for %s" %
3297 if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3299 _GetInstanceInfoText(instance)):
3300 raise errors.OpExecError("Failed to create new component on secondary"
3301 " node %s. Full abort, cleanup manually!" %
3304 logger.Info("adding new mirror component on primary")
3306 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3308 _GetInstanceInfoText(instance)):
3309 # remove secondary dev
3310 cfg.SetDiskID(new_drbd, remote_node)
3311 rpc.call_blockdev_remove(remote_node, new_drbd)
3312 raise errors.OpExecError("Failed to create volume on primary!"
3313 " Full abort, cleanup manually!!")
3315 # the device exists now
3316 # call the primary node to add the mirror to md
3317 logger.Info("adding new mirror component to md")
3318 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3319 [new_drbd]):
3320 logger.Error("Can't add mirror component to md!")
3321 cfg.SetDiskID(new_drbd, remote_node)
3322 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3323 logger.Error("Can't rollback on secondary")
3324 cfg.SetDiskID(new_drbd, instance.primary_node)
3325 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3326 logger.Error("Can't rollback on primary")
3327 raise errors.OpExecError("Full abort, cleanup manually!!")
3329 dev.children.append(new_drbd)
3330 cfg.AddInstance(instance)
3332 # this can fail as the old devices are degraded and _WaitForSync
3333 # does a combined result over all disks, so we don't check its
3335 _WaitForSync(cfg, instance, self.proc, unlock=True)
3337 # so check manually all the devices
3338 for name in iv_names:
3339 dev, child, new_drbd = iv_names[name]
3340 cfg.SetDiskID(dev, instance.primary_node)
3341 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3342 if is_degr:
3343 raise errors.OpExecError("MD device %s is degraded!" % name)
3344 cfg.SetDiskID(new_drbd, instance.primary_node)
3345 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3346 if is_degr:
3347 raise errors.OpExecError("New drbd device %s is degraded!" % name)
3349 for name in iv_names:
3350 dev, child, new_drbd = iv_names[name]
3351 logger.Info("remove mirror %s component" % name)
3352 cfg.SetDiskID(dev, instance.primary_node)
3353 if not rpc.call_blockdev_removechildren(instance.primary_node,
3355 logger.Error("Can't remove child from mirror, aborting"
3356 " *this device cleanup*.\nYou need to cleanup manually!!")
3359 for node in child.logical_id[:2]:
3360 logger.Info("remove child device on %s" % node)
3361 cfg.SetDiskID(child, node)
3362 if not rpc.call_blockdev_remove(node, child):
3363 logger.Error("Warning: failed to remove device from node %s,"
3364 " continuing operation." % node)
3366 dev.children.remove(child)
3368 cfg.AddInstance(instance)
3370 def _ExecD8DiskOnly(self, feedback_fn):
3371 """Replace a disk on the primary or secondary for dbrd8.
3373 The algorithm for replace is quite complicated:
3374 - for each disk to be replaced:
3375 - create new LVs on the target node with unique names
3376 - detach old LVs from the drbd device
3377 - rename old LVs to name_replaced.<time_t>
3378 - rename new LVs to old LVs
3379 - attach the new LVs (with the old names now) to the drbd device
3380 - wait for sync across all devices
3381 - for each modified disk:
3382 - remove old LVs (which have the name name_replaced.<time_t>)
3384 Failures are not very well handled.
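# Sketch of the rename swap performed in step 4 below (hypothetical LV
# names): if drbd device sda is currently backed by "aaa.sda_data", then
#
#   "aaa.sda_data" -> "aaa.sda_data_replaced-<time_t>"   (old LV, detached)
#   "bbb.sda_data" -> "aaa.sda_data"                     (new LV, attached)
#
# so the new LVs inherit the old logical_ids and the drbd device keeps its
# configuration unchanged; only the backing storage is replaced.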
3387 steps_total = 6
3388 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3389 instance = self.instance
3390 iv_names = {}
3391 vgname = self.cfg.GetVGName()
3392 cfg = self.cfg
3394 tgt_node = self.tgt_node
3395 oth_node = self.oth_node
3397 # Step: check device activation
3398 self.proc.LogStep(1, steps_total, "check device existence")
3399 info("checking volume groups")
3400 my_vg = cfg.GetVGName()
3401 results = rpc.call_vg_list([oth_node, tgt_node])
3402 if not results:
3403 raise errors.OpExecError("Can't list volume groups on the nodes")
3404 for node in oth_node, tgt_node:
3405 res = results.get(node, False)
3406 if not res or my_vg not in res:
3407 raise errors.OpExecError("Volume group '%s' not found on %s" %
3408 (my_vg, node))
3409 for dev in instance.disks:
3410 if not dev.iv_name in self.op.disks:
3411 continue
3412 for node in tgt_node, oth_node:
3413 info("checking %s on %s" % (dev.iv_name, node))
3414 cfg.SetDiskID(dev, node)
3415 if not rpc.call_blockdev_find(node, dev):
3416 raise errors.OpExecError("Can't find device %s on node %s" %
3417 (dev.iv_name, node))
3419 # Step: check other node consistency
3420 self.proc.LogStep(2, steps_total, "check peer consistency")
3421 for dev in instance.disks:
3422 if not dev.iv_name in self.op.disks:
3423 continue
3424 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3425 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3426 oth_node==instance.primary_node):
3427 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3428 " to replace disks on this node (%s)" %
3429 (oth_node, tgt_node))
3431 # Step: create new storage
3432 self.proc.LogStep(3, steps_total, "allocate new storage")
3433 for dev in instance.disks:
3434 if not dev.iv_name in self.op.disks:
3435 continue
3436 size = dev.size
3437 cfg.SetDiskID(dev, tgt_node)
3438 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3439 names = _GenerateUniqueNames(cfg, lv_names)
3440 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3441 logical_id=(vgname, names[0]))
3442 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3443 logical_id=(vgname, names[1]))
3444 new_lvs = [lv_data, lv_meta]
3445 old_lvs = dev.children
3446 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3447 info("creating new local storage on %s for %s" %
3448 (tgt_node, dev.iv_name))
3449 # since we *always* want to create this LV, we use the
3450 # _Create...OnPrimary (which forces the creation), even if we
3451 # are talking about the secondary node
3452 for new_lv in new_lvs:
3453 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3454 _GetInstanceInfoText(instance)):
3455 raise errors.OpExecError("Failed to create new LV named '%s' on"
3456 " node '%s'" %
3457 (new_lv.logical_id[1], tgt_node))
3459 # Step: for each lv, detach+rename*2+attach
3460 self.proc.LogStep(4, steps_total, "change drbd configuration")
3461 for dev, old_lvs, new_lvs in iv_names.itervalues():
3462 info("detaching %s drbd from local storage" % dev.iv_name)
3463 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3464 raise errors.OpExecError("Can't detach drbd from local storage on node"
3465 " %s for device %s" % (tgt_node, dev.iv_name))
3467 #cfg.Update(instance)
3469 # ok, we created the new LVs, so now we know we have the needed
3470 # storage; as such, we proceed on the target node to rename
3471 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3472 # using the assumption that logical_id == physical_id (which in
3473 # turn is the unique_id on that node)
3475 # FIXME(iustin): use a better name for the replaced LVs
3476 temp_suffix = int(time.time())
3477 ren_fn = lambda d, suff: (d.physical_id[0],
3478 d.physical_id[1] + "_replaced-%s" % suff)
3479 # build the rename list based on what LVs exist on the node
3480 rlist = []
3481 for to_ren in old_lvs:
3482 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3483 if find_res is not None: # device exists
3484 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3486 info("renaming the old LVs on the target node")
3487 if not rpc.call_blockdev_rename(tgt_node, rlist):
3488 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3489 # now we rename the new LVs to the old LVs
3490 info("renaming the new LVs on the target node")
3491 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3492 if not rpc.call_blockdev_rename(tgt_node, rlist):
3493 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3495 for old, new in zip(old_lvs, new_lvs):
3496 new.logical_id = old.logical_id
3497 cfg.SetDiskID(new, tgt_node)
3499 for disk in old_lvs:
3500 disk.logical_id = ren_fn(disk, temp_suffix)
3501 cfg.SetDiskID(disk, tgt_node)
3503 # now that the new lvs have the old name, we can add them to the device
3504 info("adding new mirror component on %s" % tgt_node)
3505 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3506 for new_lv in new_lvs:
3507 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3508 warning("Can't rollback device %s", hint="manually cleanup unused"
3509 " logical volumes")
3510 raise errors.OpExecError("Can't add local storage to drbd")
3512 dev.children = new_lvs
3513 cfg.Update(instance)
3515 # Step: wait for sync
3517 # this can fail as the old devices are degraded and _WaitForSync
3518 # does a combined result over all disks, so we don't check its
3520 self.proc.LogStep(5, steps_total, "sync devices")
3521 _WaitForSync(cfg, instance, self.proc, unlock=True)
3523 # so check manually all the devices
3524 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3525 cfg.SetDiskID(dev, instance.primary_node)
3526 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3527 if is_degr:
3528 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3530 # Step: remove old storage
3531 self.proc.LogStep(6, steps_total, "removing old storage")
3532 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3533 info("remove logical volumes for %s" % name)
3534 for lv in old_lvs:
3535 cfg.SetDiskID(lv, tgt_node)
3536 if not rpc.call_blockdev_remove(tgt_node, lv):
3537 warning("Can't remove old LV", hint="manually remove unused LVs")
3540 def _ExecD8Secondary(self, feedback_fn):
3541 """Replace the secondary node for drbd8.
3543 The algorithm for replace is quite complicated:
3544 - for all disks of the instance:
3545 - create new LVs on the new node with same names
3546 - shutdown the drbd device on the old secondary
3547 - disconnect the drbd network on the primary
3548 - create the drbd device on the new secondary
3549 - network attach the drbd on the primary, using an artifice:
3550 the drbd code for Attach() will connect to the network if it
3551 finds a device which is connected to the good local disks but
3553 - wait for sync across all devices
3554 - remove all disks from the old secondary
3556 Failures are not very well handled.
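# Sketch (hypothetical nodes): for a disk with
#
#   logical_id = ("node1", "node2", 11000)
#
# replacing the secondary node2 with node3 rewrites the id below as
# ("node1", "node3", 11000): only the peer changes, while the primary
# and the allocated DRBD port are kept.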
3559 steps_total = 6
3560 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3561 instance = self.instance
3562 iv_names = {}
3563 vgname = self.cfg.GetVGName()
3564 cfg = self.cfg
3566 old_node = self.tgt_node
3567 new_node = self.new_node
3568 pri_node = instance.primary_node
3570 # Step: check device activation
3571 self.proc.LogStep(1, steps_total, "check device existence")
3572 info("checking volume groups")
3573 my_vg = cfg.GetVGName()
3574 results = rpc.call_vg_list([pri_node, new_node])
3575 if not results:
3576 raise errors.OpExecError("Can't list volume groups on the nodes")
3577 for node in pri_node, new_node:
3578 res = results.get(node, False)
3579 if not res or my_vg not in res:
3580 raise errors.OpExecError("Volume group '%s' not found on %s" %
3581 (my_vg, node))
3582 for dev in instance.disks:
3583 if not dev.iv_name in self.op.disks:
3584 continue
3585 info("checking %s on %s" % (dev.iv_name, pri_node))
3586 cfg.SetDiskID(dev, pri_node)
3587 if not rpc.call_blockdev_find(pri_node, dev):
3588 raise errors.OpExecError("Can't find device %s on node %s" %
3589 (dev.iv_name, pri_node))
3591 # Step: check other node consistency
3592 self.proc.LogStep(2, steps_total, "check peer consistency")
3593 for dev in instance.disks:
3594 if not dev.iv_name in self.op.disks:
3595 continue
3596 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3597 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3598 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3599 " unsafe to replace the secondary" %
3602 # Step: create new storage
3603 self.proc.LogStep(3, steps_total, "allocate new storage")
3604 for dev in instance.disks:
3606 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3607 # since we *always* want to create this LV, we use the
3608 # _Create...OnPrimary (which forces the creation), even if we
3609 # are talking about the secondary node
3610 for new_lv in dev.children:
3611 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3612 _GetInstanceInfoText(instance)):
3613 raise errors.OpExecError("Failed to create new LV named '%s' on"
3614 " node '%s'" %
3615 (new_lv.logical_id[1], new_node))
3617 iv_names[dev.iv_name] = (dev, dev.children)
3619 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3620 for dev in instance.disks:
3622 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3623 # create new devices on new_node
3624 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3625 logical_id=(pri_node, new_node,
3626 dev.logical_id[2]),
3627 children=dev.children)
3628 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3630 _GetInstanceInfoText(instance)):
3631 raise errors.OpExecError("Failed to create new DRBD on"
3632 " node '%s'" % new_node)
3634 for dev in instance.disks:
3635 # we have new devices, shutdown the drbd on the old secondary
3636 info("shutting down drbd for %s on old node" % dev.iv_name)
3637 cfg.SetDiskID(dev, old_node)
3638 if not rpc.call_blockdev_shutdown(old_node, dev):
3639 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3640 hint="Please cleanup this device manually as soon as possible")
3642 info("detaching primary drbds from the network (=> standalone)")
3643 done = 0
3644 for dev in instance.disks:
3645 cfg.SetDiskID(dev, pri_node)
3646 # set the physical (unique in bdev terms) id to None, meaning
3647 # detach from network
3648 dev.physical_id = (None,) * len(dev.physical_id)
3649 # and 'find' the device, which will 'fix' it to match the
3650 # standalone state
3651 if rpc.call_blockdev_find(pri_node, dev):
3652 done += 1
3653 else:
3654 warning("Failed to detach drbd %s from network, unusual case" %
3655 dev.iv_name)
3657 if not done:
3658 # no detaches succeeded (very unlikely)
3659 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3661 # if we managed to detach at least one, we update all the disks of
3662 # the instance to point to the new secondary
3663 info("updating instance configuration")
3664 for dev in instance.disks:
3665 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3666 cfg.SetDiskID(dev, pri_node)
3667 cfg.Update(instance)
3669 # and now perform the drbd attach
3670 info("attaching primary drbds to new secondary (standalone => connected)")
3672 for dev in instance.disks:
3673 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3674 # since the attach is smart, it's enough to 'find' the device,
3675 # it will automatically activate the network, if the physical_id
3677 cfg.SetDiskID(dev, pri_node)
3678 if not rpc.call_blockdev_find(pri_node, dev):
3679 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3680 "please do a gnt-instance info to see the status of disks")
3682 # this can fail as the old devices are degraded and _WaitForSync
3683 # does a combined result over all disks, so we don't check its
3685 self.proc.LogStep(5, steps_total, "sync devices")
3686 _WaitForSync(cfg, instance, self.proc, unlock=True)
3688 # so check manually all the devices
3689 for name, (dev, old_lvs) in iv_names.iteritems():
3690 cfg.SetDiskID(dev, pri_node)
3691 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3692 if is_degr:
3693 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3695 self.proc.LogStep(6, steps_total, "removing old storage")
3696 for name, (dev, old_lvs) in iv_names.iteritems():
3697 info("remove logical volumes for %s" % name)
3698 for lv in old_lvs:
3699 cfg.SetDiskID(lv, old_node)
3700 if not rpc.call_blockdev_remove(old_node, lv):
3701 warning("Can't remove LV on old secondary",
3702 hint="Cleanup stale volumes by hand")
3704 def Exec(self, feedback_fn):
3705 """Execute disk replacement.
3707 This dispatches the disk replacement to the appropriate handler.
3710 instance = self.instance
3711 if instance.disk_template == constants.DT_REMOTE_RAID1:
3712 fn = self._ExecRR1
3713 elif instance.disk_template == constants.DT_DRBD8:
3714 if self.op.remote_node is None:
3715 fn = self._ExecD8DiskOnly
3717 fn = self._ExecD8Secondary
3719 raise errors.ProgrammerError("Unhandled disk replacement case")
3720 return fn(feedback_fn)
3723 class LUQueryInstanceData(NoHooksLU):
3724 """Query runtime instance data.
3727 _OP_REQP = ["instances"]
3729 def CheckPrereq(self):
3730 """Check prerequisites.
3732 This only checks the optional instance list against the existing names.
3735 if not isinstance(self.op.instances, list):
3736 raise errors.OpPrereqError("Invalid argument type 'instances'")
3737 if self.op.instances:
3738 self.wanted_instances = []
3739 names = self.op.instances
3740 for name in names:
3741 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3742 if instance is None:
3743 raise errors.OpPrereqError("No such instance name '%s'" % name)
3744 self.wanted_instances.append(instance)
3745 else:
3746 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3747 in self.cfg.GetInstanceList()]
3751 def _ComputeDiskStatus(self, instance, snode, dev):
3752 """Compute block device status.
3755 self.cfg.SetDiskID(dev, instance.primary_node)
3756 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3757 if dev.dev_type in constants.LDS_DRBD:
3758 # we change the snode then (otherwise we use the one passed in)
3759 if dev.logical_id[0] == instance.primary_node:
3760 snode = dev.logical_id[1]
3761 else:
3762 snode = dev.logical_id[0]
3764 if snode:
3765 self.cfg.SetDiskID(dev, snode)
3766 dev_sstatus = rpc.call_blockdev_find(snode, dev)
3767 else:
3768 dev_sstatus = None
3770 if dev.children:
3771 dev_children = [self._ComputeDiskStatus(instance, snode, child)
3772 for child in dev.children]
3773 else:
3774 dev_children = []
3777 "iv_name": dev.iv_name,
3778 "dev_type": dev.dev_type,
3779 "logical_id": dev.logical_id,
3780 "physical_id": dev.physical_id,
3781 "pstatus": dev_pstatus,
3782 "sstatus": dev_sstatus,
3783 "children": dev_children,
3788 def Exec(self, feedback_fn):
3789 """Gather and return data"""
3790 result = {}
3791 for instance in self.wanted_instances:
3792 remote_info = rpc.call_instance_info(instance.primary_node,
3793 instance.name)
3794 if remote_info and "state" in remote_info:
3795 remote_state = "up"
3796 else:
3797 remote_state = "down"
3798 if instance.status == "down":
3799 config_state = "down"
3800 else:
3801 config_state = "up"
3803 disks = [self._ComputeDiskStatus(instance, None, device)
3804 for device in instance.disks]
3807 "name": instance.name,
3808 "config_state": config_state,
3809 "run_state": remote_state,
3810 "pnode": instance.primary_node,
3811 "snodes": instance.secondary_nodes,
3813 "memory": instance.memory,
3814 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3816 "network_port": instance.network_port,
3817 "vcpus": instance.vcpus,
3818 "kernel_path": instance.kernel_path,
3819 "initrd_path": instance.initrd_path,
3820 "hvm_boot_order": instance.hvm_boot_order,
3821 }
3823 result[instance.name] = idict
3825 return result
3828 class LUSetInstanceParms(LogicalUnit):
3829 """Modifies an instances's parameters.
3832 HPATH = "instance-modify"
3833 HTYPE = constants.HTYPE_INSTANCE
3834 _OP_REQP = ["instance_name"]
3836 def BuildHooksEnv(self):
3839 This runs on the master, primary and secondaries.
3842 args = dict()
3843 if self.mem:
3844 args['memory'] = self.mem
3845 if self.vcpus:
3846 args['vcpus'] = self.vcpus
3847 if self.do_ip or self.do_bridge or self.mac:
3848 if self.do_ip:
3849 ip = self.ip
3850 else:
3851 ip = self.instance.nics[0].ip
3852 if self.bridge:
3853 bridge = self.bridge
3854 else:
3855 bridge = self.instance.nics[0].bridge
3856 if self.mac:
3857 mac = self.mac
3858 else:
3859 mac = self.instance.nics[0].mac
3860 args['nics'] = [(ip, bridge, mac)]
3861 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3862 nl = [self.sstore.GetMasterNode(),
3863 self.instance.primary_node] + list(self.instance.secondary_nodes)
3866 def CheckPrereq(self):
3867 """Check prerequisites.
3869 This only checks the instance list against the existing names.
3872 self.mem = getattr(self.op, "mem", None)
3873 self.vcpus = getattr(self.op, "vcpus", None)
3874 self.ip = getattr(self.op, "ip", None)
3875 self.mac = getattr(self.op, "mac", None)
3876 self.bridge = getattr(self.op, "bridge", None)
3877 self.kernel_path = getattr(self.op, "kernel_path", None)
3878 self.initrd_path = getattr(self.op, "initrd_path", None)
3879 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
3880 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
3881 self.kernel_path, self.initrd_path, self.hvm_boot_order]
3882 if all_parms.count(None) == len(all_parms):
3883 raise errors.OpPrereqError("No changes submitted")
3884 if self.mem is not None:
3885 try:
3886 self.mem = int(self.mem)
3887 except ValueError, err:
3888 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3889 if self.vcpus is not None:
3890 try:
3891 self.vcpus = int(self.vcpus)
3892 except ValueError, err:
3893 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3894 if self.ip is not None:
3895 self.do_ip = True
3896 if self.ip.lower() == "none":
3897 self.ip = None
3898 else:
3899 if not utils.IsValidIP(self.ip):
3900 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3901 else:
3902 self.do_ip = False
3903 self.do_bridge = (self.bridge is not None)
3904 if self.mac is not None:
3905 if self.cfg.IsMacInUse(self.mac):
3906 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
3907 self.mac)
3908 if not utils.IsValidMac(self.mac):
3909 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
3911 if self.kernel_path is not None:
3912 self.do_kernel_path = True
3913 if self.kernel_path == constants.VALUE_NONE:
3914 raise errors.OpPrereqError("Can't set instance to no kernel")
3916 if self.kernel_path != constants.VALUE_DEFAULT:
3917 if not os.path.isabs(self.kernel_path):
3918 raise errors.OpPrereqError("The kernel path must be an absolute"
3919 " filename")
3920 else:
3921 self.do_kernel_path = False
3923 if self.initrd_path is not None:
3924 self.do_initrd_path = True
3925 if self.initrd_path not in (constants.VALUE_NONE,
3926 constants.VALUE_DEFAULT):
3927 if not os.path.isabs(self.initrd_path):
3928 raise errors.OpPrereqError("The initrd path must be an absolute"
3929 " filename")
3930 else:
3931 self.do_initrd_path = False
3933 # boot order verification
3934 if self.hvm_boot_order is not None:
3935 if self.hvm_boot_order != constants.VALUE_DEFAULT:
3936 if len(self.hvm_boot_order.strip("acdn")) != 0:
3937 raise errors.OpPrereqError("invalid boot order specified,"
3938 " must be one or more of [acdn]"
3941 instance = self.cfg.GetInstanceInfo(
3942 self.cfg.ExpandInstanceName(self.op.instance_name))
3943 if instance is None:
3944 raise errors.OpPrereqError("No such instance name '%s'" %
3945 self.op.instance_name)
3946 self.op.instance_name = instance.name
3947 self.instance = instance
3950 def Exec(self, feedback_fn):
3951 """Modifies an instance.
3953 All parameters take effect only at the next restart of the instance.
3955 result = []
3956 instance = self.instance
3957 if self.mem:
3958 instance.memory = self.mem
3959 result.append(("mem", self.mem))
3960 if self.vcpus:
3961 instance.vcpus = self.vcpus
3962 result.append(("vcpus", self.vcpus))
3963 if self.do_ip:
3964 instance.nics[0].ip = self.ip
3965 result.append(("ip", self.ip))
3966 if self.bridge:
3967 instance.nics[0].bridge = self.bridge
3968 result.append(("bridge", self.bridge))
3969 if self.mac:
3970 instance.nics[0].mac = self.mac
3971 result.append(("mac", self.mac))
3972 if self.do_kernel_path:
3973 instance.kernel_path = self.kernel_path
3974 result.append(("kernel_path", self.kernel_path))
3975 if self.do_initrd_path:
3976 instance.initrd_path = self.initrd_path
3977 result.append(("initrd_path", self.initrd_path))
3978 if self.hvm_boot_order:
3979 if self.hvm_boot_order == constants.VALUE_DEFAULT:
3980 instance.hvm_boot_order = None
3982 instance.hvm_boot_order = self.hvm_boot_order
3983 result.append(("hvm_boot_order", self.hvm_boot_order))
3985 self.cfg.AddInstance(instance)
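

# Usage sketch (hypothetical client code: it assumes the matching opcode
# class is named OpSetInstanceParms and that opcodes are submitted through
# the processor as elsewhere in this module). A memory-only change could
# look like:
#
#   op = opcodes.OpSetInstanceParms(instance_name="instance1.example.com",
#                                   mem=512)
#   changes = proc.ExecOpCode(op)
#   # changes == [("mem", 512)]; it takes effect at the next restart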


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns a dictionary with the structure node->(export-list),
    where export-list is a list of the instances exported on
    that node.

    """
    return rpc.call_export_list(self.nodes)
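

# Example of the structure returned by LUQueryExports.Exec (values are
# illustrative): {"node1.example.com": ["instance1.example.com"],
# "node2.example.com": []}, i.e. one key per queried node, mapping to the
# list of export names stored on that node.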


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.proc.ChainOpCode(op)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
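

# Example result format (illustrative values): searching for the pattern
# "^web" could return [("/cluster", "webfarm"),
# ("/instances/instance1.example.com", "webserver")], i.e. (path, tag)
# pairs for every tag matching the regular expression.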


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """
    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))