# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.processor = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). No nodes should be returned as an empty list (and not
    None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


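# A minimal sketch of a concrete LU, illustrating the contract described
# in the docstrings above (hypothetical example, not an actual opcode):
#
#   class LUExample(LogicalUnit):
#     HPATH = "example"
#     HTYPE = constants.HTYPE_CLUSTER
#     _OP_REQP = ["target"]  # __init__ rejects opcodes missing 'target'
#
#     def CheckPrereq(self):
#       # canonicalize opcode fields; raise OpPrereqError if invalid
#       self.op.target = self.cfg.ExpandNodeName(self.op.target)
#       if self.op.target is None:
#         raise errors.OpPrereqError("Unknown node")
#
#     def BuildHooksEnv(self):
#       # keys get the GANETI_ prefix added by the hooks runner
#       return {"OP_TARGET": self.op.target}, [], [self.op.target]
#
#     def Exec(self, feedback_fn):
#       feedback_fn("acting on %s" % self.op.target)

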
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return {}, [], []


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of node names (strings); an empty list means all nodes

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


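# Typical use from a LU's CheckPrereq (as done by the query LUs below;
# self.op.nodes holds the raw name list from the opcode):
#
#   self.nodes = _GetWantedNodes(self, self.op.nodes)

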
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instance names (strings); an empty list means
               all instances

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


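# Example (a sketch): a query LU validates the user-requested columns
# like this, where "name" would be served from config and "mfree" from
# a live RPC call:
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "mfree"])   # passes
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["bogus"])           # raises OpPrereqError

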
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings

  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


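# For illustration, a call such as (hypothetical values):
#
#   _BuildInstanceHookEnv("inst1.example.com", "node1", ["node2"], "debian",
#                         "up", 128, 1, [("192.0.2.10", "xen-br0")])
#
# yields, among others, INSTANCE_NAME=inst1.example.com,
# INSTANCE_SECONDARIES="node2", INSTANCE_NIC_COUNT=1 and
# INSTANCE_NIC0_BRIDGE=xen-br0 (the GANETI_ prefix is added later by
# the hooks runner).

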
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


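# The override dict replaces individual keyword arguments before the
# delegation above; e.g. a rename LU could advertise the new name
# (a sketch, `new_name` being hypothetical here):
#
#   env = _BuildInstanceHookEnvByObject(instance,
#                                       override={'name': new_name})

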
def _UpdateEtcHosts(fullnode, ip):
  """Ensure a node has a correct entry in /etc/hosts.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip - IPv4 address of host (str)

  """
  node = fullnode.split(".", 1)[0]

  f = open('/etc/hosts', 'r+')

  inthere = False
  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()
    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    # Strip off comments
    line = line.split('#')[0]

    if not line:
      # Entire line was comment, skip
      save_lines.append(rawline)
      continue

    fields = line.split()

    haveall = True
    havesome = False
    for spec in [ip, fullnode, node]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    if haveall:
      inthere = True
      save_lines.append(rawline)
      continue

    if havesome and not haveall:
      # Line (old, or manual?) which is missing some. Remove.
      removed = True
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))

  if removed:
    save_lines = save_lines + add_lines

    # We removed a line, write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    newfile.close()
    os.rename(tmpname, '/etc/hosts')
  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


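# The entry appended above, e.g. for fullnode="node1.example.com" and
# ip="192.0.2.1" (illustrative values), is a standard hosts line with a
# tab between the address and the names:
#
#   192.0.2.1<TAB>node1.example.com node1

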
def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip - IPv4 address of host (str)
    pubkey - the public key of the cluster

  """
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
  else:
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')

  inthere = False
  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()
    logger.Debug('read %s' % (repr(rawline),))

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    parts = line.split(' ')
    fields = parts[0].split(',')
    key = parts[2]

    haveall = True
    havesome = False
    for spec in [ip, fullnode]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
    if haveall and key == pubkey:
      inthere = True
      save_lines.append(rawline)
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
      continue

    if havesome and (not haveall or key != pubkey):
      removed = True
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
                                   constants.DATA_DIR)
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


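# The line format written above is the plain OpenSSH known_hosts one,
# e.g. for fullnode="node1.example.com", ip="192.0.2.1" (key abbreviated):
#
#   node1.example.com,192.0.2.1 ssh-rsa AAAAB3Nza...

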
def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None


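# Example usage, mirroring what LUInitCluster does below
# (utils.ListVolumeGroups() returns a dict of vg name -> size in MiB):
#
#   vgstatus = _HasValidVG(utils.ListVolumeGroups(), "xenvg")
#   if vgstatus:
#     raise errors.OpPrereqError("Error: %s" % vgstatus)

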
def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or /etc/hosts." %
                                 (hostname.ip,))

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
                         constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
                           constants.DEFAULT_NODED_PORT))):
      raise errors.OpPrereqError("You gave %s as secondary IP,\n"
                                 "but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _UpdateEtcHosts(hostname.name, hostname.ip)

    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)

    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    The tests run are:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G
    bad = False
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    if 'filelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    # checks ssh to other nodes
    if 'nodelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn(" - ERROR: communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if instance not in instancelist:
      feedback_fn(" - ERROR: instance %s not in instance list %s" %
                  (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if instance not in node_instance[node_current]:
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    self.cfg.VerifyConfig()

    master = self.sstore.GetMasterNode()
    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if not isinstance(nodeinstance, list):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result = self._VerifyInstance(instance, node_volume, node_instance,
                                    feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    # modify the sstore
    ss.SetKey(ss.SS_MASTER_IP, ip)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

    # Distribute updated ss config to all nodes
    myself = self.cfg.GetNodeInfo(master)
    dist_nodes = self.cfg.GetNodeList()
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying updated ssconf data to all nodes")
    for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
      fname = ss.KeyToFilename(keyname)
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    if not rpc.call_node_start_master(master):
      logger.Error("Could not re-enable the master role on the master,\n"
                   "please restart manually.")


def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        logger.ToStderr("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      perc_done, est_time, is_degraded = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


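# Each mirror status entry unpacked above is a (perc_done, est_time,
# is_degraded) tuple; e.g. interpreting one entry (illustrative values):
#
#   perc_done, est_time, is_degraded = (87.5, 130, False)
#   # 87.5% synced, ~130s remaining, mirror healthy; a perc_done of
#   # None means the device is already fully synced.

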
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
  """Check that mirrors are not degraded.

  """
  cfgw.SetDiskID(dev, node)

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      result = result and (not rstats[5])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data is False:
      raise errors.OpExecError("Can't gather the list of OSes")
    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of volumes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                break
            else:
              inst = None
            val = inst and inst.name
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"
                                 % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(utils.HostInfo().name,
                         primary_ip,
                         constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(myself.secondary_ip,
                           secondary_ip,
                           constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError(
          "Node secondary ip not reachable by TCP based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search(r'^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the"
                                 " secondary ip you gave (%s).\n"
                                 "Please fix and re-run this command." %
                                 new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s.\n"
                               "Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be.\n"
                                 "%s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,\n"
                  "please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
    Opts used:
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    data = []
    for node in self.nodes:
      result = ssh.SSHCall(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
    succeeded with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  for inst_disk in instance.disks:
    master_result = None
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      is_primary = node == instance.primary_node
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=%s)" %
                     (inst_disk.iv_name, node, is_primary))
        if is_primary or not ignore_secondaries:
          disks_ok = False
      if is_primary:
        master_result = result
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        master_result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


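# Sketch of the return value on success (device names illustrative only):
#
#   disks_ok, info = _AssembleInstanceDisks(instance, cfg)
#   # disks_ok -> True
#   # info -> [("node1.example.com", "sda", "/dev/drbd0"), ...]
#   # i.e. (primary node, instance-visible name, node-visible device)

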
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not isinstance(ins_l, list):
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored (they make the function return False).

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
    if not nodeinfo:
      raise errors.OpExecError("Could not contact node %s for infos" %
                               (node_current))

    freememory = nodeinfo[node_current]['memory_free']
    memory = instance.memory
    if memory > freememory:
      raise errors.OpExecError("Not enough memory to start instance"
                               " %s on node %s"
                               " needed %s MiB, available %s MiB" %
                               (instance.name, node_current, memory,
                                freememory))

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")

    self.cfg.MarkInstanceUp(instance.name)


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    self.cfg.MarkInstanceDown(instance.name)
    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
      if not isinstance(os_obj, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


2193 class LURenameInstance(LogicalUnit):
2194 """Rename an instance.
2197 HPATH = "instance-rename"
2198 HTYPE = constants.HTYPE_INSTANCE
2199 _OP_REQP = ["instance_name", "new_name"]
2201 def BuildHooksEnv(self):
2204 This runs on master, primary and secondary nodes of the instance.
2207 env = _BuildInstanceHookEnvByObject(self.instance)
2208 env["INSTANCE_NEW_NAME"] = self.op.new_name
2209 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2210 list(self.instance.secondary_nodes))
2213 def CheckPrereq(self):
2214 """Check prerequisites.
2216 This checks that the instance is in the cluster and is not running.
2219 instance = self.cfg.GetInstanceInfo(
2220 self.cfg.ExpandInstanceName(self.op.instance_name))
2221 if instance is None:
2222 raise errors.OpPrereqError("Instance '%s' not known" %
2223 self.op.instance_name)
2224 if instance.status != "down":
2225 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2226 self.op.instance_name)
2227 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2228 if remote_info:
2229 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2230 (self.op.instance_name,
2231 instance.primary_node))
2232 self.instance = instance
2234 # new name verification
2235 name_info = utils.HostInfo(self.op.new_name)
2237 self.op.new_name = new_name = name_info.name
2238 if not getattr(self.op, "ignore_ip", False):
2239 command = ["fping", "-q", name_info.ip]
2240 result = utils.RunCmd(command)
2241 if not result.failed:
2242 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2243 (name_info.ip, new_name))
2246 def Exec(self, feedback_fn):
2247 """Reinstall the instance.
2250 inst = self.instance
2251 old_name = inst.name
2253 self.cfg.RenameInstance(inst.name, self.op.new_name)
2255 # re-read the instance from the configuration after rename
2256 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2258 _StartInstanceDisks(self.cfg, inst, None)
2260 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2261 "sda", "sdb"):
2262 msg = ("Could not run OS rename script for instance %s"
2263 " on node %s"
2264 " (but the instance has been renamed in Ganeti)" %
2265 (inst.name, inst.primary_node))
2266 logger.Error(msg)
2268 _ShutdownInstanceDisks(inst, self.cfg)
2271 class LURemoveInstance(LogicalUnit):
2272 """Remove an instance.
2275 HPATH = "instance-remove"
2276 HTYPE = constants.HTYPE_INSTANCE
2277 _OP_REQP = ["instance_name"]
2279 def BuildHooksEnv(self):
2282 This runs on master, primary and secondary nodes of the instance.
2285 env = _BuildInstanceHookEnvByObject(self.instance)
2286 nl = [self.sstore.GetMasterNode()]
2287 return env, nl, nl
2289 def CheckPrereq(self):
2290 """Check prerequisites.
2292 This checks that the instance is in the cluster.
2295 instance = self.cfg.GetInstanceInfo(
2296 self.cfg.ExpandInstanceName(self.op.instance_name))
2297 if instance is None:
2298 raise errors.OpPrereqError("Instance '%s' not known" %
2299 self.op.instance_name)
2300 self.instance = instance
2302 def Exec(self, feedback_fn):
2303 """Remove the instance.
2306 instance = self.instance
2307 logger.Info("shutting down instance %s on node %s" %
2308 (instance.name, instance.primary_node))
2310 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2311 if self.op.ignore_failures:
2312 feedback_fn("Warning: can't shutdown instance")
2313 else:
2314 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2315 (instance.name, instance.primary_node))
2317 logger.Info("removing block devices for instance %s" % instance.name)
2319 if not _RemoveDisks(instance, self.cfg):
2320 if self.op.ignore_failures:
2321 feedback_fn("Warning: can't remove instance's disks")
2322 else:
2323 raise errors.OpExecError("Can't remove instance's disks")
2325 logger.Info("removing instance %s out of cluster config" % instance.name)
2327 self.cfg.RemoveInstance(instance.name)
2330 class LUQueryInstances(NoHooksLU):
2331 """Logical unit for querying instances.
2334 _OP_REQP = ["output_fields", "names"]
2336 def CheckPrereq(self):
2337 """Check prerequisites.
2339 This checks that the fields required are valid output fields.
2342 self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2343 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2344 "admin_state", "admin_ram",
2345 "disk_template", "ip", "mac", "bridge",
2346 "sda_size", "sdb_size"],
2347 dynamic=self.dynamic_fields,
2348 selected=self.op.output_fields)
2350 self.wanted = _GetWantedInstances(self, self.op.names)
2352 def Exec(self, feedback_fn):
2353 """Computes the list of nodes and their attributes.
2356 instance_names = self.wanted
2357 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2358 in instance_names]
2360 # begin data gathering
2362 nodes = frozenset([inst.primary_node for inst in instance_list])
2364 bad_nodes = []
2365 if self.dynamic_fields.intersection(self.op.output_fields):
2366 live_data = {}
2367 node_data = rpc.call_all_instances_info(nodes)
2368 for name in nodes:
2369 result = node_data[name]
2370 if result:
2371 live_data.update(result)
2372 elif result == False:
2373 bad_nodes.append(name)
2374 # else no instance is alive
2375 else:
2376 live_data = dict([(name, {}) for name in instance_names])
2378 # end data gathering
2380 output = []
2381 for instance in instance_list:
2382 iout = []
2383 for field in self.op.output_fields:
2384 if field == "name":
2385 val = instance.name
2386 elif field == "os":
2387 val = instance.os
2388 elif field == "pnode":
2389 val = instance.primary_node
2390 elif field == "snodes":
2391 val = list(instance.secondary_nodes)
2392 elif field == "admin_state":
2393 val = (instance.status != "down")
2394 elif field == "oper_state":
2395 if instance.primary_node in bad_nodes:
2396 val = None
2397 else:
2398 val = bool(live_data.get(instance.name))
2399 elif field == "admin_ram":
2400 val = instance.memory
2401 elif field == "oper_ram":
2402 if instance.primary_node in bad_nodes:
2403 val = None
2404 elif instance.name in live_data:
2405 val = live_data[instance.name].get("memory", "?")
2406 else:
2407 val = "-"
2408 elif field == "disk_template":
2409 val = instance.disk_template
2410 elif field == "ip":
2411 val = instance.nics[0].ip
2412 elif field == "bridge":
2413 val = instance.nics[0].bridge
2414 elif field == "mac":
2415 val = instance.nics[0].mac
2416 elif field == "sda_size" or field == "sdb_size":
2417 disk = instance.FindDisk(field[:3])
2418 if disk is None:
2419 val = None
2420 else:
2421 val = disk.size
2422 else:
2423 raise errors.ParameterError(field)
2424 iout.append(str(val))
2425 output.append(iout)
2427 return output
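# Example of the result shape (hypothetical data): with
# output_fields = ["name", "oper_ram"], the loop above produces one
# stringified row per instance, e.g.:
#
#   [["inst1.example.com", "512"],
#    ["inst2.example.com", "?"]]
#
# where "?" appears when a live node reports no memory figure for the
# instance and "-" when the instance is not running at all.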
2430 class LUFailoverInstance(LogicalUnit):
2431 """Failover an instance.
2434 HPATH = "instance-failover"
2435 HTYPE = constants.HTYPE_INSTANCE
2436 _OP_REQP = ["instance_name", "ignore_consistency"]
2438 def BuildHooksEnv(self):
2441 This runs on master, primary and secondary nodes of the instance.
2445 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2447 env.update(_BuildInstanceHookEnvByObject(self.instance))
2448 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2449 return env, nl, nl
2451 def CheckPrereq(self):
2452 """Check prerequisites.
2454 This checks that the instance is in the cluster.
2457 instance = self.cfg.GetInstanceInfo(
2458 self.cfg.ExpandInstanceName(self.op.instance_name))
2459 if instance is None:
2460 raise errors.OpPrereqError("Instance '%s' not known" %
2461 self.op.instance_name)
2463 if instance.disk_template not in constants.DTS_NET_MIRROR:
2464 raise errors.OpPrereqError("Instance's disk layout is not"
2465 " network mirrored, cannot failover.")
2467 secondary_nodes = instance.secondary_nodes
2468 if not secondary_nodes:
2469 raise errors.ProgrammerError("no secondary node but using "
2470 "DT_REMOTE_RAID1 template")
2472 # check memory requirements on the secondary node
2473 target_node = secondary_nodes[0]
2474 nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2475 info = nodeinfo.get(target_node, None)
2476 if not info:
2477 raise errors.OpPrereqError("Cannot get current information"
2478 " from node '%s'" % target_node)
2479 if instance.memory > info['memory_free']:
2480 raise errors.OpPrereqError("Not enough memory on target node %s."
2481 " %d MB available, %d MB required" %
2482 (target_node, info['memory_free'],
2483 instance.memory))
2485 # check bridge existence
2486 brlist = [nic.bridge for nic in instance.nics]
2487 if not rpc.call_bridges_exist(instance.primary_node, brlist):
2488 raise errors.OpPrereqError("One or more target bridges %s does not"
2489 " exist on destination node '%s'" %
2490 (brlist, instance.primary_node))
2492 self.instance = instance
2494 def Exec(self, feedback_fn):
2495 """Failover an instance.
2497 The failover is done by shutting it down on its present node and
2498 starting it on the secondary.
2501 instance = self.instance
2503 source_node = instance.primary_node
2504 target_node = instance.secondary_nodes[0]
2506 feedback_fn("* checking disk consistency between source and target")
2507 for dev in instance.disks:
2508 # for remote_raid1, these are md over drbd
2509 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2510 if not self.op.ignore_consistency:
2511 raise errors.OpExecError("Disk %s is degraded on target node,"
2512 " aborting failover." % dev.iv_name)
2514 feedback_fn("* checking target node resource availability")
2515 nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2518 raise errors.OpExecError("Could not contact target node %s." %
2521 free_memory = int(nodeinfo[target_node]['memory_free'])
2522 memory = instance.memory
2523 if memory > free_memory:
2524 raise errors.OpExecError("Not enough memory to create instance %s on"
2525 " node %s. needed %s MiB, available %s MiB" %
2526 (instance.name, target_node, memory,
2529 feedback_fn("* shutting down instance on source node")
2530 logger.Info("Shutting down instance %s on node %s" %
2531 (instance.name, source_node))
2533 if not rpc.call_instance_shutdown(source_node, instance):
2534 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2535 " anyway. Please make sure node %s is down" %
2536 (instance.name, source_node, source_node))
2538 feedback_fn("* deactivating the instance's disks on source node")
2539 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2540 raise errors.OpExecError("Can't shut down the instance's disks.")
2542 instance.primary_node = target_node
2543 # distribute new instance config to the other nodes
2544 self.cfg.AddInstance(instance)
2546 feedback_fn("* activating the instance's disks on target node")
2547 logger.Info("Starting instance %s on node %s" %
2548 (instance.name, target_node))
2550 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2551 ignore_secondaries=True)
2552 if not disks_ok:
2553 _ShutdownInstanceDisks(instance, self.cfg)
2554 raise errors.OpExecError("Can't activate the instance's disks")
2556 feedback_fn("* starting the instance on the target node")
2557 if not rpc.call_instance_start(target_node, instance, None):
2558 _ShutdownInstanceDisks(instance, self.cfg)
2559 raise errors.OpExecError("Could not start instance %s on node %s." %
2560 (instance.name, target_node))
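# A minimal sketch of the free-memory check done twice above (once in
# CheckPrereq, once in Exec), factored into a standalone helper. This is an
# illustration, not original API; it assumes the same rpc.call_node_info
# result shape (a dict keyed by node name with a 'memory_free' entry):
def _CheckNodeFreeMemorySketch(cfg, node, requested):
  """Raise OpPrereqError if node has less than `requested` MiB free."""
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  info = nodeinfo.get(node, None)
  if not info:
    raise errors.OpPrereqError("Cannot get current information"
                               " from node '%s'" % node)
  free_mem = int(info['memory_free'])
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s:"
                               " %d MB available, %d MB required" %
                               (node, free_mem, requested))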
2563 def _CreateBlockDevOnPrimary(cfg, node, device, info):
2564 """Create a tree of block devices on the primary node.
2566 This always creates all devices.
2569 if device.children:
2570 for child in device.children:
2571 if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2572 return False
2574 cfg.SetDiskID(device, node)
2575 new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2576 if not new_id:
2577 return False
2578 if device.physical_id is None:
2579 device.physical_id = new_id
2580 return True
2583 def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2584 """Create a tree of block devices on a secondary node.
2586 If this device type has to be created on secondaries, create it and
2589 If not, just recurse to children keeping the same 'force' value.
2592 if device.CreateOnSecondary():
2593 force = True
2594 if device.children:
2595 for child in device.children:
2596 if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2597 return False
2599 if not force:
2600 return True
2601 cfg.SetDiskID(device, node)
2602 new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2603 if not new_id:
2604 return False
2605 if device.physical_id is None:
2606 device.physical_id = new_id
2607 return True
2610 def _GenerateUniqueNames(cfg, exts):
2611 """Generate a suitable LV name.
2613 This will generate a logical volume name for the given instance.
2616 results = []
2617 for val in exts:
2618 new_id = cfg.GenerateUniqueID()
2619 results.append("%s%s" % (new_id, val))
2620 return results
2623 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2624 """Generate a drbd device complete with its children.
2627 port = cfg.AllocatePort()
2628 vgname = cfg.GetVGName()
2629 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2630 logical_id=(vgname, names[0]))
2631 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2632 logical_id=(vgname, names[1]))
2633 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2634 logical_id = (primary, secondary, port),
2635 children = [dev_data, dev_meta])
2636 return drbd_dev
2639 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2640 """Generate a drbd8 device complete with its children.
2643 port = cfg.AllocatePort()
2644 vgname = cfg.GetVGName()
2645 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2646 logical_id=(vgname, names[0]))
2647 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2648 logical_id=(vgname, names[1]))
2649 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2650 logical_id = (primary, secondary, port),
2651 children = [dev_data, dev_meta],
2652 iv_name=iv_name)
2653 return drbd_dev
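# The device tree built by the two branch generators above, for one disk
# (sizes as passed in; the 128 MB metadata volume is fixed):
#
#   drbd (logical_id = (primary, secondary, port))
#    |-- LV data  (size)
#    `-- LV meta  (128 MB)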
2655 def _GenerateDiskTemplate(cfg, template_name,
2656 instance_name, primary_node,
2657 secondary_nodes, disk_sz, swap_sz):
2658 """Generate the entire disk layout for a given template type.
2661 #TODO: compute space requirements
2663 vgname = cfg.GetVGName()
2664 if template_name == "diskless":
2666 elif template_name == "plain":
2667 if len(secondary_nodes) != 0:
2668 raise errors.ProgrammerError("Wrong template configuration")
2670 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2671 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2672 logical_id=(vgname, names[0]),
2673 iv_name = "sda")
2674 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2675 logical_id=(vgname, names[1]),
2676 iv_name = "sdb")
2677 disks = [sda_dev, sdb_dev]
2678 elif template_name == "local_raid1":
2679 if len(secondary_nodes) != 0:
2680 raise errors.ProgrammerError("Wrong template configuration")
2683 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2684 ".sdb_m1", ".sdb_m2"])
2685 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2686 logical_id=(vgname, names[0]))
2687 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2688 logical_id=(vgname, names[1]))
2689 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2691 children = [sda_dev_m1, sda_dev_m2])
2692 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2693 logical_id=(vgname, names[2]))
2694 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2695 logical_id=(vgname, names[3]))
2696 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2698 children = [sdb_dev_m1, sdb_dev_m2])
2699 disks = [md_sda_dev, md_sdb_dev]
2700 elif template_name == constants.DT_REMOTE_RAID1:
2701 if len(secondary_nodes) != 1:
2702 raise errors.ProgrammerError("Wrong template configuration")
2703 remote_node = secondary_nodes[0]
2704 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2705 ".sdb_data", ".sdb_meta"])
2706 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2707 disk_sz, names[0:2])
2708 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2709 children = [drbd_sda_dev], size=disk_sz)
2710 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2711 swap_sz, names[2:4])
2712 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2713 children = [drbd_sdb_dev], size=swap_sz)
2714 disks = [md_sda_dev, md_sdb_dev]
2715 elif template_name == constants.DT_DRBD8:
2716 if len(secondary_nodes) != 1:
2717 raise errors.ProgrammerError("Wrong template configuration")
2718 remote_node = secondary_nodes[0]
2719 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2720 ".sdb_data", ".sdb_meta"])
2721 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2722 disk_sz, names[0:2], "sda")
2723 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2724 swap_sz, names[2:4], "sdb")
2725 disks = [drbd_sda_dev, drbd_sdb_dev]
2727 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2731 def _GetInstanceInfoText(instance):
2732 """Compute that text that should be added to the disk's metadata.
2735 return "originstname+%s" % instance.name
2738 def _CreateDisks(cfg, instance):
2739 """Create all disks for an instance.
2741 This abstracts away some work from AddInstance.
2743 Args:
2744 instance: the instance object
2746 Returns:
2747 True or False showing the success of the creation process
2750 info = _GetInstanceInfoText(instance)
2752 for device in instance.disks:
2753 logger.Info("creating volume %s for instance %s" %
2754 (device.iv_name, instance.name))
2756 for secondary_node in instance.secondary_nodes:
2757 if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2758 info):
2759 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2760 (device.iv_name, device, secondary_node))
2761 return False
2763 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2764 logger.Error("failed to create volume %s on primary!" %
2765 device.iv_name)
2766 return False
2768 return True
2770 def _RemoveDisks(instance, cfg):
2771 """Remove all disks for an instance.
2773 This abstracts away some work from `AddInstance()` and
2774 `RemoveInstance()`. Note that in case some of the devices couldn't
2775 be removed, the removal will continue with the other ones (compare
2776 with `_CreateDisks()`).
2778 Args:
2779 instance: the instance object
2781 Returns:
2782 True or False showing the success of the removal process
2785 logger.Info("removing block devices for instance %s" % instance.name)
2788 for device in instance.disks:
2789 for node, disk in device.ComputeNodeTree(instance.primary_node):
2790 cfg.SetDiskID(disk, node)
2791 if not rpc.call_blockdev_remove(node, disk):
2792 logger.Error("could not remove block device %s on node %s,"
2793 " continuing anyway" %
2794 (device.iv_name, node))
2799 class LUCreateInstance(LogicalUnit):
2800 """Create an instance.
2803 HPATH = "instance-add"
2804 HTYPE = constants.HTYPE_INSTANCE
2805 _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2806 "disk_template", "swap_size", "mode", "start", "vcpus",
2807 "wait_for_sync", "ip_check"]
2809 def BuildHooksEnv(self):
2812 This runs on master, primary and secondary nodes of the instance.
2816 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2817 "INSTANCE_DISK_SIZE": self.op.disk_size,
2818 "INSTANCE_SWAP_SIZE": self.op.swap_size,
2819 "INSTANCE_ADD_MODE": self.op.mode,
2821 if self.op.mode == constants.INSTANCE_IMPORT:
2822 env["INSTANCE_SRC_NODE"] = self.op.src_node
2823 env["INSTANCE_SRC_PATH"] = self.op.src_path
2824 env["INSTANCE_SRC_IMAGE"] = self.src_image
2826 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2827 primary_node=self.op.pnode,
2828 secondary_nodes=self.secondaries,
2829 status=self.instance_status,
2830 os_type=self.op.os_type,
2831 memory=self.op.mem_size,
2832 vcpus=self.op.vcpus,
2833 nics=[(self.inst_ip, self.op.bridge)],
2834 ))
2836 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2837 self.secondaries)
2839 return env, nl, nl
2841 def CheckPrereq(self):
2842 """Check prerequisites.
2845 if self.op.mode not in (constants.INSTANCE_CREATE,
2846 constants.INSTANCE_IMPORT):
2847 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2850 if self.op.mode == constants.INSTANCE_IMPORT:
2851 src_node = getattr(self.op, "src_node", None)
2852 src_path = getattr(self.op, "src_path", None)
2853 if src_node is None or src_path is None:
2854 raise errors.OpPrereqError("Importing an instance requires source"
2855 " node and path options")
2856 src_node_full = self.cfg.ExpandNodeName(src_node)
2857 if src_node_full is None:
2858 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2859 self.op.src_node = src_node = src_node_full
2861 if not os.path.isabs(src_path):
2862 raise errors.OpPrereqError("The source path must be absolute")
2864 export_info = rpc.call_export_info(src_node, src_path)
2866 if not export_info:
2867 raise errors.OpPrereqError("No export found in dir %s" % src_path)
2869 if not export_info.has_section(constants.INISECT_EXP):
2870 raise errors.ProgrammerError("Corrupted export config")
2872 ei_version = export_info.get(constants.INISECT_EXP, 'version')
2873 if (int(ei_version) != constants.EXPORT_VERSION):
2874 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2875 (ei_version, constants.EXPORT_VERSION))
2877 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2878 raise errors.OpPrereqError("Can't import instance with more than"
2881 # FIXME: are the old os-es, disk sizes, etc. useful?
2882 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2883 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2884 'disk0_dump'))
2885 self.src_image = diskimage
2886 else: # INSTANCE_CREATE
2887 if getattr(self.op, "os_type", None) is None:
2888 raise errors.OpPrereqError("No guest OS specified")
2890 # check primary node
2891 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2893 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2895 self.op.pnode = pnode.name
2897 self.secondaries = []
2898 # disk template and mirror node verification
2899 if self.op.disk_template not in constants.DISK_TEMPLATES:
2900 raise errors.OpPrereqError("Invalid disk template name")
2902 if self.op.disk_template in constants.DTS_NET_MIRROR:
2903 if getattr(self.op, "snode", None) is None:
2904 raise errors.OpPrereqError("The networked disk templates need"
2907 snode_name = self.cfg.ExpandNodeName(self.op.snode)
2908 if snode_name is None:
2909 raise errors.OpPrereqError("Unknown secondary node '%s'" %
2911 elif snode_name == pnode.name:
2912 raise errors.OpPrereqError("The secondary node cannot be"
2913 " the primary node.")
2914 self.secondaries.append(snode_name)
2916 # Check lv size requirements
2917 nodenames = [pnode.name] + self.secondaries
2918 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2920 # Required free disk space as a function of disk and swap space
2921 req_size_dict = {
2922 constants.DT_DISKLESS: 0,
2923 constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2924 constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2925 # 256 MB are added for drbd metadata, 128MB for each drbd device
2926 constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2927 constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2928 }
2930 if self.op.disk_template not in req_size_dict:
2931 raise errors.ProgrammerError("Disk template '%s' size requirement"
2932 " is unknown" % self.op.disk_template)
2934 req_size = req_size_dict[self.op.disk_template]
2936 for node in nodenames:
2937 info = nodeinfo.get(node, None)
2939 raise errors.OpPrereqError("Cannot get current information"
2940 " from node '%s'" % nodeinfo)
2941 if req_size > info['vg_free']:
2942 raise errors.OpPrereqError("Not enough disk space on target node %s."
2943 " %d MB available, %d MB required" %
2944 (node, info['vg_free'], req_size))
2947 os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2948 if not isinstance(os_obj, objects.OS):
2949 raise errors.OpPrereqError("OS '%s' not in supported os list for"
2950 " primary node" % self.op.os_type)
2952 # instance verification
2953 hostname1 = utils.HostInfo(self.op.instance_name)
2955 self.op.instance_name = instance_name = hostname1.name
2956 instance_list = self.cfg.GetInstanceList()
2957 if instance_name in instance_list:
2958 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2961 ip = getattr(self.op, "ip", None)
2962 if ip is None or ip.lower() == "none":
2964 elif ip.lower() == "auto":
2965 inst_ip = hostname1.ip
2967 if not utils.IsValidIP(ip):
2968 raise errors.OpPrereqError("given IP address '%s' doesn't look"
2969 " like a valid IP" % ip)
2971 self.inst_ip = inst_ip
2973 if self.op.start and not self.op.ip_check:
2974 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2975 " adding an instance in start mode")
2977 if self.op.ip_check:
2978 if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2979 constants.DEFAULT_NODED_PORT):
2980 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2981 (hostname1.ip, instance_name))
2983 # bridge verification
2984 bridge = getattr(self.op, "bridge", None)
2985 if bridge is None:
2986 self.op.bridge = self.cfg.GetDefBridge()
2987 else:
2988 self.op.bridge = bridge
2990 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2991 raise errors.OpPrereqError("target bridge '%s' does not exist on"
2992 " destination node '%s'" %
2993 (self.op.bridge, pnode.name))
2995 if self.op.start:
2996 self.instance_status = 'up'
2997 else:
2998 self.instance_status = 'down'
3000 def Exec(self, feedback_fn):
3001 """Create and add the instance to the cluster.
3004 instance = self.op.instance_name
3005 pnode_name = self.pnode.name
3007 nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
3008 if self.inst_ip is not None:
3009 nic.ip = self.inst_ip
3011 disks = _GenerateDiskTemplate(self.cfg,
3012 self.op.disk_template,
3013 instance, pnode_name,
3014 self.secondaries, self.op.disk_size,
3015 self.op.swap_size)
3017 iobj = objects.Instance(name=instance, os=self.op.os_type,
3018 primary_node=pnode_name,
3019 memory=self.op.mem_size,
3020 vcpus=self.op.vcpus,
3021 nics=[nic], disks=disks,
3022 disk_template=self.op.disk_template,
3023 status=self.instance_status,
3024 )
3026 feedback_fn("* creating instance disks...")
3027 if not _CreateDisks(self.cfg, iobj):
3028 _RemoveDisks(iobj, self.cfg)
3029 raise errors.OpExecError("Device creation failed, reverting...")
3031 feedback_fn("adding instance %s to cluster config" % instance)
3033 self.cfg.AddInstance(iobj)
3035 if self.op.wait_for_sync:
3036 disk_abort = not _WaitForSync(self.cfg, iobj)
3037 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3038 # make sure the disks are not degraded (still sync-ing is ok)
3040 feedback_fn("* checking mirrors status")
3041 disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
3042 else:
3043 disk_abort = False
3045 if disk_abort:
3046 _RemoveDisks(iobj, self.cfg)
3047 self.cfg.RemoveInstance(iobj.name)
3048 raise errors.OpExecError("There are some degraded disks for"
3049 " this instance")
3051 feedback_fn("creating os for instance %s on node %s" %
3052 (instance, pnode_name))
3054 if iobj.disk_template != constants.DT_DISKLESS:
3055 if self.op.mode == constants.INSTANCE_CREATE:
3056 feedback_fn("* running the instance OS create scripts...")
3057 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3058 raise errors.OpExecError("could not add os for instance %s"
3060 (instance, pnode_name))
3062 elif self.op.mode == constants.INSTANCE_IMPORT:
3063 feedback_fn("* running the instance OS import scripts...")
3064 src_node = self.op.src_node
3065 src_image = self.src_image
3066 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3067 src_node, src_image):
3068 raise errors.OpExecError("Could not import os for instance"
3070 (instance, pnode_name))
3071 else:
3072 # also checked in the prereq part
3073 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3074 % self.op.mode)
3077 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3078 feedback_fn("* starting instance...")
3079 if not rpc.call_instance_start(pnode_name, iobj, None):
3080 raise errors.OpExecError("Could not start instance")
3083 class LUConnectConsole(NoHooksLU):
3084 """Connect to an instance's console.
3086 This is somewhat special in that it returns the command line that
3087 you need to run on the master node in order to connect to the
3088 console.
3091 _OP_REQP = ["instance_name"]
3093 def CheckPrereq(self):
3094 """Check prerequisites.
3096 This checks that the instance is in the cluster.
3099 instance = self.cfg.GetInstanceInfo(
3100 self.cfg.ExpandInstanceName(self.op.instance_name))
3101 if instance is None:
3102 raise errors.OpPrereqError("Instance '%s' not known" %
3103 self.op.instance_name)
3104 self.instance = instance
3106 def Exec(self, feedback_fn):
3107 """Connect to the console of an instance
3110 instance = self.instance
3111 node = instance.primary_node
3113 node_insts = rpc.call_instance_list([node])[node]
3114 if node_insts is False:
3115 raise errors.OpExecError("Can't connect to node %s." % node)
3117 if instance.name not in node_insts:
3118 raise errors.OpExecError("Instance %s is not running." % instance.name)
3120 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3122 hyper = hypervisor.GetHypervisor()
3123 console_cmd = hyper.GetShellCommandForConsole(instance.name)
3125 argv = ["ssh", "-q", "-t"]
3126 argv.extend(ssh.KNOWN_HOSTS_OPTS)
3127 argv.extend(ssh.BATCH_MODE_OPTS)
3128 argv.append(node)
3129 argv.append(console_cmd)
3130 return "ssh", argv
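# Sketch of how a client consumes this LU's ("ssh", argv) result; os.execvp
# is from the Python standard library and `result` is a placeholder for the
# value returned by the LU:
#
#   import os
#   shell, argv = result
#   os.execvp(argv[0], argv)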
3133 class LUAddMDDRBDComponent(LogicalUnit):
3134 """Adda new mirror member to an instance's disk.
3137 HPATH = "mirror-add"
3138 HTYPE = constants.HTYPE_INSTANCE
3139 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3141 def BuildHooksEnv(self):
3144 This runs on the master, the primary and all the secondaries.
3148 "NEW_SECONDARY": self.op.remote_node,
3149 "DISK_NAME": self.op.disk_name,
3151 env.update(_BuildInstanceHookEnvByObject(self.instance))
3152 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3153 self.op.remote_node,] + list(self.instance.secondary_nodes)
3154 return env, nl, nl
3156 def CheckPrereq(self):
3157 """Check prerequisites.
3159 This checks that the instance is in the cluster.
3162 instance = self.cfg.GetInstanceInfo(
3163 self.cfg.ExpandInstanceName(self.op.instance_name))
3164 if instance is None:
3165 raise errors.OpPrereqError("Instance '%s' not known" %
3166 self.op.instance_name)
3167 self.instance = instance
3169 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3170 if remote_node is None:
3171 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3172 self.remote_node = remote_node
3174 if remote_node == instance.primary_node:
3175 raise errors.OpPrereqError("The specified node is the primary node of"
3178 if instance.disk_template != constants.DT_REMOTE_RAID1:
3179 raise errors.OpPrereqError("Instance's disk layout is not"
3181 for disk in instance.disks:
3182 if disk.iv_name == self.op.disk_name:
3183 break
3184 else:
3185 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3186 " instance." % self.op.disk_name)
3187 if len(disk.children) > 1:
3188 raise errors.OpPrereqError("The device already has two slave"
3189 " devices.\n"
3190 "This would create a 3-disk raid1"
3191 " which we don't allow.")
3192 self.disk = disk
3194 def Exec(self, feedback_fn):
3195 """Add the mirror component
3199 instance = self.instance
3200 disk = self.disk
3201 remote_node = self.remote_node
3202 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3203 names = _GenerateUniqueNames(self.cfg, lv_names)
3204 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3205 remote_node, disk.size, names)
3207 logger.Info("adding new mirror component on secondary")
3209 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
3210 _GetInstanceInfoText(instance)):
3211 raise errors.OpExecError("Failed to create new component on secondary"
3212 " node %s" % remote_node)
3214 logger.Info("adding new mirror component on primary")
3216 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
3217 _GetInstanceInfoText(instance)):
3218 # remove secondary dev
3219 self.cfg.SetDiskID(new_drbd, remote_node)
3220 rpc.call_blockdev_remove(remote_node, new_drbd)
3221 raise errors.OpExecError("Failed to create volume on primary")
3223 # the device exists now
3224 # call the primary node to add the mirror to md
3225 logger.Info("adding new mirror component to md")
3226 if not rpc.call_blockdev_addchildren(instance.primary_node,
3227 disk, [new_drbd]):
3228 logger.Error("Can't add mirror component to md!")
3229 self.cfg.SetDiskID(new_drbd, remote_node)
3230 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3231 logger.Error("Can't rollback on secondary")
3232 self.cfg.SetDiskID(new_drbd, instance.primary_node)
3233 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3234 logger.Error("Can't rollback on primary")
3235 raise errors.OpExecError("Can't add mirror component to md array")
3237 disk.children.append(new_drbd)
3239 self.cfg.AddInstance(instance)
3241 _WaitForSync(self.cfg, instance)
3246 class LURemoveMDDRBDComponent(LogicalUnit):
3247 """Remove a component from a remote_raid1 disk.
3250 HPATH = "mirror-remove"
3251 HTYPE = constants.HTYPE_INSTANCE
3252 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3254 def BuildHooksEnv(self):
3257 This runs on the master, the primary and all the secondaries.
3261 "DISK_NAME": self.op.disk_name,
3262 "DISK_ID": self.op.disk_id,
3263 "OLD_SECONDARY": self.old_secondary,
3265 env.update(_BuildInstanceHookEnvByObject(self.instance))
3266 nl = [self.sstore.GetMasterNode(),
3267 self.instance.primary_node] + list(self.instance.secondary_nodes)
3268 return env, nl, nl
3270 def CheckPrereq(self):
3271 """Check prerequisites.
3273 This checks that the instance is in the cluster.
3276 instance = self.cfg.GetInstanceInfo(
3277 self.cfg.ExpandInstanceName(self.op.instance_name))
3278 if instance is None:
3279 raise errors.OpPrereqError("Instance '%s' not known" %
3280 self.op.instance_name)
3281 self.instance = instance
3283 if instance.disk_template != constants.DT_REMOTE_RAID1:
3284 raise errors.OpPrereqError("Instance's disk layout is not"
3286 for disk in instance.disks:
3287 if disk.iv_name == self.op.disk_name:
3288 break
3289 else:
3290 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3291 " instance." % self.op.disk_name)
3292 for child in disk.children:
3293 if (child.dev_type == constants.LD_DRBD7 and
3294 child.logical_id[2] == self.op.disk_id):
3295 break
3296 else:
3297 raise errors.OpPrereqError("Can't find the device with this port.")
3299 if len(disk.children) < 2:
3300 raise errors.OpPrereqError("Cannot remove the last component from"
3304 if self.child.logical_id[0] == instance.primary_node:
3308 self.old_secondary = self.child.logical_id[oid]
3310 def Exec(self, feedback_fn):
3311 """Remove the mirror component
3314 instance = self.instance
3315 disk = self.disk
3316 child = self.child
3317 logger.Info("remove mirror component")
3318 self.cfg.SetDiskID(disk, instance.primary_node)
3319 if not rpc.call_blockdev_removechildren(instance.primary_node,
3320 disk, [child]):
3321 raise errors.OpExecError("Can't remove child from mirror.")
3323 for node in child.logical_id[:2]:
3324 self.cfg.SetDiskID(child, node)
3325 if not rpc.call_blockdev_remove(node, child):
3326 logger.Error("Warning: failed to remove device from node %s,"
3327 " continuing operation." % node)
3329 disk.children.remove(child)
3330 self.cfg.AddInstance(instance)
3333 class LUReplaceDisks(LogicalUnit):
3334 """Replace the disks of an instance.
3337 HPATH = "mirrors-replace"
3338 HTYPE = constants.HTYPE_INSTANCE
3339 _OP_REQP = ["instance_name"]
3341 def BuildHooksEnv(self):
3344 This runs on the master, the primary and all the secondaries.
3348 "NEW_SECONDARY": self.op.remote_node,
3349 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3351 env.update(_BuildInstanceHookEnvByObject(self.instance))
3352 nl = [self.sstore.GetMasterNode(),
3353 self.instance.primary_node] + list(self.instance.secondary_nodes)
3354 return env, nl, nl
3356 def CheckPrereq(self):
3357 """Check prerequisites.
3359 This checks that the instance is in the cluster.
3362 instance = self.cfg.GetInstanceInfo(
3363 self.cfg.ExpandInstanceName(self.op.instance_name))
3364 if instance is None:
3365 raise errors.OpPrereqError("Instance '%s' not known" %
3366 self.op.instance_name)
3367 self.instance = instance
3369 if instance.disk_template != constants.DT_REMOTE_RAID1:
3370 raise errors.OpPrereqError("Instance's disk layout is not"
3373 if len(instance.secondary_nodes) != 1:
3374 raise errors.OpPrereqError("The instance has a strange layout,"
3375 " expected one secondary but found %d" %
3376 len(instance.secondary_nodes))
3378 remote_node = getattr(self.op, "remote_node", None)
3379 if remote_node is None:
3380 remote_node = instance.secondary_nodes[0]
3381 else:
3382 remote_node = self.cfg.ExpandNodeName(remote_node)
3383 if remote_node is None:
3384 raise errors.OpPrereqError("Node '%s' not known" %
3385 self.op.remote_node)
3386 if remote_node == instance.primary_node:
3387 raise errors.OpPrereqError("The specified node is the primary node of"
3389 self.op.remote_node = remote_node
3391 def Exec(self, feedback_fn):
3392 """Replace the disks of an instance.
3395 instance = self.instance
3396 iv_names = {}
3397 cfg = self.cfg
3398 remote_node = self.op.remote_node
3400 for dev in instance.disks:
3401 size = dev.size
3402 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3403 names = _GenerateUniqueNames(cfg, lv_names)
3404 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3405 remote_node, size, names)
3406 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3407 logger.Info("adding new mirror component on secondary for %s" %
3410 if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
3411 _GetInstanceInfoText(instance)):
3412 raise errors.OpExecError("Failed to create new component on"
3413 " secondary node %s\n"
3414 "Full abort, cleanup manually!" %
3417 logger.Info("adding new mirror component on primary")
3419 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
3420 _GetInstanceInfoText(instance)):
3421 # remove secondary dev
3422 cfg.SetDiskID(new_drbd, remote_node)
3423 rpc.call_blockdev_remove(remote_node, new_drbd)
3424 raise errors.OpExecError("Failed to create volume on primary!\n"
3425 "Full abort, cleanup manually!!")
3427 # the device exists now
3428 # call the primary node to add the mirror to md
3429 logger.Info("adding new mirror component to md")
3430 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3431 [new_drbd]):
3432 logger.Error("Can't add mirror component to md!")
3433 cfg.SetDiskID(new_drbd, remote_node)
3434 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3435 logger.Error("Can't rollback on secondary")
3436 cfg.SetDiskID(new_drbd, instance.primary_node)
3437 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3438 logger.Error("Can't rollback on primary")
3439 raise errors.OpExecError("Full abort, cleanup manually!!")
3441 dev.children.append(new_drbd)
3442 cfg.AddInstance(instance)
3444 # this can fail as the old devices are degraded and _WaitForSync
3445 # does a combined result over all disks, so we don't check its
3446 # return value
3447 _WaitForSync(cfg, instance, unlock=True)
3449 # so check manually all the devices
3450 for name in iv_names:
3451 dev, child, new_drbd = iv_names[name]
3452 cfg.SetDiskID(dev, instance.primary_node)
3453 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3455 raise errors.OpExecError("MD device %s is degraded!" % name)
3456 cfg.SetDiskID(new_drbd, instance.primary_node)
3457 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3459 raise errors.OpExecError("New drbd device %s is degraded!" % name)
3461 for name in iv_names:
3462 dev, child, new_drbd = iv_names[name]
3463 logger.Info("remove mirror %s component" % name)
3464 cfg.SetDiskID(dev, instance.primary_node)
3465 if not rpc.call_blockdev_removechildren(instance.primary_node,
3466 dev, [child]):
3467 logger.Error("Can't remove child from mirror, aborting"
3468 " *this device cleanup*.\nYou need to cleanup manually!!")
3469 continue
3471 for node in child.logical_id[:2]:
3472 logger.Info("remove child device on %s" % node)
3473 cfg.SetDiskID(child, node)
3474 if not rpc.call_blockdev_remove(node, child):
3475 logger.Error("Warning: failed to remove device from node %s,"
3476 " continuing operation." % node)
3478 dev.children.remove(child)
3480 cfg.AddInstance(instance)
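# Summary of the replace-disks procedure implemented above: for every disk,
# (1) generate a fresh DRBD branch towards the new secondary, (2) create it
# on the secondary and then on the primary, (3) attach it as an extra md
# child, (4) wait for sync and verify that neither the md device nor the
# new drbd is degraded, and (5) detach and delete the old child devices.
# Failures while creating are rolled back; failures while removing are
# logged and skipped.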
3483 class LUQueryInstanceData(NoHooksLU):
3484 """Query runtime instance data.
3487 _OP_REQP = ["instances"]
3489 def CheckPrereq(self):
3490 """Check prerequisites.
3492 This only checks the optional instance list against the existing names.
3495 if not isinstance(self.op.instances, list):
3496 raise errors.OpPrereqError("Invalid argument type 'instances'")
3497 if self.op.instances:
3498 self.wanted_instances = []
3499 names = self.op.instances
3500 for name in names:
3501 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3502 if instance is None:
3503 raise errors.OpPrereqError("No such instance name '%s'" % name)
3504 self.wanted_instances.append(instance)
3505 else:
3506 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3507 in self.cfg.GetInstanceList()]
3511 def _ComputeDiskStatus(self, instance, snode, dev):
3512 """Compute block device status.
3515 self.cfg.SetDiskID(dev, instance.primary_node)
3516 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3517 if dev.dev_type in constants.LDS_DRBD:
3518 # we change the snode then (otherwise we use the one passed in)
3519 if dev.logical_id[0] == instance.primary_node:
3520 snode = dev.logical_id[1]
3521 else:
3522 snode = dev.logical_id[0]
3524 if snode:
3525 self.cfg.SetDiskID(dev, snode)
3526 dev_sstatus = rpc.call_blockdev_find(snode, dev)
3527 else:
3528 dev_sstatus = None
3530 if dev.children:
3531 dev_children = [self._ComputeDiskStatus(instance, snode, child)
3532 for child in dev.children]
3533 else:
3534 dev_children = []
3537 "iv_name": dev.iv_name,
3538 "dev_type": dev.dev_type,
3539 "logical_id": dev.logical_id,
3540 "physical_id": dev.physical_id,
3541 "pstatus": dev_pstatus,
3542 "sstatus": dev_sstatus,
3543 "children": dev_children,
3548 def Exec(self, feedback_fn):
3549 """Gather and return data"""
3551 for instance in self.wanted_instances:
3552 remote_info = rpc.call_instance_info(instance.primary_node,
3554 if remote_info and "state" in remote_info:
3557 remote_state = "down"
3558 if instance.status == "down":
3559 config_state = "down"
3563 disks = [self._ComputeDiskStatus(instance, None, device)
3564 for device in instance.disks]
3567 "name": instance.name,
3568 "config_state": config_state,
3569 "run_state": remote_state,
3570 "pnode": instance.primary_node,
3571 "snodes": instance.secondary_nodes,
3573 "memory": instance.memory,
3574 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3576 "vcpus": instance.vcpus,
3579 result[instance.name] = idict
3584 class LUSetInstanceParms(LogicalUnit):
3585 """Modifies an instances's parameters.
3588 HPATH = "instance-modify"
3589 HTYPE = constants.HTYPE_INSTANCE
3590 _OP_REQP = ["instance_name"]
3592 def BuildHooksEnv(self):
3595 This runs on the master, primary and secondaries.
3598 args = dict()
3599 if self.mem:
3600 args['memory'] = self.mem
3601 if self.vcpus:
3602 args['vcpus'] = self.vcpus
3603 if self.do_ip or self.do_bridge:
3604 if self.do_ip:
3605 ip = self.ip
3606 else:
3607 ip = self.instance.nics[0].ip
3608 if self.do_bridge:
3609 bridge = self.bridge
3610 else:
3611 bridge = self.instance.nics[0].bridge
3612 args['nics'] = [(ip, bridge)]
3613 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3614 nl = [self.sstore.GetMasterNode(),
3615 self.instance.primary_node] + list(self.instance.secondary_nodes)
3616 return env, nl, nl
3618 def CheckPrereq(self):
3619 """Check prerequisites.
3621 This only checks the instance list against the existing names.
3624 self.mem = getattr(self.op, "mem", None)
3625 self.vcpus = getattr(self.op, "vcpus", None)
3626 self.ip = getattr(self.op, "ip", None)
3627 self.bridge = getattr(self.op, "bridge", None)
3628 if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3629 raise errors.OpPrereqError("No changes submitted")
3630 if self.mem is not None:
3631 try:
3632 self.mem = int(self.mem)
3633 except ValueError, err:
3634 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3635 if self.vcpus is not None:
3636 try:
3637 self.vcpus = int(self.vcpus)
3638 except ValueError, err:
3639 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3640 if self.ip is not None:
3641 self.do_ip = True
3642 if self.ip.lower() == "none":
3643 self.ip = None
3644 else:
3645 if not utils.IsValidIP(self.ip):
3646 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3647 else:
3648 self.do_ip = False
3649 self.do_bridge = (self.bridge is not None)
3651 instance = self.cfg.GetInstanceInfo(
3652 self.cfg.ExpandInstanceName(self.op.instance_name))
3653 if instance is None:
3654 raise errors.OpPrereqError("No such instance name '%s'" %
3655 self.op.instance_name)
3656 self.op.instance_name = instance.name
3657 self.instance = instance
3660 def Exec(self, feedback_fn):
3661 """Modifies an instance.
3663 All parameters take effect only at the next restart of the instance.
3665 result = []
3666 instance = self.instance
3667 if self.mem:
3668 instance.memory = self.mem
3669 result.append(("mem", self.mem))
3670 if self.vcpus:
3671 instance.vcpus = self.vcpus
3672 result.append(("vcpus", self.vcpus))
3673 if self.do_ip:
3674 instance.nics[0].ip = self.ip
3675 result.append(("ip", self.ip))
3676 if self.do_bridge:
3677 instance.nics[0].bridge = self.bridge
3678 result.append(("bridge", self.bridge))
3680 self.cfg.AddInstance(instance)
3682 return result
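# Example: an opcode carrying mem=512 and bridge="br0" makes Exec return
#   [("mem", 512), ("bridge", "br0")]
# which callers can present as a summary of the applied changes.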
3685 class LUQueryExports(NoHooksLU):
3686 """Query the exports list
3691 def CheckPrereq(self):
3692 """Check that the nodelist contains only existing nodes.
3695 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3697 def Exec(self, feedback_fn):
3698 """Compute the list of all the exported system images.
3701 a dictionary with the structure node->(export-list)
3702 where export-list is a list of the instances exported on
3706 return rpc.call_export_list(self.nodes)
3709 class LUExportInstance(LogicalUnit):
3710 """Export an instance to an image in the cluster.
3713 HPATH = "instance-export"
3714 HTYPE = constants.HTYPE_INSTANCE
3715 _OP_REQP = ["instance_name", "target_node", "shutdown"]
3717 def BuildHooksEnv(self):
3720 This will run on the master, primary node and target node.
3724 "EXPORT_NODE": self.op.target_node,
3725 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
3727 env.update(_BuildInstanceHookEnvByObject(self.instance))
3728 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3729 self.op.target_node]
3730 return env, nl, nl
3732 def CheckPrereq(self):
3733 """Check prerequisites.
3735 This checks that the instance name is a valid one.
3738 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
3739 self.instance = self.cfg.GetInstanceInfo(instance_name)
3740 if self.instance is None:
3741 raise errors.OpPrereqError("Instance '%s' not found" %
3742 self.op.instance_name)
3745 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
3746 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
3748 if self.dst_node is None:
3749 raise errors.OpPrereqError("Destination node '%s' is unknown." %
3750 self.op.target_node)
3751 self.op.target_node = self.dst_node.name
3753 def Exec(self, feedback_fn):
3754 """Export an instance to an image in the cluster.
3757 instance = self.instance
3758 dst_node = self.dst_node
3759 src_node = instance.primary_node
3760 # shutdown the instance, unless requested not to do so
3761 if self.op.shutdown:
3762 op = opcodes.OpShutdownInstance(instance_name=instance.name)
3763 self.processor.ChainOpCode(op, feedback_fn)
3765 vgname = self.cfg.GetVGName()
3767 snap_disks = []
3770 for disk in instance.disks:
3771 if disk.iv_name == "sda":
3772 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
3773 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
3775 if not new_dev_name:
3776 logger.Error("could not snapshot block device %s on node %s" %
3777 (disk.logical_id[1], src_node))
3778 else:
3779 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
3780 logical_id=(vgname, new_dev_name),
3781 physical_id=(vgname, new_dev_name),
3782 iv_name=disk.iv_name)
3783 snap_disks.append(new_dev)
3786 if self.op.shutdown:
3787 op = opcodes.OpStartupInstance(instance_name=instance.name,
3788 force=False)
3789 self.processor.ChainOpCode(op, feedback_fn)
3791 # TODO: check for size
3793 for dev in snap_disks:
3794 if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
3795 instance):
3796 logger.Error("could not export block device %s from node"
3797 " %s to node %s" %
3798 (dev.logical_id[1], src_node, dst_node.name))
3799 if not rpc.call_blockdev_remove(src_node, dev):
3800 logger.Error("could not remove snapshot block device %s from"
3801 " node %s" % (dev.logical_id[1], src_node))
3803 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
3804 logger.Error("could not finalize export for instance %s on node %s" %
3805 (instance.name, dst_node.name))
3807 nodelist = self.cfg.GetNodeList()
3808 nodelist.remove(dst_node.name)
3810 # on one-node clusters nodelist will be empty after the removal
3811 # if we proceed the backup would be removed because OpQueryExports
3812 # substitutes an empty list with the full cluster node list.
3813 if nodelist:
3814 op = opcodes.OpQueryExports(nodes=nodelist)
3815 exportlist = self.processor.ChainOpCode(op, feedback_fn)
3816 for node in exportlist:
3817 if instance.name in exportlist[node]:
3818 if not rpc.call_export_remove(node, instance.name):
3819 logger.Error("could not remove older export for instance %s"
3820 " on node %s" % (instance.name, node))
3823 class TagsLU(NoHooksLU):
3824 """Generic tags LU.
3826 This is an abstract class which is the parent of all the other tags LUs.
3829 def CheckPrereq(self):
3830 """Check prerequisites.
3833 if self.op.kind == constants.TAG_CLUSTER:
3834 self.target = self.cfg.GetClusterInfo()
3835 elif self.op.kind == constants.TAG_NODE:
3836 name = self.cfg.ExpandNodeName(self.op.name)
3838 raise errors.OpPrereqError("Invalid node name (%s)" %
3841 self.target = self.cfg.GetNodeInfo(name)
3842 elif self.op.kind == constants.TAG_INSTANCE:
3843 name = self.cfg.ExpandInstanceName(self.op.name)
3845 raise errors.OpPrereqError("Invalid instance name (%s)" %
3848 self.target = self.cfg.GetInstanceInfo(name)
3850 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
3854 class LUGetTags(TagsLU):
3855 """Returns the tags of a given object.
3858 _OP_REQP = ["kind", "name"]
3860 def Exec(self, feedback_fn):
3861 """Returns the tag list.
3864 return self.target.GetTags()
3867 class LUAddTags(TagsLU):
3868 """Sets a tag on a given object.
3871 _OP_REQP = ["kind", "name", "tags"]
3873 def CheckPrereq(self):
3874 """Check prerequisites.
3876 This checks the type and length of the tag name and value.
3879 TagsLU.CheckPrereq(self)
3880 for tag in self.op.tags:
3881 objects.TaggableObject.ValidateTag(tag)
3883 def Exec(self, feedback_fn):
3887 try:
3888 for tag in self.op.tags:
3889 self.target.AddTag(tag)
3890 except errors.TagError, err:
3891 raise errors.OpExecError("Error while setting tag: %s" % str(err))
3892 try:
3893 self.cfg.Update(self.target)
3894 except errors.ConfigurationError:
3895 raise errors.OpRetryError("There has been a modification to the"
3896 " config file and the operation has been"
3897 " aborted. Please retry.")
3900 class LUDelTags(TagsLU):
3901 """Delete a list of tags from a given object.
3904 _OP_REQP = ["kind", "name", "tags"]
3906 def CheckPrereq(self):
3907 """Check prerequisites.
3909 This checks that we have the given tag.
3912 TagsLU.CheckPrereq(self)
3913 for tag in self.op.tags:
3914 objects.TaggableObject.ValidateTag(tag)
3915 del_tags = frozenset(self.op.tags)
3916 cur_tags = self.target.GetTags()
3917 if not del_tags <= cur_tags:
3918 diff_tags = del_tags - cur_tags
3919 diff_names = ["'%s'" % tag for tag in diff_tags]
3921 raise errors.OpPrereqError("Tag(s) %s not found" %
3922 (",".join(diff_names)))
3924 def Exec(self, feedback_fn):
3925 """Remove the tag from the object.
3928 for tag in self.op.tags:
3929 self.target.RemoveTag(tag)
3930 try:
3931 self.cfg.Update(self.target)
3932 except errors.ConfigurationError:
3933 raise errors.OpRetryError("There has been a modification to the"
3934 " config file and the operation has been"
3935 " aborted. Please retry.")