4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
46 class LogicalUnit(object):
47 """Logical Unit base class.
49 Subclasses must follow these rules:
50 - implement CheckPrereq which also fills in the opcode instance
51 with all the fields (even if as None)
53 - implement BuildHooksEnv
54 - redefine HPATH and HTYPE
55 - optionally redefine their run requirements (REQ_CLUSTER,
56 REQ_MASTER); note that all commands require root permissions
65 def __init__(self, processor, op, cfg, sstore):
66 """Constructor for LogicalUnit.
68 This needs to be overridden in derived classes in order to check op
72 self.processor = processor
76 for attr_name in self._OP_REQP:
77 attr_val = getattr(op, attr_name, None)
79 raise errors.OpPrereqError("Required parameter '%s' missing" %
82 if not cfg.IsCluster():
83 raise errors.OpPrereqError("Cluster not initialized yet,"
84 " use 'gnt-cluster init' first.")
86 master = sstore.GetMasterNode()
87 if master != utils.HostInfo().name:
88 raise errors.OpPrereqError("Commands must be run on the master"
91 def CheckPrereq(self):
92 """Check prerequisites for this LU.
94 This method should check that the prerequisites for the execution
95 of this LU are fulfilled. It can do internode communication, but
96 it should be idempotent - no cluster or system changes are
99 The method should raise errors.OpPrereqError in case something is
100 not fulfilled. Its return value is ignored.
102 This method should also update all the parameters of the opcode to
103 their canonical form; e.g. a short node name must be fully
104 expanded after this method has successfully completed (so that
105 hooks, logging, etc. work correctly).
108 raise NotImplementedError
110 def Exec(self, feedback_fn):
113 This method should implement the actual work. It should raise
114 errors.OpExecError for failures that are somewhat dealt with in
118 raise NotImplementedError
120 def BuildHooksEnv(self):
121 """Build hooks environment for this LU.
123 This method should return a three-element tuple consisting of: a dict
124 containing the environment that will be used for running the
125 specific hook for this LU, a list of node names on which the hook
126 should run before the execution, and a list of node names on which
127 the hook should run after the execution.
129 The keys of the dict must not be prefixed with 'GANETI_', as that
130 prefix is added by the hooks runner. Also note that additional keys will be
131 added by the hooks runner. If the LU doesn't define any
132 environment, an empty dict (and not None) should be returned.
134 As for the node lists, the master should not be included in
135 them, as it will be added by the hooks runner in case this LU
136 requires a cluster to run on (otherwise we don't have a node
137 list). If no nodes apply, an empty list (and not None) should be returned.
140 Note that if the HPATH for a LU class is None, this function will not be called.
144 raise NotImplementedError
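# Illustrative sketch, not part of the original module: a minimal LogicalUnit
# subclass following the rules described above. The opcode field "node_name"
# and the hook path "example-noop" are hypothetical and exist only to show the
# CheckPrereq/Exec/BuildHooksEnv contract.
class LUExampleNoop(LogicalUnit):
  """Example no-op LU."""
  HPATH = "example-noop"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["node_name"]

  def CheckPrereq(self):
    # canonicalize the opcode parameters (e.g. expand a short node name)
    node_full = self.cfg.ExpandNodeName(self.op.node_name)
    if node_full is None:
      raise errors.OpPrereqError("No such node name '%s'" % self.op.node_name)
    self.op.node_name = node_full

  def BuildHooksEnv(self):
    # return the env dict, the pre-hook node list and the post-hook node list
    env = {"OP_TARGET": self.op.node_name}
    return env, [], [self.op.node_name]

  def Exec(self, feedback_fn):
    feedback_fn("no-op executed for %s" % self.op.node_name)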
147 class NoHooksLU(LogicalUnit):
148 """Simple LU which runs no hooks.
150 This LU is intended as a parent for other LogicalUnits which will
151 run no hooks, in order to reduce duplicate code.
157 def BuildHooksEnv(self):
160 This is a no-op, since we don't run hooks.
166 def _GetWantedNodes(lu, nodes):
167 """Returns list of checked and expanded node names.
170 nodes: List of nodes (strings), or an empty list to select all nodes
173 if not isinstance(nodes, list):
174 raise errors.OpPrereqError("Invalid argument type 'nodes'")
180 node = lu.cfg.ExpandNodeName(name)
182 raise errors.OpPrereqError("No such node name '%s'" % name)
186 wanted = lu.cfg.GetNodeList()
187 return utils.NiceSort(wanted)
190 def _GetWantedInstances(lu, instances):
191 """Returns list of checked and expanded instance names.
194 instances: List of instances (strings), or an empty list to select all instances
197 if not isinstance(instances, list):
198 raise errors.OpPrereqError("Invalid argument type 'instances'")
203 for name in instances:
204 instance = lu.cfg.ExpandInstanceName(name)
206 raise errors.OpPrereqError("No such instance name '%s'" % name)
207 wanted.append(instance)
210 wanted = lu.cfg.GetInstanceList()
211 return utils.NiceSort(wanted)
214 def _CheckOutputFields(static, dynamic, selected):
215 """Checks whether all selected fields are valid.
218 static: Static fields
219 dynamic: Dynamic fields
222 static_fields = frozenset(static)
223 dynamic_fields = frozenset(dynamic)
225 all_fields = static_fields | dynamic_fields
227 if not all_fields.issuperset(selected):
228 raise errors.OpPrereqError("Unknown output fields selected: %s"
229 % ",".join(frozenset(selected).
230 difference(all_fields)))
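# Illustrative usage sketch, not part of the original module: how a query LU
# would validate its requested output fields in CheckPrereq. The field names
# below are only examples.
def _ExampleCheckOutputFields(selected_fields):
  """Raises errors.OpPrereqError if selected_fields contains unknown names."""
  _CheckOutputFields(static=["name", "pip", "sip"],
                     dynamic=["mtotal", "mnode", "mfree"],
                     selected=selected_fields)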
233 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
234 memory, vcpus, nics):
235 """Builds instance related env variables for hooks from single variables.
238 secondary_nodes: List of secondary nodes as strings
242 "INSTANCE_NAME": name,
243 "INSTANCE_PRIMARY": primary_node,
244 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
245 "INSTANCE_OS_TYPE": os_type,
246 "INSTANCE_STATUS": status,
247 "INSTANCE_MEMORY": memory,
248 "INSTANCE_VCPUS": vcpus,
252 nic_count = len(nics)
253 for idx, (ip, bridge) in enumerate(nics):
256 env["INSTANCE_NIC%d_IP" % idx] = ip
257 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
261 env["INSTANCE_NIC_COUNT"] = nic_count
266 def _BuildInstanceHookEnvByObject(instance, override=None):
267 """Builds instance related env variables for hooks from an object.
270 instance: objects.Instance object of instance
271 override: dict of values to override
274 'name': instance.name,
275 'primary_node': instance.primary_node,
276 'secondary_nodes': instance.secondary_nodes,
277 'os_type': instance.os,
278 'status': instance.status,
279 'memory': instance.memory,
280 'vcpus': instance.vcpus,
281 'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
284 args.update(override)
285 return _BuildInstanceHookEnv(**args)
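# Illustrative sketch, not part of the original module: the flat environment
# produced by _BuildInstanceHookEnv for a hypothetical two-NIC instance (all
# names and values below are made up). The hooks runner later prefixes each
# key with 'GANETI_'.
def _ExampleInstanceHookEnv():
  env = _BuildInstanceHookEnv(name="instance1.example.com",
                              primary_node="node1.example.com",
                              secondary_nodes=["node2.example.com"],
                              os_type="debian-etch",
                              status="up",
                              memory=512,
                              vcpus=1,
                              nics=[("192.0.2.10", "xen-br0"),
                                    ("192.0.2.11", "xen-br1")])
  # env now contains, among others:
  #   INSTANCE_NAME=instance1.example.com
  #   INSTANCE_PRIMARY=node1.example.com
  #   INSTANCE_SECONDARIES=node2.example.com
  #   INSTANCE_NIC0_IP=192.0.2.10, INSTANCE_NIC0_BRIDGE=xen-br0
  #   INSTANCE_NIC_COUNT=2
  return env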
288 def _UpdateEtcHosts(fullnode, ip):
289 """Ensure a node has a correct entry in /etc/hosts.
292 fullnode - Fully qualified domain name of host. (str)
293 ip - IPv4 address of host (str)
296 node = fullnode.split(".", 1)[0]
298 f = open('/etc/hosts', 'r+')
307 rawline = f.readline()
313 line = rawline.split('\n')[0]
316 line = line.split('#')[0]
319 # Entire line was comment, skip
320 save_lines.append(rawline)
323 fields = line.split()
327 for spec in [ ip, fullnode, node ]:
328 if spec not in fields:
335 save_lines.append(rawline)
338 if havesome and not haveall:
339 # Line (old, or manual?) which is missing some. Remove.
343 save_lines.append(rawline)
346 add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
350 save_lines = save_lines + add_lines
352 # We removed a line, write a new file and replace old.
353 fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
354 newfile = os.fdopen(fd, 'w')
355 newfile.write(''.join(save_lines))
357 os.rename(tmpname, '/etc/hosts')
360 # Simply appending a new line will do the trick.
362 for add in add_lines:
368 def _UpdateKnownHosts(fullnode, ip, pubkey):
369 """Ensure a node has a correct known_hosts entry.
372 fullnode - Fully qualified domain name of host. (str)
373 ip - IPv4 address of host (str)
374 pubkey - the public key of the cluster
377 if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
378 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
380 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
389 logger.Debug('read %s' % (repr(rawline),))
391 parts = rawline.rstrip('\r\n').split()
393 # Ignore unwanted lines
394 if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
395 fields = parts[0].split(',')
400 for spec in [ ip, fullnode ]:
401 if spec not in fields:
406 logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
407 if haveall and key == pubkey:
409 save_lines.append(rawline)
410 logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
413 if havesome and (not haveall or key != pubkey):
415 logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
418 save_lines.append(rawline)
421 add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
422 logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
425 save_lines = save_lines + add_lines
427 # Write a new file and replace old.
428 fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
430 newfile = os.fdopen(fd, 'w')
432 newfile.write(''.join(save_lines))
435 logger.Debug("Wrote new known_hosts.")
436 os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
439 # Simply appending a new line will do the trick.
441 for add in add_lines:
447 def _HasValidVG(vglist, vgname):
448 """Checks if the volume group list is valid.
450 A non-None return value means there's an error, and the return value
451 is the error message.
454 vgsize = vglist.get(vgname, None)
456 return "volume group '%s' missing" % vgname
458 return ("volume group '%s' too small (20480MiB required, %dMiB found)" % (vgname, vgsize))
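# Illustrative usage sketch, not part of the original module: _HasValidVG
# returns None when the volume group is usable and an error message otherwise.
# The volume group name and size below are hypothetical; utils.ListVolumeGroups()
# is assumed to return a name->size mapping, as it is used elsewhere in this
# module.
def _ExampleCheckVG():
  vglist = {"xenvg": 102400}
  vgstatus = _HasValidVG(vglist, "xenvg")
  if vgstatus:
    raise errors.OpPrereqError("Error: %s" % vgstatus)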
463 def _InitSSHSetup(node):
464 """Setup the SSH configuration for the cluster.
467 This generates a DSA keypair for root, adds the public key to the
468 permitted hosts and adds the hostkey to its own known hosts.
471 node: the name of this host as a fqdn
474 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
476 for name in priv_key, pub_key:
477 if os.path.exists(name):
478 utils.CreateBackup(name)
479 utils.RemoveFile(name)
481 result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
485 raise errors.OpExecError("Could not generate ssh keypair, error %s" %
488 f = open(pub_key, 'r')
490 utils.AddAuthorizedKey(auth_keys, f.read(8192))
495 def _InitGanetiServerSetup(ss):
496 """Setup the necessary configuration for the initial node daemon.
498 This creates the nodepass file containing the shared password for
499 the cluster and also generates the SSL certificate.
502 # Create pseudo random password
503 randpass = sha.new(os.urandom(64)).hexdigest()
504 # and write it into sstore
505 ss.SetKey(ss.SS_NODED_PASS, randpass)
507 result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
508 "-days", str(365*5), "-nodes", "-x509",
509 "-keyout", constants.SSL_CERT_FILE,
510 "-out", constants.SSL_CERT_FILE, "-batch"])
512 raise errors.OpExecError("could not generate server ssl cert, command"
513 " %s had exitcode %s and error message %s" %
514 (result.cmd, result.exit_code, result.output))
516 os.chmod(constants.SSL_CERT_FILE, 0400)
518 result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
521 raise errors.OpExecError("Could not start the node daemon, command %s"
522 " had exitcode %s and error %s" %
523 (result.cmd, result.exit_code, result.output))
526 def _CheckInstanceBridgesExist(instance):
527 """Check that the brigdes needed by an instance exist.
530 # check bridges existance
531 brlist = [nic.bridge for nic in instance.nics]
532 if not rpc.call_bridges_exist(instance.primary_node, brlist):
533 raise errors.OpPrereqError("one or more target bridges %s do not"
534 " exist on destination node '%s'" %
535 (brlist, instance.primary_node))
538 class LUInitCluster(LogicalUnit):
539 """Initialise the cluster.
542 HPATH = "cluster-init"
543 HTYPE = constants.HTYPE_CLUSTER
544 _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
545 "def_bridge", "master_netdev"]
548 def BuildHooksEnv(self):
551 Notes: Since we don't require a cluster, we must manually add
552 ourselves to the post-run node list.
555 env = {"OP_TARGET": self.op.cluster_name}
556 return env, [], [self.hostname.name]
558 def CheckPrereq(self):
559 """Verify that the passed name is a valid one.
562 if config.ConfigWriter.IsCluster():
563 raise errors.OpPrereqError("Cluster is already initialised")
565 self.hostname = hostname = utils.HostInfo()
567 if hostname.ip.startswith("127."):
568 raise errors.OpPrereqError("This host's IP resolves to the private"
569 " range (%s). Please fix DNS or /etc/hosts." %
572 self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
574 if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
575 constants.DEFAULT_NODED_PORT):
576 raise errors.OpPrereqError("Inconsistency: this host's name resolves"
577 " to %s,\nbut this ip address does not"
578 " belong to this host."
579 " Aborting." % hostname.ip)
581 secondary_ip = getattr(self.op, "secondary_ip", None)
582 if secondary_ip and not utils.IsValidIP(secondary_ip):
583 raise errors.OpPrereqError("Invalid secondary ip given")
585 secondary_ip != hostname.ip and
586 (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
587 constants.DEFAULT_NODED_PORT))):
588 raise errors.OpPrereqError("You gave %s as secondary IP,\n"
589 "but it does not belong to this host." %
591 self.secondary_ip = secondary_ip
593 # checks presence of the volume group given
594 vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
597 raise errors.OpPrereqError("Error: %s" % vgstatus)
599 if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
601 raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
604 if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
605 raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
606 self.op.hypervisor_type)
608 result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
610 raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
611 (self.op.master_netdev,
612 result.output.strip()))
614 if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
615 os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
616 raise errors.OpPrereqError("Init.d script '%s' missing or not "
617 "executable." % constants.NODE_INITD_SCRIPT)
619 def Exec(self, feedback_fn):
620 """Initialize the cluster.
623 clustername = self.clustername
624 hostname = self.hostname
626 # set up the simple store
627 self.sstore = ss = ssconf.SimpleStore()
628 ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
629 ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
630 ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
631 ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
632 ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
634 # set up the inter-node password and certificate
635 _InitGanetiServerSetup(ss)
637 # start the master ip
638 rpc.call_node_start_master(hostname.name)
640 # set up ssh config and /etc/hosts
641 f = open(constants.SSH_HOST_RSA_PUB, 'r')
646 sshkey = sshline.split(" ")[1]
648 _UpdateEtcHosts(hostname.name, hostname.ip)
650 _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
652 _InitSSHSetup(hostname.name)
654 # init of cluster config file
655 self.cfg = cfgw = config.ConfigWriter()
656 cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
657 sshkey, self.op.mac_prefix,
658 self.op.vg_name, self.op.def_bridge)
661 class LUDestroyCluster(NoHooksLU):
662 """Logical unit for destroying the cluster.
667 def CheckPrereq(self):
668 """Check prerequisites.
670 This checks whether the cluster is empty.
672 Any errors are signalled by raising errors.OpPrereqError.
675 master = self.sstore.GetMasterNode()
677 nodelist = self.cfg.GetNodeList()
678 if len(nodelist) != 1 or nodelist[0] != master:
679 raise errors.OpPrereqError("There are still %d node(s) in"
680 " this cluster." % (len(nodelist) - 1))
681 instancelist = self.cfg.GetInstanceList()
683 raise errors.OpPrereqError("There are still %d instance(s) in"
684 " this cluster." % len(instancelist))
686 def Exec(self, feedback_fn):
687 """Destroys the cluster.
690 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
691 utils.CreateBackup(priv_key)
692 utils.CreateBackup(pub_key)
693 rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
696 class LUVerifyCluster(NoHooksLU):
697 """Verifies the cluster status.
702 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
703 remote_version, feedback_fn):
704 """Run multiple tests against a node.
707 - compares ganeti version
708 - checks vg existence and size > 20G
709 - checks config file checksum
710 - checks ssh to other nodes
713 node: name of the node to check
714 file_list: required list of files
715 local_cksum: dictionary of local files and their checksums
718 # compares ganeti version
719 local_version = constants.PROTOCOL_VERSION
720 if not remote_version:
721 feedback_fn(" - ERROR: connection to %s failed" % (node))
724 if local_version != remote_version:
725 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
726 (local_version, node, remote_version))
729 # checks vg existence and size > 20G
733 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
737 vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
739 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
742 # checks config file checksum
745 if 'filelist' not in node_result:
747 feedback_fn(" - ERROR: node hasn't returned file checksum data")
749 remote_cksum = node_result['filelist']
750 for file_name in file_list:
751 if file_name not in remote_cksum:
753 feedback_fn(" - ERROR: file '%s' missing" % file_name)
754 elif remote_cksum[file_name] != local_cksum[file_name]:
756 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
758 if 'nodelist' not in node_result:
760 feedback_fn(" - ERROR: node hasn't returned node connectivity data")
762 if node_result['nodelist']:
764 for node in node_result['nodelist']:
765 feedback_fn(" - ERROR: communication with node '%s': %s" %
766 (node, node_result['nodelist'][node]))
767 hyp_result = node_result.get('hypervisor', None)
768 if hyp_result is not None:
769 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
772 def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
773 """Verify an instance.
775 This function checks to see if the required block devices are
776 available on the instance's node.
781 instancelist = self.cfg.GetInstanceList()
782 if not instance in instancelist:
783 feedback_fn(" - ERROR: instance %s not in instance list %s" %
784 (instance, instancelist))
787 instanceconfig = self.cfg.GetInstanceInfo(instance)
788 node_current = instanceconfig.primary_node
791 instanceconfig.MapLVsByNode(node_vol_should)
793 for node in node_vol_should:
794 for volume in node_vol_should[node]:
795 if node not in node_vol_is or volume not in node_vol_is[node]:
796 feedback_fn(" - ERROR: volume %s missing on node %s" %
800 if not instanceconfig.status == 'down':
801 if not instance in node_instance[node_current]:
802 feedback_fn(" - ERROR: instance %s not running on node %s" %
803 (instance, node_current))
806 for node in node_instance:
807 if (not node == node_current):
808 if instance in node_instance[node]:
809 feedback_fn(" - ERROR: instance %s should not run on node %s" %
815 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
816 """Verify if there are any unknown volumes in the cluster.
818 The .os, .swap and backup volumes are ignored. All other volumes are
824 for node in node_vol_is:
825 for volume in node_vol_is[node]:
826 if node not in node_vol_should or volume not in node_vol_should[node]:
827 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
832 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
833 """Verify the list of running instances.
835 This checks what instances are running but unknown to the cluster.
839 for node in node_instance:
840 for runninginstance in node_instance[node]:
841 if runninginstance not in instancelist:
842 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
843 (runninginstance, node))
847 def CheckPrereq(self):
848 """Check prerequisites.
850 This has no prerequisites.
855 def Exec(self, feedback_fn):
856 """Verify integrity of cluster, performing various test on nodes.
860 feedback_fn("* Verifying global settings")
861 self.cfg.VerifyConfig()
863 vg_name = self.cfg.GetVGName()
864 nodelist = utils.NiceSort(self.cfg.GetNodeList())
865 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
869 # FIXME: verify OS list
871 file_names = list(self.sstore.GetFileList())
872 file_names.append(constants.SSL_CERT_FILE)
873 file_names.append(constants.CLUSTER_CONF_FILE)
874 local_checksums = utils.FingerprintFiles(file_names)
876 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
877 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
878 all_instanceinfo = rpc.call_instance_list(nodelist)
879 all_vglist = rpc.call_vg_list(nodelist)
880 node_verify_param = {
881 'filelist': file_names,
882 'nodelist': nodelist,
885 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
886 all_rversion = rpc.call_version(nodelist)
888 for node in nodelist:
889 feedback_fn("* Verifying node %s" % node)
890 result = self._VerifyNode(node, file_names, local_checksums,
891 all_vglist[node], all_nvinfo[node],
892 all_rversion[node], feedback_fn)
896 volumeinfo = all_volumeinfo[node]
898 if type(volumeinfo) != dict:
899 feedback_fn(" - ERROR: connection to %s failed" % (node,))
903 node_volume[node] = volumeinfo
906 nodeinstance = all_instanceinfo[node]
907 if type(nodeinstance) != list:
908 feedback_fn(" - ERROR: connection to %s failed" % (node,))
912 node_instance[node] = nodeinstance
916 for instance in instancelist:
917 feedback_fn("* Verifying instance %s" % instance)
918 result = self._VerifyInstance(instance, node_volume, node_instance,
922 inst_config = self.cfg.GetInstanceInfo(instance)
924 inst_config.MapLVsByNode(node_vol_should)
926 feedback_fn("* Verifying orphan volumes")
927 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
931 feedback_fn("* Verifying remaining instances")
932 result = self._VerifyOrphanInstances(instancelist, node_instance,
939 class LURenameCluster(LogicalUnit):
940 """Rename the cluster.
943 HPATH = "cluster-rename"
944 HTYPE = constants.HTYPE_CLUSTER
947 def BuildHooksEnv(self):
952 "OP_TARGET": self.op.sstore.GetClusterName(),
953 "NEW_NAME": self.op.name,
955 mn = self.sstore.GetMasterNode()
956 return env, [mn], [mn]
958 def CheckPrereq(self):
959 """Verify that the passed name is a valid one.
962 hostname = utils.HostInfo(self.op.name)
964 new_name = hostname.name
965 self.ip = new_ip = hostname.ip
966 old_name = self.sstore.GetClusterName()
967 old_ip = self.sstore.GetMasterIP()
968 if new_name == old_name and new_ip == old_ip:
969 raise errors.OpPrereqError("Neither the name nor the IP address of the"
970 " cluster has changed")
972 result = utils.RunCmd(["fping", "-q", new_ip])
973 if not result.failed:
974 raise errors.OpPrereqError("The given cluster IP address (%s) is"
975 " reachable on the network. Aborting." %
978 self.op.name = new_name
980 def Exec(self, feedback_fn):
981 """Rename the cluster.
984 clustername = self.op.name
988 # shutdown the master IP
989 master = ss.GetMasterNode()
990 if not rpc.call_node_stop_master(master):
991 raise errors.OpExecError("Could not disable the master role")
995 ss.SetKey(ss.SS_MASTER_IP, ip)
996 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
998 # Distribute updated ss config to all nodes
999 myself = self.cfg.GetNodeInfo(master)
1000 dist_nodes = self.cfg.GetNodeList()
1001 if myself.name in dist_nodes:
1002 dist_nodes.remove(myself.name)
1004 logger.Debug("Copying updated ssconf data to all nodes")
1005 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1006 fname = ss.KeyToFilename(keyname)
1007 result = rpc.call_upload_file(dist_nodes, fname)
1008 for to_node in dist_nodes:
1009 if not result[to_node]:
1010 logger.Error("copy of file %s to node %s failed" %
1013 if not rpc.call_node_start_master(master):
1014 logger.Error("Could not re-enable the master role on the master,\n"
1015 "please restart manually.")
1018 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
1019 """Sleep and poll for an instance's disk to sync.
1022 if not instance.disks:
1026 logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
1028 node = instance.primary_node
1030 for dev in instance.disks:
1031 cfgw.SetDiskID(dev, node)
1037 cumul_degraded = False
1038 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1040 logger.ToStderr("Can't get any data from node %s" % node)
1043 raise errors.RemoteError("Can't contact node %s for mirror data,"
1044 " aborting." % node)
1048 for i in range(len(rstats)):
1051 logger.ToStderr("Can't compute data for node %s/%s" %
1052 (node, instance.disks[i].iv_name))
1054 # we ignore the ldisk parameter
1055 perc_done, est_time, is_degraded, _ = mstat
1056 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1057 if perc_done is not None:
1059 if est_time is not None:
1060 rem_time = "%d estimated seconds remaining" % est_time
1063 rem_time = "no time estimate"
1064 logger.ToStdout("- device %s: %5.2f%% done, %s" %
1065 (instance.disks[i].iv_name, perc_done, rem_time))
1072 time.sleep(min(60, max_time))
1078 logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1079 return not cumul_degraded
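# Illustrative sketch, not part of the original module: how one entry returned
# by rpc.call_blockdev_getmirrorstatus() is interpreted above. Each entry is a
# (percent_done, estimated_time, is_degraded, ldisk) tuple; the values below
# are made up.
def _ExampleMirrorStatus():
  mstat = (87.5, 42, True, False)
  perc_done, est_time, is_degraded, _ = mstat  # the ldisk flag is ignored here
  if perc_done is not None and est_time is not None:
    logger.ToStdout("- device sda: %5.2f%% done, %d estimated seconds"
                    " remaining" % (perc_done, est_time))
  # a device counts towards cumul_degraded only if it is degraded and has no
  # completion percentage
  return is_degraded and perc_done is None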
1082 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1083 """Check that mirrors are not degraded.
1085 The ldisk parameter, if True, will change the test from the
1086 is_degraded attribute (which represents overall non-ok status for
1087 the device(s)) to the ldisk (representing the local storage status).
1090 cfgw.SetDiskID(dev, node)
1097 if on_primary or dev.AssembleOnSecondary():
1098 rstats = rpc.call_blockdev_find(node, dev)
1100 logger.ToStderr("Can't get any data from node %s" % node)
1103 result = result and (not rstats[idx])
1105 for child in dev.children:
1106 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1111 class LUDiagnoseOS(NoHooksLU):
1112 """Logical unit for OS diagnose/query.
1117 def CheckPrereq(self):
1118 """Check prerequisites.
1120 This always succeeds, since this is a pure query LU.
1125 def Exec(self, feedback_fn):
1126 """Compute the list of OSes.
1129 node_list = self.cfg.GetNodeList()
1130 node_data = rpc.call_os_diagnose(node_list)
1131 if node_data == False:
1132 raise errors.OpExecError("Can't gather the list of OSes")
1136 class LURemoveNode(LogicalUnit):
1137 """Logical unit for removing a node.
1140 HPATH = "node-remove"
1141 HTYPE = constants.HTYPE_NODE
1142 _OP_REQP = ["node_name"]
1144 def BuildHooksEnv(self):
1147 This doesn't run on the target node in the pre phase as a failed
1148 node would not allow itself to run.
1152 "OP_TARGET": self.op.node_name,
1153 "NODE_NAME": self.op.node_name,
1155 all_nodes = self.cfg.GetNodeList()
1156 all_nodes.remove(self.op.node_name)
1157 return env, all_nodes, all_nodes
1159 def CheckPrereq(self):
1160 """Check prerequisites.
1163 - the node exists in the configuration
1164 - it does not have primary or secondary instances
1165 - it's not the master
1167 Any errors are signalled by raising errors.OpPrereqError.
1170 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1172 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1174 instance_list = self.cfg.GetInstanceList()
1176 masternode = self.sstore.GetMasterNode()
1177 if node.name == masternode:
1178 raise errors.OpPrereqError("Node is the master node,"
1179 " you need to failover first.")
1181 for instance_name in instance_list:
1182 instance = self.cfg.GetInstanceInfo(instance_name)
1183 if node.name == instance.primary_node:
1184 raise errors.OpPrereqError("Instance %s still running on the node,"
1185 " please remove first." % instance_name)
1186 if node.name in instance.secondary_nodes:
1187 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1188 " please remove first." % instance_name)
1189 self.op.node_name = node.name
1192 def Exec(self, feedback_fn):
1193 """Removes the node from the cluster.
1197 logger.Info("stopping the node daemon and removing configs from node %s" %
1200 rpc.call_node_leave_cluster(node.name)
1202 ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1204 logger.Info("Removing node %s from config" % node.name)
1206 self.cfg.RemoveNode(node.name)
1209 class LUQueryNodes(NoHooksLU):
1210 """Logical unit for querying nodes.
1213 _OP_REQP = ["output_fields", "names"]
1215 def CheckPrereq(self):
1216 """Check prerequisites.
1218 This checks that the fields required are valid output fields.
1221 self.dynamic_fields = frozenset(["dtotal", "dfree",
1222 "mtotal", "mnode", "mfree",
1225 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1226 "pinst_list", "sinst_list",
1228 dynamic=self.dynamic_fields,
1229 selected=self.op.output_fields)
1231 self.wanted = _GetWantedNodes(self, self.op.names)
1233 def Exec(self, feedback_fn):
1234 """Computes the list of nodes and their attributes.
1237 nodenames = self.wanted
1238 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1240 # begin data gathering
1242 if self.dynamic_fields.intersection(self.op.output_fields):
1244 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1245 for name in nodenames:
1246 nodeinfo = node_data.get(name, None)
1249 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1250 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1251 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1252 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1253 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1254 "bootid": nodeinfo['bootid'],
1257 live_data[name] = {}
1259 live_data = dict.fromkeys(nodenames, {})
1261 node_to_primary = dict([(name, set()) for name in nodenames])
1262 node_to_secondary = dict([(name, set()) for name in nodenames])
1264 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1265 "sinst_cnt", "sinst_list"))
1266 if inst_fields & frozenset(self.op.output_fields):
1267 instancelist = self.cfg.GetInstanceList()
1269 for instance_name in instancelist:
1270 inst = self.cfg.GetInstanceInfo(instance_name)
1271 if inst.primary_node in node_to_primary:
1272 node_to_primary[inst.primary_node].add(inst.name)
1273 for secnode in inst.secondary_nodes:
1274 if secnode in node_to_secondary:
1275 node_to_secondary[secnode].add(inst.name)
1277 # end data gathering
1280 for node in nodelist:
1282 for field in self.op.output_fields:
1285 elif field == "pinst_list":
1286 val = list(node_to_primary[node.name])
1287 elif field == "sinst_list":
1288 val = list(node_to_secondary[node.name])
1289 elif field == "pinst_cnt":
1290 val = len(node_to_primary[node.name])
1291 elif field == "sinst_cnt":
1292 val = len(node_to_secondary[node.name])
1293 elif field == "pip":
1294 val = node.primary_ip
1295 elif field == "sip":
1296 val = node.secondary_ip
1297 elif field in self.dynamic_fields:
1298 val = live_data[node.name].get(field, None)
1300 raise errors.ParameterError(field)
1301 node_output.append(val)
1302 output.append(node_output)
1307 class LUQueryNodeVolumes(NoHooksLU):
1308 """Logical unit for getting volumes on node(s).
1311 _OP_REQP = ["nodes", "output_fields"]
1313 def CheckPrereq(self):
1314 """Check prerequisites.
1316 This checks that the fields required are valid output fields.
1319 self.nodes = _GetWantedNodes(self, self.op.nodes)
1321 _CheckOutputFields(static=["node"],
1322 dynamic=["phys", "vg", "name", "size", "instance"],
1323 selected=self.op.output_fields)
1326 def Exec(self, feedback_fn):
1327 """Computes the list of nodes and their attributes.
1330 nodenames = self.nodes
1331 volumes = rpc.call_node_volumes(nodenames)
1333 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1334 in self.cfg.GetInstanceList()]
1336 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1339 for node in nodenames:
1340 if node not in volumes or not volumes[node]:
1343 node_vols = volumes[node][:]
1344 node_vols.sort(key=lambda vol: vol['dev'])
1346 for vol in node_vols:
1348 for field in self.op.output_fields:
1351 elif field == "phys":
1355 elif field == "name":
1357 elif field == "size":
1358 val = int(float(vol['size']))
1359 elif field == "instance":
1361 if node not in lv_by_node[inst]:
1363 if vol['name'] in lv_by_node[inst][node]:
1369 raise errors.ParameterError(field)
1370 node_output.append(str(val))
1372 output.append(node_output)
1377 class LUAddNode(LogicalUnit):
1378 """Logical unit for adding node to the cluster.
1382 HTYPE = constants.HTYPE_NODE
1383 _OP_REQP = ["node_name"]
1385 def BuildHooksEnv(self):
1388 This will run on all nodes before, and on all nodes + the new node after.
1392 "OP_TARGET": self.op.node_name,
1393 "NODE_NAME": self.op.node_name,
1394 "NODE_PIP": self.op.primary_ip,
1395 "NODE_SIP": self.op.secondary_ip,
1397 nodes_0 = self.cfg.GetNodeList()
1398 nodes_1 = nodes_0 + [self.op.node_name, ]
1399 return env, nodes_0, nodes_1
1401 def CheckPrereq(self):
1402 """Check prerequisites.
1405 - the new node is not already in the config
1407 - its parameters (single/dual homed) matches the cluster
1409 Any errors are signalled by raising errors.OpPrereqError.
1412 node_name = self.op.node_name
1415 dns_data = utils.HostInfo(node_name)
1417 node = dns_data.name
1418 primary_ip = self.op.primary_ip = dns_data.ip
1419 secondary_ip = getattr(self.op, "secondary_ip", None)
1420 if secondary_ip is None:
1421 secondary_ip = primary_ip
1422 if not utils.IsValidIP(secondary_ip):
1423 raise errors.OpPrereqError("Invalid secondary IP given")
1424 self.op.secondary_ip = secondary_ip
1425 node_list = cfg.GetNodeList()
1426 if node in node_list:
1427 raise errors.OpPrereqError("Node %s is already in the configuration"
1430 for existing_node_name in node_list:
1431 existing_node = cfg.GetNodeInfo(existing_node_name)
1432 if (existing_node.primary_ip == primary_ip or
1433 existing_node.secondary_ip == primary_ip or
1434 existing_node.primary_ip == secondary_ip or
1435 existing_node.secondary_ip == secondary_ip):
1436 raise errors.OpPrereqError("New node ip address(es) conflict with"
1437 " existing node %s" % existing_node.name)
1439 # check that the type of the node (single versus dual homed) is the
1440 # same as for the master
1441 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1442 master_singlehomed = myself.secondary_ip == myself.primary_ip
1443 newbie_singlehomed = secondary_ip == primary_ip
1444 if master_singlehomed != newbie_singlehomed:
1445 if master_singlehomed:
1446 raise errors.OpPrereqError("The master has no private ip but the"
1447 " new node has one")
1449 raise errors.OpPrereqError("The master has a private ip but the"
1450 " new node doesn't have one")
1452 # checks reachability
1453 if not utils.TcpPing(utils.HostInfo().name,
1455 constants.DEFAULT_NODED_PORT):
1456 raise errors.OpPrereqError("Node not reachable by ping")
1458 if not newbie_singlehomed:
1459 # check reachability from my secondary ip to newbie's secondary ip
1460 if not utils.TcpPing(myself.secondary_ip,
1462 constants.DEFAULT_NODED_PORT):
1463 raise errors.OpPrereqError(
1464 "Node secondary ip not reachable by TCP based ping to noded port")
1466 self.new_node = objects.Node(name=node,
1467 primary_ip=primary_ip,
1468 secondary_ip=secondary_ip)
1470 def Exec(self, feedback_fn):
1471 """Adds the new node to the cluster.
1474 new_node = self.new_node
1475 node = new_node.name
1477 # set up inter-node password and certificate and restarts the node daemon
1478 gntpass = self.sstore.GetNodeDaemonPassword()
1479 if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1480 raise errors.OpExecError("ganeti password corruption detected")
1481 f = open(constants.SSL_CERT_FILE)
1483 gntpem = f.read(8192)
1486 # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1487 # so we use this to detect an invalid certificate; as long as the
1488 # cert doesn't contain this, the here-document will be correctly
1489 # parsed by the shell sequence below
1490 if re.search('^!EOF\.', gntpem, re.MULTILINE):
1491 raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1492 if not gntpem.endswith("\n"):
1493 raise errors.OpExecError("PEM must end with newline")
1494 logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1496 # and then connect with ssh to set password and start ganeti-noded
1497 # note that all the below variables are sanitized at this point,
1498 # either by being constants or by the checks above
1500 mycommand = ("umask 077 && "
1501 "echo '%s' > '%s' && "
1502 "cat > '%s' << '!EOF.' && \n"
1503 "%s!EOF.\n%s restart" %
1504 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1505 constants.SSL_CERT_FILE, gntpem,
1506 constants.NODE_INITD_SCRIPT))
1508 result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1510 raise errors.OpExecError("Remote command on node %s, error: %s,"
1512 (node, result.fail_reason, result.output))
1514 # check connectivity
1517 result = rpc.call_version([node])[node]
1519 if constants.PROTOCOL_VERSION == result:
1520 logger.Info("communication to node %s fine, sw version %s match" %
1523 raise errors.OpExecError("Version mismatch master version %s,"
1524 " node version %s" %
1525 (constants.PROTOCOL_VERSION, result))
1527 raise errors.OpExecError("Cannot get version from the new node")
1530 logger.Info("copy ssh key to node %s" % node)
1531 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1533 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1534 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1540 keyarray.append(f.read())
1544 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1545 keyarray[3], keyarray[4], keyarray[5])
1548 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1550 # Add node to our /etc/hosts, and add key to known_hosts
1551 _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1552 _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1553 self.cfg.GetHostKey())
1555 if new_node.secondary_ip != new_node.primary_ip:
1556 if not rpc.call_node_tcp_ping(new_node.name,
1557 constants.LOCALHOST_IP_ADDRESS,
1558 new_node.secondary_ip,
1559 constants.DEFAULT_NODED_PORT,
1561 raise errors.OpExecError("Node claims it doesn't have the"
1562 " secondary ip you gave (%s).\n"
1563 "Please fix and re-run this command." %
1564 new_node.secondary_ip)
1566 success, msg = ssh.VerifyNodeHostname(node)
1568 raise errors.OpExecError("Node '%s' claims it has a different hostname"
1569 " than the one the resolver gives: %s.\n"
1570 "Please fix and re-run this command." %
1573 # Distribute updated /etc/hosts and known_hosts to all nodes,
1574 # including the node just added
1575 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1576 dist_nodes = self.cfg.GetNodeList() + [node]
1577 if myself.name in dist_nodes:
1578 dist_nodes.remove(myself.name)
1580 logger.Debug("Copying hosts and known_hosts to all nodes")
1581 for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1582 result = rpc.call_upload_file(dist_nodes, fname)
1583 for to_node in dist_nodes:
1584 if not result[to_node]:
1585 logger.Error("copy of file %s to node %s failed" %
1588 to_copy = ss.GetFileList()
1589 for fname in to_copy:
1590 if not ssh.CopyFileToNode(node, fname):
1591 logger.Error("could not copy file %s to node %s" % (fname, node))
1593 logger.Info("adding node %s to cluster.conf" % node)
1594 self.cfg.AddNode(new_node)
1597 class LUMasterFailover(LogicalUnit):
1598 """Failover the master node to the current node.
1600 This is a special LU in that it must run on a non-master node.
1603 HPATH = "master-failover"
1604 HTYPE = constants.HTYPE_CLUSTER
1608 def BuildHooksEnv(self):
1611 This will run on the new master only in the pre phase, and on all
1612 the nodes in the post phase.
1616 "OP_TARGET": self.new_master,
1617 "NEW_MASTER": self.new_master,
1618 "OLD_MASTER": self.old_master,
1620 return env, [self.new_master], self.cfg.GetNodeList()
1622 def CheckPrereq(self):
1623 """Check prerequisites.
1625 This checks that we are not already the master.
1628 self.new_master = utils.HostInfo().name
1629 self.old_master = self.sstore.GetMasterNode()
1631 if self.old_master == self.new_master:
1632 raise errors.OpPrereqError("This command must be run on the node"
1633 " where you want the new master to be.\n"
1634 "%s is already the master" %
1637 def Exec(self, feedback_fn):
1638 """Failover the master node.
1640 This command, when run on a non-master node, will cause the current
1641 master to cease being master, and the non-master to become new
1645 #TODO: do not rely on gethostname returning the FQDN
1646 logger.Info("setting master to %s, old master: %s" %
1647 (self.new_master, self.old_master))
1649 if not rpc.call_node_stop_master(self.old_master):
1650 logger.Error("could not disable the master role on the old master"
1651 " %s, please disable manually" % self.old_master)
1654 ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1655 if not rpc.call_upload_file(self.cfg.GetNodeList(),
1656 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1657 logger.Error("could not distribute the new simple store master file"
1658 " to the other nodes, please check.")
1660 if not rpc.call_node_start_master(self.new_master):
1661 logger.Error("could not start the master role on the new master"
1662 " %s, please check" % self.new_master)
1663 feedback_fn("Error in activating the master IP on the new master,\n"
1664 "please fix manually.")
1668 class LUQueryClusterInfo(NoHooksLU):
1669 """Query cluster configuration.
1675 def CheckPrereq(self):
1676 """No prerequsites needed for this LU.
1681 def Exec(self, feedback_fn):
1682 """Return cluster config.
1686 "name": self.sstore.GetClusterName(),
1687 "software_version": constants.RELEASE_VERSION,
1688 "protocol_version": constants.PROTOCOL_VERSION,
1689 "config_version": constants.CONFIG_VERSION,
1690 "os_api_version": constants.OS_API_VERSION,
1691 "export_version": constants.EXPORT_VERSION,
1692 "master": self.sstore.GetMasterNode(),
1693 "architecture": (platform.architecture()[0], platform.machine()),
1699 class LUClusterCopyFile(NoHooksLU):
1700 """Copy file to cluster.
1703 _OP_REQP = ["nodes", "filename"]
1705 def CheckPrereq(self):
1706 """Check prerequisites.
1708 It should check that the named file exists and that the given list
1712 if not os.path.exists(self.op.filename):
1713 raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1715 self.nodes = _GetWantedNodes(self, self.op.nodes)
1717 def Exec(self, feedback_fn):
1718 """Copy a file from master to some nodes.
1721 opts - class with options as members
1722 args - list containing a single element, the file name
1724 nodes - list containing the name of target nodes; if empty, all nodes
1727 filename = self.op.filename
1729 myname = utils.HostInfo().name
1731 for node in self.nodes:
1734 if not ssh.CopyFileToNode(node, filename):
1735 logger.Error("Copy of file %s to node %s failed" % (filename, node))
1738 class LUDumpClusterConfig(NoHooksLU):
1739 """Return a text-representation of the cluster-config.
1744 def CheckPrereq(self):
1745 """No prerequisites.
1750 def Exec(self, feedback_fn):
1751 """Dump a representation of the cluster config to the standard output.
1754 return self.cfg.DumpConfig()
1757 class LURunClusterCommand(NoHooksLU):
1758 """Run a command on some nodes.
1761 _OP_REQP = ["command", "nodes"]
1763 def CheckPrereq(self):
1764 """Check prerequisites.
1766 It checks that the given list of nodes is valid.
1769 self.nodes = _GetWantedNodes(self, self.op.nodes)
1771 def Exec(self, feedback_fn):
1772 """Run a command on some nodes.
1776 for node in self.nodes:
1777 result = ssh.SSHCall(node, "root", self.op.command)
1778 data.append((node, result.output, result.exit_code))
1783 class LUActivateInstanceDisks(NoHooksLU):
1784 """Bring up an instance's disks.
1787 _OP_REQP = ["instance_name"]
1789 def CheckPrereq(self):
1790 """Check prerequisites.
1792 This checks that the instance is in the cluster.
1795 instance = self.cfg.GetInstanceInfo(
1796 self.cfg.ExpandInstanceName(self.op.instance_name))
1797 if instance is None:
1798 raise errors.OpPrereqError("Instance '%s' not known" %
1799 self.op.instance_name)
1800 self.instance = instance
1803 def Exec(self, feedback_fn):
1804 """Activate the disks.
1807 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1809 raise errors.OpExecError("Cannot activate block devices")
1814 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1815 """Prepare the block devices for an instance.
1817 This sets up the block devices on all nodes.
1820 instance: a ganeti.objects.Instance object
1821 ignore_secondaries: if true, errors on secondary nodes won't result
1822 in an error return from the function
1825 false if the operation failed
1826 list of (host, instance_visible_name, node_visible_name) if the operation
1827 succeeded with the mapping from node devices to instance devices
1831 for inst_disk in instance.disks:
1832 master_result = None
1833 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1834 cfg.SetDiskID(node_disk, node)
1835 is_primary = node == instance.primary_node
1836 result = rpc.call_blockdev_assemble(node, node_disk,
1837 instance.name, is_primary)
1839 logger.Error("could not prepare block device %s on node %s (is_pri"
1840 "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1841 if is_primary or not ignore_secondaries:
1844 master_result = result
1845 device_info.append((instance.primary_node, inst_disk.iv_name,
1848 # leave the disks configured for the primary node
1849 # this is a workaround that would be fixed better by
1850 # improving the logical/physical id handling
1851 for disk in instance.disks:
1852 cfg.SetDiskID(disk, instance.primary_node)
1854 return disks_ok, device_info
1857 def _StartInstanceDisks(cfg, instance, force):
1858 """Start the disks of an instance.
1861 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1862 ignore_secondaries=force)
1864 _ShutdownInstanceDisks(instance, cfg)
1865 if force is not None and not force:
1866 logger.Error("If the message above refers to a secondary node,"
1867 " you can retry the operation using '--force'.")
1868 raise errors.OpExecError("Disk consistency error")
1871 class LUDeactivateInstanceDisks(NoHooksLU):
1872 """Shutdown an instance's disks.
1875 _OP_REQP = ["instance_name"]
1877 def CheckPrereq(self):
1878 """Check prerequisites.
1880 This checks that the instance is in the cluster.
1883 instance = self.cfg.GetInstanceInfo(
1884 self.cfg.ExpandInstanceName(self.op.instance_name))
1885 if instance is None:
1886 raise errors.OpPrereqError("Instance '%s' not known" %
1887 self.op.instance_name)
1888 self.instance = instance
1890 def Exec(self, feedback_fn):
1891 """Deactivate the disks
1894 instance = self.instance
1895 ins_l = rpc.call_instance_list([instance.primary_node])
1896 ins_l = ins_l[instance.primary_node]
1897 if not isinstance(ins_l, list):
1898 raise errors.OpExecError("Can't contact node '%s'" %
1899 instance.primary_node)
1901 if self.instance.name in ins_l:
1902 raise errors.OpExecError("Instance is running, can't shutdown"
1905 _ShutdownInstanceDisks(instance, self.cfg)
1908 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1909 """Shutdown block devices of an instance.
1911 This does the shutdown on all nodes of the instance.
1913 If ignore_primary is false, errors on the primary node are
1918 for disk in instance.disks:
1919 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1920 cfg.SetDiskID(top_disk, node)
1921 if not rpc.call_blockdev_shutdown(node, top_disk):
1922 logger.Error("could not shutdown block device %s on node %s" %
1923 (disk.iv_name, node))
1924 if not ignore_primary or node != instance.primary_node:
1929 class LUStartupInstance(LogicalUnit):
1930 """Starts an instance.
1933 HPATH = "instance-start"
1934 HTYPE = constants.HTYPE_INSTANCE
1935 _OP_REQP = ["instance_name", "force"]
1937 def BuildHooksEnv(self):
1940 This runs on master, primary and secondary nodes of the instance.
1944 "FORCE": self.op.force,
1946 env.update(_BuildInstanceHookEnvByObject(self.instance))
1947 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1948 list(self.instance.secondary_nodes))
1951 def CheckPrereq(self):
1952 """Check prerequisites.
1954 This checks that the instance is in the cluster.
1957 instance = self.cfg.GetInstanceInfo(
1958 self.cfg.ExpandInstanceName(self.op.instance_name))
1959 if instance is None:
1960 raise errors.OpPrereqError("Instance '%s' not known" %
1961 self.op.instance_name)
1963 # check bridges existence
1964 _CheckInstanceBridgesExist(instance)
1966 self.instance = instance
1967 self.op.instance_name = instance.name
1969 def Exec(self, feedback_fn):
1970 """Start the instance.
1973 instance = self.instance
1974 force = self.op.force
1975 extra_args = getattr(self.op, "extra_args", "")
1977 node_current = instance.primary_node
1979 nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1981 raise errors.OpExecError("Could not contact node %s for infos" %
1984 freememory = nodeinfo[node_current]['memory_free']
1985 memory = instance.memory
1986 if memory > freememory:
1987 raise errors.OpExecError("Not enough memory to start instance"
1989 " needed %s MiB, available %s MiB" %
1990 (instance.name, node_current, memory,
1993 _StartInstanceDisks(self.cfg, instance, force)
1995 if not rpc.call_instance_start(node_current, instance, extra_args):
1996 _ShutdownInstanceDisks(instance, self.cfg)
1997 raise errors.OpExecError("Could not start instance")
1999 self.cfg.MarkInstanceUp(instance.name)
2002 class LURebootInstance(LogicalUnit):
2003 """Reboot an instance.
2006 HPATH = "instance-reboot"
2007 HTYPE = constants.HTYPE_INSTANCE
2008 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2010 def BuildHooksEnv(self):
2013 This runs on master, primary and secondary nodes of the instance.
2017 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2019 env.update(_BuildInstanceHookEnvByObject(self.instance))
2020 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2021 list(self.instance.secondary_nodes))
2024 def CheckPrereq(self):
2025 """Check prerequisites.
2027 This checks that the instance is in the cluster.
2030 instance = self.cfg.GetInstanceInfo(
2031 self.cfg.ExpandInstanceName(self.op.instance_name))
2032 if instance is None:
2033 raise errors.OpPrereqError("Instance '%s' not known" %
2034 self.op.instance_name)
2036 # check bridges existence
2037 _CheckInstanceBridgesExist(instance)
2039 self.instance = instance
2040 self.op.instance_name = instance.name
2042 def Exec(self, feedback_fn):
2043 """Reboot the instance.
2046 instance = self.instance
2047 ignore_secondaries = self.op.ignore_secondaries
2048 reboot_type = self.op.reboot_type
2049 extra_args = getattr(self.op, "extra_args", "")
2051 node_current = instance.primary_node
2053 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2054 constants.INSTANCE_REBOOT_HARD,
2055 constants.INSTANCE_REBOOT_FULL]:
2056 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2057 (constants.INSTANCE_REBOOT_SOFT,
2058 constants.INSTANCE_REBOOT_HARD,
2059 constants.INSTANCE_REBOOT_FULL))
2061 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2062 constants.INSTANCE_REBOOT_HARD]:
2063 if not rpc.call_instance_reboot(node_current, instance,
2064 reboot_type, extra_args):
2065 raise errors.OpExecError("Could not reboot instance")
2067 if not rpc.call_instance_shutdown(node_current, instance):
2068 raise errors.OpExecError("could not shutdown instance for full reboot")
2069 _ShutdownInstanceDisks(instance, self.cfg)
2070 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2071 if not rpc.call_instance_start(node_current, instance, extra_args):
2072 _ShutdownInstanceDisks(instance, self.cfg)
2073 raise errors.OpExecError("Could not start instance for full reboot")
2075 self.cfg.MarkInstanceUp(instance.name)
2078 class LUShutdownInstance(LogicalUnit):
2079 """Shutdown an instance.
2082 HPATH = "instance-stop"
2083 HTYPE = constants.HTYPE_INSTANCE
2084 _OP_REQP = ["instance_name"]
2086 def BuildHooksEnv(self):
2089 This runs on master, primary and secondary nodes of the instance.
2092 env = _BuildInstanceHookEnvByObject(self.instance)
2093 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2094 list(self.instance.secondary_nodes))
2097 def CheckPrereq(self):
2098 """Check prerequisites.
2100 This checks that the instance is in the cluster.
2103 instance = self.cfg.GetInstanceInfo(
2104 self.cfg.ExpandInstanceName(self.op.instance_name))
2105 if instance is None:
2106 raise errors.OpPrereqError("Instance '%s' not known" %
2107 self.op.instance_name)
2108 self.instance = instance
2110 def Exec(self, feedback_fn):
2111 """Shutdown the instance.
2114 instance = self.instance
2115 node_current = instance.primary_node
2116 if not rpc.call_instance_shutdown(node_current, instance):
2117 logger.Error("could not shutdown instance")
2119 self.cfg.MarkInstanceDown(instance.name)
2120 _ShutdownInstanceDisks(instance, self.cfg)
2123 class LUReinstallInstance(LogicalUnit):
2124 """Reinstall an instance.
2127 HPATH = "instance-reinstall"
2128 HTYPE = constants.HTYPE_INSTANCE
2129 _OP_REQP = ["instance_name"]
2131 def BuildHooksEnv(self):
2134 This runs on master, primary and secondary nodes of the instance.
2137 env = _BuildInstanceHookEnvByObject(self.instance)
2138 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2139 list(self.instance.secondary_nodes))
2142 def CheckPrereq(self):
2143 """Check prerequisites.
2145 This checks that the instance is in the cluster and is not running.
2148 instance = self.cfg.GetInstanceInfo(
2149 self.cfg.ExpandInstanceName(self.op.instance_name))
2150 if instance is None:
2151 raise errors.OpPrereqError("Instance '%s' not known" %
2152 self.op.instance_name)
2153 if instance.disk_template == constants.DT_DISKLESS:
2154 raise errors.OpPrereqError("Instance '%s' has no disks" %
2155 self.op.instance_name)
2156 if instance.status != "down":
2157 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2158 self.op.instance_name)
2159 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2161 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2162 (self.op.instance_name,
2163 instance.primary_node))
2165 self.op.os_type = getattr(self.op, "os_type", None)
2166 if self.op.os_type is not None:
2168 pnode = self.cfg.GetNodeInfo(
2169 self.cfg.ExpandNodeName(instance.primary_node))
2171 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2173 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2175 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2176 " primary node" % self.op.os_type)
2178 self.instance = instance
2180 def Exec(self, feedback_fn):
2181 """Reinstall the instance.
2184 inst = self.instance
2186 if self.op.os_type is not None:
2187 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2188 inst.os = self.op.os_type
2189 self.cfg.AddInstance(inst)
2191 _StartInstanceDisks(self.cfg, inst, None)
2193 feedback_fn("Running the instance OS create scripts...")
2194 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2195 raise errors.OpExecError("Could not install OS for instance %s "
2197 (inst.name, inst.primary_node))
2199 _ShutdownInstanceDisks(inst, self.cfg)
2202 class LURenameInstance(LogicalUnit):
2203 """Rename an instance.
2206 HPATH = "instance-rename"
2207 HTYPE = constants.HTYPE_INSTANCE
2208 _OP_REQP = ["instance_name", "new_name"]
2210 def BuildHooksEnv(self):
2213 This runs on master, primary and secondary nodes of the instance.
2216 env = _BuildInstanceHookEnvByObject(self.instance)
2217 env["INSTANCE_NEW_NAME"] = self.op.new_name
2218 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2219 list(self.instance.secondary_nodes))
2222 def CheckPrereq(self):
2223 """Check prerequisites.
2225 This checks that the instance is in the cluster and is not running.
2228 instance = self.cfg.GetInstanceInfo(
2229 self.cfg.ExpandInstanceName(self.op.instance_name))
2230 if instance is None:
2231 raise errors.OpPrereqError("Instance '%s' not known" %
2232 self.op.instance_name)
2233 if instance.status != "down":
2234 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2235 self.op.instance_name)
2236 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2237 if remote_info:
2238 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2239 (self.op.instance_name,
2240 instance.primary_node))
2241 self.instance = instance
2243 # new name verification
2244 name_info = utils.HostInfo(self.op.new_name)
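2245 # note: the resolved (fully qualified) name, not the raw user input, becomes the new instance name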
2246 self.op.new_name = new_name = name_info.name
2247 if not getattr(self.op, "ignore_ip", False):
2248 command = ["fping", "-q", name_info.ip]
2249 result = utils.RunCmd(command)
2250 if not result.failed:
2251 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2252 (name_info.ip, new_name))
2255 def Exec(self, feedback_fn):
2256 """Reinstall the instance.
2259 inst = self.instance
2260 old_name = inst.name
2262 self.cfg.RenameInstance(inst.name, self.op.new_name)
2264 # re-read the instance from the configuration after rename
2265 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2267 _StartInstanceDisks(self.cfg, inst, None)
2268 try:
2269 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2270 "sda", "sdb"):
2271 msg = ("Could not run OS rename script for instance %s on node %s\n"
2273 "(but the instance has been renamed in Ganeti)" %
2274 (inst.name, inst.primary_node))
2275 logger.Error(msg)
2276 finally:
2277 _ShutdownInstanceDisks(inst, self.cfg)
2280 class LURemoveInstance(LogicalUnit):
2281 """Remove an instance.
2284 HPATH = "instance-remove"
2285 HTYPE = constants.HTYPE_INSTANCE
2286 _OP_REQP = ["instance_name"]
2288 def BuildHooksEnv(self):
2289 """Build hooks env.
2291 This runs on master, primary and secondary nodes of the instance.
2293 """
2294 env = _BuildInstanceHookEnvByObject(self.instance)
2295 nl = [self.sstore.GetMasterNode()]
2296 return env, nl, nl
2298 def CheckPrereq(self):
2299 """Check prerequisites.
2301 This checks that the instance is in the cluster.
2304 instance = self.cfg.GetInstanceInfo(
2305 self.cfg.ExpandInstanceName(self.op.instance_name))
2306 if instance is None:
2307 raise errors.OpPrereqError("Instance '%s' not known" %
2308 self.op.instance_name)
2309 self.instance = instance
2311 def Exec(self, feedback_fn):
2312 """Remove the instance.
2315 instance = self.instance
2316 logger.Info("shutting down instance %s on node %s" %
2317 (instance.name, instance.primary_node))
2319 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2320 if self.op.ignore_failures:
2321 feedback_fn("Warning: can't shutdown instance")
2322 else:
2323 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2324 (instance.name, instance.primary_node))
2326 logger.Info("removing block devices for instance %s" % instance.name)
2328 if not _RemoveDisks(instance, self.cfg):
2329 if self.op.ignore_failures:
2330 feedback_fn("Warning: can't remove instance's disks")
2331 else:
2332 raise errors.OpExecError("Can't remove instance's disks")
2334 logger.Info("removing instance %s out of cluster config" % instance.name)
2336 self.cfg.RemoveInstance(instance.name)
2339 class LUQueryInstances(NoHooksLU):
2340 """Logical unit for querying instances.
2343 _OP_REQP = ["output_fields", "names"]
2345 def CheckPrereq(self):
2346 """Check prerequisites.
2348 This checks that the fields required are valid output fields.
2351 self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2352 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2353 "admin_state", "admin_ram",
2354 "disk_template", "ip", "mac", "bridge",
2355 "sda_size", "sdb_size"],
2356 dynamic=self.dynamic_fields,
2357 selected=self.op.output_fields)
2359 self.wanted = _GetWantedInstances(self, self.op.names)
2361 def Exec(self, feedback_fn):
2362 """Computes the list of nodes and their attributes.
2365 instance_names = self.wanted
2366 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2367 in instance_names]
2369 # begin data gathering
2371 nodes = frozenset([inst.primary_node for inst in instance_list])
2373 bad_nodes = []
2374 if self.dynamic_fields.intersection(self.op.output_fields):
2375 live_data = {}
2376 node_data = rpc.call_all_instances_info(nodes)
2377 for name in nodes:
2378 result = node_data[name]
2379 if result:
2380 live_data.update(result)
2381 elif result == False:
2382 bad_nodes.append(name)
2383 # else no instance is alive
2384 else:
2385 live_data = dict([(name, {}) for name in instance_names])
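2386 # when no dynamic field was requested the per-instance live data stays empty and only configuration values are reported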
2387 # end data gathering
2389 output = []
2390 for instance in instance_list:
2391 iout = []
2392 for field in self.op.output_fields:
2393 if field == "name":
2394 val = instance.name
2395 elif field == "os":
2396 val = instance.os
2397 elif field == "pnode":
2398 val = instance.primary_node
2399 elif field == "snodes":
2400 val = list(instance.secondary_nodes)
2401 elif field == "admin_state":
2402 val = (instance.status != "down")
2403 elif field == "oper_state":
2404 if instance.primary_node in bad_nodes:
2405 val = None
2406 else:
2407 val = bool(live_data.get(instance.name))
2408 elif field == "admin_ram":
2409 val = instance.memory
2410 elif field == "oper_ram":
2411 if instance.primary_node in bad_nodes:
2412 val = None
2413 elif instance.name in live_data:
2414 val = live_data[instance.name].get("memory", "?")
2415 else:
2416 val = "-"
2417 elif field == "disk_template":
2418 val = instance.disk_template
2419 elif field == "ip":
2420 val = instance.nics[0].ip
2421 elif field == "bridge":
2422 val = instance.nics[0].bridge
2423 elif field == "mac":
2424 val = instance.nics[0].mac
2425 elif field == "sda_size" or field == "sdb_size":
2426 disk = instance.FindDisk(field[:3])
2427 if disk is None:
2428 val = None
2429 else:
2430 val = disk.size
2431 else:
2432 raise errors.ParameterError(field)
2433 iout.append(str(val))
2434 output.append(iout)
2436 return output
2439 class LUFailoverInstance(LogicalUnit):
2440 """Failover an instance.
2443 HPATH = "instance-failover"
2444 HTYPE = constants.HTYPE_INSTANCE
2445 _OP_REQP = ["instance_name", "ignore_consistency"]
2447 def BuildHooksEnv(self):
2448 """Build hooks env.
2450 This runs on master, primary and secondary nodes of the instance.
2452 """
2453 env = {
2454 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2455 }
2456 env.update(_BuildInstanceHookEnvByObject(self.instance))
2457 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2458 return env, nl, nl
2460 def CheckPrereq(self):
2461 """Check prerequisites.
2463 This checks that the instance is in the cluster.
2466 instance = self.cfg.GetInstanceInfo(
2467 self.cfg.ExpandInstanceName(self.op.instance_name))
2468 if instance is None:
2469 raise errors.OpPrereqError("Instance '%s' not known" %
2470 self.op.instance_name)
2472 if instance.disk_template not in constants.DTS_NET_MIRROR:
2473 raise errors.OpPrereqError("Instance's disk layout is not"
2474 " network mirrored, cannot failover.")
2476 secondary_nodes = instance.secondary_nodes
2477 if not secondary_nodes:
2478 raise errors.ProgrammerError("no secondary node but using "
2479 "DT_REMOTE_RAID1 template")
2481 # check memory requirements on the secondary node
2482 target_node = secondary_nodes[0]
2483 nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2484 info = nodeinfo.get(target_node, None)
2485 if not info:
2486 raise errors.OpPrereqError("Cannot get current information"
2487 " from node '%s'" % target_node)
2488 if instance.memory > info['memory_free']:
2489 raise errors.OpPrereqError("Not enough memory on target node %s."
2490 " %d MB available, %d MB required" %
2491 (target_node, info['memory_free'],
2492 instance.memory))
2494 # check bridge existence
2495 brlist = [nic.bridge for nic in instance.nics]
2496 if not rpc.call_bridges_exist(target_node, brlist):
2497 raise errors.OpPrereqError("One or more target bridges %s do not"
2498 " exist on destination node '%s'" %
2499 (brlist, target_node))
2501 self.instance = instance
2503 def Exec(self, feedback_fn):
2504 """Failover an instance.
2506 The failover is done by shutting it down on its present node and
2507 starting it on the secondary.
2510 instance = self.instance
2512 source_node = instance.primary_node
2513 target_node = instance.secondary_nodes[0]
2515 feedback_fn("* checking disk consistency between source and target")
2516 for dev in instance.disks:
2517 # for remote_raid1, these are md over drbd
2518 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2519 if not self.op.ignore_consistency:
2520 raise errors.OpExecError("Disk %s is degraded on target node,"
2521 " aborting failover." % dev.iv_name)
2523 feedback_fn("* checking target node resource availability")
2524 nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2526 if not nodeinfo:
2527 raise errors.OpExecError("Could not contact target node %s." %
2528 target_node)
2530 free_memory = int(nodeinfo[target_node]['memory_free'])
2531 memory = instance.memory
2532 if memory > free_memory:
2533 raise errors.OpExecError("Not enough memory to create instance %s on"
2534 " node %s. needed %s MiB, available %s MiB" %
2535 (instance.name, target_node, memory,
2536 free_memory))
2538 feedback_fn("* shutting down instance on source node")
2539 logger.Info("Shutting down instance %s on node %s" %
2540 (instance.name, source_node))
2542 if not rpc.call_instance_shutdown(source_node, instance):
2543 if self.op.ignore_consistency:
2544 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2545 " anyway. Please make sure node %s is down" %
2546 (instance.name, source_node, source_node))
2547 else:
2548 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2549 (instance.name, source_node))
2551 feedback_fn("* deactivating the instance's disks on source node")
2552 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2553 raise errors.OpExecError("Can't shut down the instance's disks.")
2555 instance.primary_node = target_node
2556 # distribute new instance config to the other nodes
2557 self.cfg.AddInstance(instance)
2559 feedback_fn("* activating the instance's disks on target node")
2560 logger.Info("Starting instance %s on node %s" %
2561 (instance.name, target_node))
2563 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2564 ignore_secondaries=True)
2565 if not disks_ok:
2566 _ShutdownInstanceDisks(instance, self.cfg)
2567 raise errors.OpExecError("Can't activate the instance's disks")
2569 feedback_fn("* starting the instance on the target node")
2570 if not rpc.call_instance_start(target_node, instance, None):
2571 _ShutdownInstanceDisks(instance, self.cfg)
2572 raise errors.OpExecError("Could not start instance %s on node %s." %
2573 (instance.name, target_node))
2576 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2577 """Create a tree of block devices on the primary node.
2579 This always creates all devices.
2581 """
2583 for child in device.children:
2584 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2585 return False
2587 cfg.SetDiskID(device, node)
2588 new_id = rpc.call_blockdev_create(node, device, device.size,
2589 instance.name, True, info)
2590 if not new_id:
2591 return False
2592 if device.physical_id is None:
2593 device.physical_id = new_id
2594 return True
2597 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2598 """Create a tree of block devices on a secondary node.
2600 If this device type has to be created on secondaries, create it and
2601 all its children.
2603 If not, just recurse to children keeping the same 'force' value.
2605 """
2606 if device.CreateOnSecondary():
2607 force = True
2609 for child in device.children:
2610 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2611 child, force, info):
2612 return False
2614 if not force:
2615 return True
2616 cfg.SetDiskID(device, node)
2617 new_id = rpc.call_blockdev_create(node, device, device.size,
2618 instance.name, False, info)
2619 if not new_id:
2620 return False
2621 if device.physical_id is None:
2622 device.physical_id = new_id
2623 return True
2626 def _GenerateUniqueNames(cfg, exts):
2627 """Generate a suitable LV name.
2629 This will generate a logical volume name for the given instance.
2634 new_id = cfg.GenerateUniqueID()
2635 results.append("%s%s" % (new_id, val))
2639 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2640 """Generate a drbd device complete with its children.
2643 port = cfg.AllocatePort()
2644 vgname = cfg.GetVGName()
2645 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2646 logical_id=(vgname, names[0]))
2647 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2648 logical_id=(vgname, names[1]))
2649 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2650 logical_id = (primary, secondary, port),
2651 children = [dev_data, dev_meta])
2652 return drbd_dev
2655 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2656 """Generate a drbd8 device complete with its children.
2659 port = cfg.AllocatePort()
2660 vgname = cfg.GetVGName()
2661 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2662 logical_id=(vgname, names[0]))
2663 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2664 logical_id=(vgname, names[1]))
2665 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2666 logical_id = (primary, secondary, port),
2667 children = [dev_data, dev_meta],
2668 iv_name=iv_name)
2669 return drbd_dev
2671 def _GenerateDiskTemplate(cfg, template_name,
2672 instance_name, primary_node,
2673 secondary_nodes, disk_sz, swap_sz):
2674 """Generate the entire disk layout for a given template type.
2677 #TODO: compute space requirements
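2678 # disk_sz and swap_sz map to the instance's "sda" (data) and "sdb" (swap) volumes respectively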
2679 vgname = cfg.GetVGName()
2680 if template_name == "diskless":
2681 disks = []
2682 elif template_name == "plain":
2683 if len(secondary_nodes) != 0:
2684 raise errors.ProgrammerError("Wrong template configuration")
2686 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2687 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2688 logical_id=(vgname, names[0]),
2689 iv_name = "sda")
2690 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2691 logical_id=(vgname, names[1]),
2692 iv_name = "sdb")
2693 disks = [sda_dev, sdb_dev]
2694 elif template_name == "local_raid1":
2695 if len(secondary_nodes) != 0:
2696 raise errors.ProgrammerError("Wrong template configuration")
2699 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2700 ".sdb_m1", ".sdb_m2"])
2701 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2702 logical_id=(vgname, names[0]))
2703 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2704 logical_id=(vgname, names[1]))
2705 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2706 size=disk_sz,
2707 children = [sda_dev_m1, sda_dev_m2])
2708 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2709 logical_id=(vgname, names[2]))
2710 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2711 logical_id=(vgname, names[3]))
2712 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2713 size=swap_sz,
2714 children = [sdb_dev_m1, sdb_dev_m2])
2715 disks = [md_sda_dev, md_sdb_dev]
2716 elif template_name == constants.DT_REMOTE_RAID1:
2717 if len(secondary_nodes) != 1:
2718 raise errors.ProgrammerError("Wrong template configuration")
2719 remote_node = secondary_nodes[0]
2720 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2721 ".sdb_data", ".sdb_meta"])
2722 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2723 disk_sz, names[0:2])
2724 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2725 children = [drbd_sda_dev], size=disk_sz)
2726 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2727 swap_sz, names[2:4])
2728 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2729 children = [drbd_sdb_dev], size=swap_sz)
2730 disks = [md_sda_dev, md_sdb_dev]
2731 elif template_name == constants.DT_DRBD8:
2732 if len(secondary_nodes) != 1:
2733 raise errors.ProgrammerError("Wrong template configuration")
2734 remote_node = secondary_nodes[0]
2735 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2736 ".sdb_data", ".sdb_meta"])
2737 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2738 disk_sz, names[0:2], "sda")
2739 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2740 swap_sz, names[2:4], "sdb")
2741 disks = [drbd_sda_dev, drbd_sdb_dev]
2742 else:
2743 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2745 return disks
2747 def _GetInstanceInfoText(instance):
2748 """Compute that text that should be added to the disk's metadata.
2751 return "originstname+%s" % instance.name
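2752 # the returned text is passed as the 'info' argument of the blockdev create calls, so a stray volume can later be traced back to its instance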
2754 def _CreateDisks(cfg, instance):
2755 """Create all disks for an instance.
2757 This abstracts away some work from AddInstance.
2760 instance: the instance object
2763 True or False showing the success of the creation process
2766 info = _GetInstanceInfoText(instance)
2768 for device in instance.disks:
2769 logger.Info("creating volume %s for instance %s" %
2770 (device.iv_name, instance.name))
2772 for secondary_node in instance.secondary_nodes:
2773 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2774 device, False, info):
2775 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2776 (device.iv_name, device, secondary_node))
2777 return False
2779 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2780 instance, device, info):
2781 logger.Error("failed to create volume %s on primary!" %
2782 device.iv_name)
2783 return False
2785 return True
2787 def _RemoveDisks(instance, cfg):
2788 """Remove all disks for an instance.
2790 This abstracts away some work from `AddInstance()` and
2791 `RemoveInstance()`. Note that in case some of the devices couldn't
2792 be removed, the removal will continue with the other ones (compare
2793 with `_CreateDisks()`).
2796 instance: the instance object
2799 True or False showing the success of the removal process
2802 logger.Info("removing block devices for instance %s" % instance.name)
2804 result = True
2805 for device in instance.disks:
2806 for node, disk in device.ComputeNodeTree(instance.primary_node):
2807 cfg.SetDiskID(disk, node)
2808 if not rpc.call_blockdev_remove(node, disk):
2809 logger.Error("could not remove block device %s on node %s,"
2810 " continuing anyway" %
2811 (device.iv_name, node))
2812 result = False
2814 return result
2816 class LUCreateInstance(LogicalUnit):
2817 """Create an instance.
2820 HPATH = "instance-add"
2821 HTYPE = constants.HTYPE_INSTANCE
2822 _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2823 "disk_template", "swap_size", "mode", "start", "vcpus",
2824 "wait_for_sync", "ip_check"]
2826 def BuildHooksEnv(self):
2827 """Build hooks env.
2829 This runs on master, primary and secondary nodes of the instance.
2831 """
2832 env = {
2833 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2834 "INSTANCE_DISK_SIZE": self.op.disk_size,
2835 "INSTANCE_SWAP_SIZE": self.op.swap_size,
2836 "INSTANCE_ADD_MODE": self.op.mode,
2837 }
2838 if self.op.mode == constants.INSTANCE_IMPORT:
2839 env["INSTANCE_SRC_NODE"] = self.op.src_node
2840 env["INSTANCE_SRC_PATH"] = self.op.src_path
2841 env["INSTANCE_SRC_IMAGE"] = self.src_image
2843 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2844 primary_node=self.op.pnode,
2845 secondary_nodes=self.secondaries,
2846 status=self.instance_status,
2847 os_type=self.op.os_type,
2848 memory=self.op.mem_size,
2849 vcpus=self.op.vcpus,
2850 nics=[(self.inst_ip, self.op.bridge)],
2851 ))
2853 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2854 self.secondaries)
2856 return env, nl, nl
2858 def CheckPrereq(self):
2859 """Check prerequisites.
2862 if self.op.mode not in (constants.INSTANCE_CREATE,
2863 constants.INSTANCE_IMPORT):
2864 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2865 self.op.mode)
2867 if self.op.mode == constants.INSTANCE_IMPORT:
2868 src_node = getattr(self.op, "src_node", None)
2869 src_path = getattr(self.op, "src_path", None)
2870 if src_node is None or src_path is None:
2871 raise errors.OpPrereqError("Importing an instance requires source"
2872 " node and path options")
2873 src_node_full = self.cfg.ExpandNodeName(src_node)
2874 if src_node_full is None:
2875 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2876 self.op.src_node = src_node = src_node_full
2878 if not os.path.isabs(src_path):
2879 raise errors.OpPrereqError("The source path must be absolute")
2881 export_info = rpc.call_export_info(src_node, src_path)
2883 if not export_info:
2884 raise errors.OpPrereqError("No export found in dir %s" % src_path)
2886 if not export_info.has_section(constants.INISECT_EXP):
2887 raise errors.ProgrammerError("Corrupted export config")
2889 ei_version = export_info.get(constants.INISECT_EXP, 'version')
2890 if (int(ei_version) != constants.EXPORT_VERSION):
2891 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2892 (ei_version, constants.EXPORT_VERSION))
2894 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2895 raise errors.OpPrereqError("Can't import instance with more than"
2896 " one data disk")
2898 # FIXME: are the old os-es, disk sizes, etc. useful?
2899 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2900 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2902 self.src_image = diskimage
2903 else: # INSTANCE_CREATE
2904 if getattr(self.op, "os_type", None) is None:
2905 raise errors.OpPrereqError("No guest OS specified")
2907 # check primary node
2908 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2909 if pnode is None:
2910 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2911 self.op.pnode)
2912 self.op.pnode = pnode.name
2913 self.pnode = pnode
2914 self.secondaries = []
2915 # disk template and mirror node verification
2916 if self.op.disk_template not in constants.DISK_TEMPLATES:
2917 raise errors.OpPrereqError("Invalid disk template name")
2919 if self.op.disk_template in constants.DTS_NET_MIRROR:
2920 if getattr(self.op, "snode", None) is None:
2921 raise errors.OpPrereqError("The networked disk templates need"
2922 " a mirror node")
2924 snode_name = self.cfg.ExpandNodeName(self.op.snode)
2925 if snode_name is None:
2926 raise errors.OpPrereqError("Unknown secondary node '%s'" %
2927 self.op.snode)
2928 elif snode_name == pnode.name:
2929 raise errors.OpPrereqError("The secondary node cannot be"
2930 " the primary node.")
2931 self.secondaries.append(snode_name)
2933 # Check lv size requirements
2934 nodenames = [pnode.name] + self.secondaries
2935 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2937 # Required free disk space as a function of disk and swap space
2938 req_size_dict = {
2939 constants.DT_DISKLESS: 0,
2940 constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2941 constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2942 # 256 MB are added for drbd metadata, 128MB for each drbd device
2943 constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2944 constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2945 }
2947 if self.op.disk_template not in req_size_dict:
2948 raise errors.ProgrammerError("Disk template '%s' size requirement"
2949 " is unknown" % self.op.disk_template)
2951 req_size = req_size_dict[self.op.disk_template]
2953 for node in nodenames:
2954 info = nodeinfo.get(node, None)
2956 raise errors.OpPrereqError("Cannot get current information"
2957 " from node '%s'" % nodeinfo)
2958 if req_size > info['vg_free']:
2959 raise errors.OpPrereqError("Not enough disk space on target node %s."
2960 " %d MB available, %d MB required" %
2961 (node, info['vg_free'], req_size))
2964 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2965 if not os_obj:
2966 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2967 " primary node" % self.op.os_type)
2969 # instance verification
2970 hostname1 = utils.HostInfo(self.op.instance_name)
2972 self.op.instance_name = instance_name = hostname1.name
2973 instance_list = self.cfg.GetInstanceList()
2974 if instance_name in instance_list:
2975 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2976 instance_name)
2978 ip = getattr(self.op, "ip", None)
2979 if ip is None or ip.lower() == "none":
2980 inst_ip = None
2981 elif ip.lower() == "auto":
2982 inst_ip = hostname1.ip
2983 else:
2984 if not utils.IsValidIP(ip):
2985 raise errors.OpPrereqError("given IP address '%s' doesn't look"
2986 " like a valid IP" % ip)
2987 inst_ip = ip
2988 self.inst_ip = inst_ip
2990 if self.op.start and not self.op.ip_check:
2991 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2992 " adding an instance in start mode")
2994 if self.op.ip_check:
2995 if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2996 constants.DEFAULT_NODED_PORT):
2997 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2998 (hostname1.ip, instance_name))
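2999 # the TcpPing check above catches addresses that are already live on the network, not just names known to the cluster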
3000 # bridge verification
3001 bridge = getattr(self.op, "bridge", None)
3002 if bridge is None:
3003 self.op.bridge = self.cfg.GetDefBridge()
3004 else:
3005 self.op.bridge = bridge
3007 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3008 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3009 " destination node '%s'" %
3010 (self.op.bridge, pnode.name))
3012 if self.op.start:
3013 self.instance_status = 'up'
3014 else:
3015 self.instance_status = 'down'
3017 def Exec(self, feedback_fn):
3018 """Create and add the instance to the cluster.
3021 instance = self.op.instance_name
3022 pnode_name = self.pnode.name
3024 nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
3025 if self.inst_ip is not None:
3026 nic.ip = self.inst_ip
3028 disks = _GenerateDiskTemplate(self.cfg,
3029 self.op.disk_template,
3030 instance, pnode_name,
3031 self.secondaries, self.op.disk_size,
3032 self.op.swap_size)
3034 iobj = objects.Instance(name=instance, os=self.op.os_type,
3035 primary_node=pnode_name,
3036 memory=self.op.mem_size,
3037 vcpus=self.op.vcpus,
3038 nics=[nic], disks=disks,
3039 disk_template=self.op.disk_template,
3040 status=self.instance_status,
3041 )
3043 feedback_fn("* creating instance disks...")
3044 if not _CreateDisks(self.cfg, iobj):
3045 _RemoveDisks(iobj, self.cfg)
3046 raise errors.OpExecError("Device creation failed, reverting...")
3048 feedback_fn("adding instance %s to cluster config" % instance)
3050 self.cfg.AddInstance(iobj)
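3051 # from this point on the instance exists in the configuration, so error paths below must remove it again, not just its disks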
3052 if self.op.wait_for_sync:
3053 disk_abort = not _WaitForSync(self.cfg, iobj)
3054 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3055 # make sure the disks are not degraded (still sync-ing is ok)
3057 feedback_fn("* checking mirrors status")
3058 disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
3059 else:
3060 disk_abort = False
3062 if disk_abort:
3063 _RemoveDisks(iobj, self.cfg)
3064 self.cfg.RemoveInstance(iobj.name)
3065 raise errors.OpExecError("There are some degraded disks for"
3066 " this instance")
3068 feedback_fn("creating os for instance %s on node %s" %
3069 (instance, pnode_name))
3071 if iobj.disk_template != constants.DT_DISKLESS:
3072 if self.op.mode == constants.INSTANCE_CREATE:
3073 feedback_fn("* running the instance OS create scripts...")
3074 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3075 raise errors.OpExecError("could not add os for instance %s"
3076 " on node %s" %
3077 (instance, pnode_name))
3079 elif self.op.mode == constants.INSTANCE_IMPORT:
3080 feedback_fn("* running the instance OS import scripts...")
3081 src_node = self.op.src_node
3082 src_image = self.src_image
3083 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3084 src_node, src_image):
3085 raise errors.OpExecError("Could not import os for instance"
3086 " %s on node %s" %
3087 (instance, pnode_name))
3088 else:
3089 # also checked in the prereq part
3090 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3091 % self.op.mode)
3093 if self.op.start:
3094 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3095 feedback_fn("* starting instance...")
3096 if not rpc.call_instance_start(pnode_name, iobj, None):
3097 raise errors.OpExecError("Could not start instance")
3100 class LUConnectConsole(NoHooksLU):
3101 """Connect to an instance's console.
3103 This is somewhat special in that it returns the command line that
3104 you need to run on the master node in order to connect to the
3108 _OP_REQP = ["instance_name"]
3110 def CheckPrereq(self):
3111 """Check prerequisites.
3113 This checks that the instance is in the cluster.
3116 instance = self.cfg.GetInstanceInfo(
3117 self.cfg.ExpandInstanceName(self.op.instance_name))
3118 if instance is None:
3119 raise errors.OpPrereqError("Instance '%s' not known" %
3120 self.op.instance_name)
3121 self.instance = instance
3123 def Exec(self, feedback_fn):
3124 """Connect to the console of an instance
3127 instance = self.instance
3128 node = instance.primary_node
3130 node_insts = rpc.call_instance_list([node])[node]
3131 if node_insts is False:
3132 raise errors.OpExecError("Can't connect to node %s." % node)
3134 if instance.name not in node_insts:
3135 raise errors.OpExecError("Instance %s is not running." % instance.name)
3137 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3139 hyper = hypervisor.GetHypervisor()
3140 console_cmd = hyper.GetShellCommandForConsole(instance.name)
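3141 # wrap the hypervisor's console command in a non-interactive ssh invocation to the primary node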
3142 argv = ["ssh", "-q", "-t"]
3143 argv.extend(ssh.KNOWN_HOSTS_OPTS)
3144 argv.extend(ssh.BATCH_MODE_OPTS)
3145 argv.append(node)
3146 argv.append(console_cmd)
3147 return argv
3150 class LUAddMDDRBDComponent(LogicalUnit):
3151 """Adda new mirror member to an instance's disk.
3154 HPATH = "mirror-add"
3155 HTYPE = constants.HTYPE_INSTANCE
3156 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3158 def BuildHooksEnv(self):
3159 """Build hooks env.
3161 This runs on the master, the primary and all the secondaries.
3163 """
3164 env = {
3165 "NEW_SECONDARY": self.op.remote_node,
3166 "DISK_NAME": self.op.disk_name,
3167 }
3168 env.update(_BuildInstanceHookEnvByObject(self.instance))
3169 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3170 self.op.remote_node,] + list(self.instance.secondary_nodes)
3171 return env, nl, nl
3173 def CheckPrereq(self):
3174 """Check prerequisites.
3176 This checks that the instance is in the cluster.
3179 instance = self.cfg.GetInstanceInfo(
3180 self.cfg.ExpandInstanceName(self.op.instance_name))
3181 if instance is None:
3182 raise errors.OpPrereqError("Instance '%s' not known" %
3183 self.op.instance_name)
3184 self.instance = instance
3186 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3187 if remote_node is None:
3188 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3189 self.remote_node = remote_node
3191 if remote_node == instance.primary_node:
3192 raise errors.OpPrereqError("The specified node is the primary node of"
3193 " the instance.")
3195 if instance.disk_template != constants.DT_REMOTE_RAID1:
3196 raise errors.OpPrereqError("Instance's disk layout is not"
3197 " remote_raid1.")
3198 for disk in instance.disks:
3199 if disk.iv_name == self.op.disk_name:
3200 break
3201 else:
3202 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3203 " instance." % self.op.disk_name)
3204 if len(disk.children) > 1:
3205 raise errors.OpPrereqError("The device already has two slave"
3206 " devices.\n"
3207 "This would create a 3-disk raid1"
3208 " which we don't allow.")
3209 self.disk = disk
3211 def Exec(self, feedback_fn):
3212 """Add the mirror component
3216 instance = self.instance
3217 disk = self.disk
3218 remote_node = self.remote_node
3219 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3220 names = _GenerateUniqueNames(self.cfg, lv_names)
3221 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3222 remote_node, disk.size, names)
3224 logger.Info("adding new mirror component on secondary")
3226 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3227 new_drbd, False,
3228 _GetInstanceInfoText(instance)):
3229 raise errors.OpExecError("Failed to create new component on secondary"
3230 " node %s" % remote_node)
3232 logger.Info("adding new mirror component on primary")
3234 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3235 instance, new_drbd,
3236 _GetInstanceInfoText(instance)):
3237 # remove secondary dev
3238 self.cfg.SetDiskID(new_drbd, remote_node)
3239 rpc.call_blockdev_remove(remote_node, new_drbd)
3240 raise errors.OpExecError("Failed to create volume on primary")
3242 # the device exists now
3243 # call the primary node to add the mirror to md
3244 logger.Info("adding new mirror component to md")
3245 if not rpc.call_blockdev_addchildren(instance.primary_node,
3246 disk, [new_drbd]):
3247 logger.Error("Can't add mirror component to md!")
3248 self.cfg.SetDiskID(new_drbd, remote_node)
3249 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3250 logger.Error("Can't rollback on secondary")
3251 self.cfg.SetDiskID(new_drbd, instance.primary_node)
3252 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3253 logger.Error("Can't rollback on primary")
3254 raise errors.OpExecError("Can't add mirror component to md array")
3256 disk.children.append(new_drbd)
3258 self.cfg.AddInstance(instance)
3260 _WaitForSync(self.cfg, instance)
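3261 # the newly added mirror member starts out empty, so wait for the resync to finish before returning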
3265 class LURemoveMDDRBDComponent(LogicalUnit):
3266 """Remove a component from a remote_raid1 disk.
3269 HPATH = "mirror-remove"
3270 HTYPE = constants.HTYPE_INSTANCE
3271 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3273 def BuildHooksEnv(self):
3274 """Build hooks env.
3276 This runs on the master, the primary and all the secondaries.
3278 """
3279 env = {
3280 "DISK_NAME": self.op.disk_name,
3281 "DISK_ID": self.op.disk_id,
3282 "OLD_SECONDARY": self.old_secondary,
3283 }
3284 env.update(_BuildInstanceHookEnvByObject(self.instance))
3285 nl = [self.sstore.GetMasterNode(),
3286 self.instance.primary_node] + list(self.instance.secondary_nodes)
3287 return env, nl, nl
3289 def CheckPrereq(self):
3290 """Check prerequisites.
3292 This checks that the instance is in the cluster.
3295 instance = self.cfg.GetInstanceInfo(
3296 self.cfg.ExpandInstanceName(self.op.instance_name))
3297 if instance is None:
3298 raise errors.OpPrereqError("Instance '%s' not known" %
3299 self.op.instance_name)
3300 self.instance = instance
3302 if instance.disk_template != constants.DT_REMOTE_RAID1:
3303 raise errors.OpPrereqError("Instance's disk layout is not"
3304 " remote_raid1.")
3305 for disk in instance.disks:
3306 if disk.iv_name == self.op.disk_name:
3307 break
3308 else:
3309 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3310 " instance." % self.op.disk_name)
3311 for child in disk.children:
3312 if (child.dev_type == constants.LD_DRBD7 and
3313 child.logical_id[2] == self.op.disk_id):
3314 break
3315 else:
3316 raise errors.OpPrereqError("Can't find the device with this port.")
3318 if len(disk.children) < 2:
3319 raise errors.OpPrereqError("Cannot remove the last component from"
3320 " a mirror.")
3321 self.disk = disk
3322 self.child = child
3323 if self.child.logical_id[0] == instance.primary_node:
3324 oid = 1
3325 else:
3326 oid = 0
3327 self.old_secondary = self.child.logical_id[oid]
3329 def Exec(self, feedback_fn):
3330 """Remove the mirror component
3333 instance = self.instance
3334 disk = self.disk
3335 child = self.child
3336 logger.Info("remove mirror component")
3337 self.cfg.SetDiskID(disk, instance.primary_node)
3338 if not rpc.call_blockdev_removechildren(instance.primary_node,
3339 disk, [child]):
3340 raise errors.OpExecError("Can't remove child from mirror.")
3342 for node in child.logical_id[:2]:
3343 self.cfg.SetDiskID(child, node)
3344 if not rpc.call_blockdev_remove(node, child):
3345 logger.Error("Warning: failed to remove device from node %s,"
3346 " continuing operation." % node)
3348 disk.children.remove(child)
3349 self.cfg.AddInstance(instance)
3352 class LUReplaceDisks(LogicalUnit):
3353 """Replace the disks of an instance.
3356 HPATH = "mirrors-replace"
3357 HTYPE = constants.HTYPE_INSTANCE
3358 _OP_REQP = ["instance_name", "mode", "disks"]
3360 def BuildHooksEnv(self):
3361 """Build hooks env.
3363 This runs on the master, the primary and all the secondaries.
3365 """
3366 env = {
3367 "MODE": self.op.mode,
3368 "NEW_SECONDARY": self.op.remote_node,
3369 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3370 }
3371 env.update(_BuildInstanceHookEnvByObject(self.instance))
3372 nl = [
3373 self.sstore.GetMasterNode(),
3374 self.instance.primary_node,
3375 ]
3376 if self.op.remote_node is not None:
3377 nl.append(self.op.remote_node)
3378 return env, nl, nl
3380 def CheckPrereq(self):
3381 """Check prerequisites.
3383 This checks that the instance is in the cluster.
3386 instance = self.cfg.GetInstanceInfo(
3387 self.cfg.ExpandInstanceName(self.op.instance_name))
3388 if instance is None:
3389 raise errors.OpPrereqError("Instance '%s' not known" %
3390 self.op.instance_name)
3391 self.instance = instance
3393 if instance.disk_template not in constants.DTS_NET_MIRROR:
3394 raise errors.OpPrereqError("Instance's disk layout is not"
3395 " network mirrored.")
3397 if len(instance.secondary_nodes) != 1:
3398 raise errors.OpPrereqError("The instance has a strange layout,"
3399 " expected one secondary but found %d" %
3400 len(instance.secondary_nodes))
3402 self.sec_node = instance.secondary_nodes[0]
3404 remote_node = getattr(self.op, "remote_node", None)
3405 if remote_node is not None:
3406 remote_node = self.cfg.ExpandNodeName(remote_node)
3407 if remote_node is None:
3408 raise errors.OpPrereqError("Node '%s' not known" %
3409 self.op.remote_node)
3410 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3411 else:
3412 self.remote_node_info = None
3413 if remote_node == instance.primary_node:
3414 raise errors.OpPrereqError("The specified node is the primary node of"
3415 " the instance.")
3416 elif remote_node == self.sec_node:
3417 if self.op.mode == constants.REPLACE_DISK_SEC:
3418 # this is for DRBD8, where we can't execute the same mode of
3419 # replacement as for drbd7 (no different port allocated)
3420 raise errors.OpPrereqError("Same secondary given, cannot execute"
3421 " replacement")
3422 # the user gave the current secondary, switch to
3423 # 'no-replace-secondary' mode for drbd7
3425 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3426 self.op.mode != constants.REPLACE_DISK_ALL):
3427 raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3428 " disks replacement, not individual ones")
3429 if instance.disk_template == constants.DT_DRBD8:
3430 if self.op.mode == constants.REPLACE_DISK_ALL:
3431 raise errors.OpPrereqError("Template 'drbd8' only allows primary or"
3432 " secondary disk replacement, not"
3434 elif self.op.mode == constants.REPLACE_DISK_PRI:
3435 if remote_node is not None:
3436 raise errors.OpPrereqError("Template 'drbd8' does not allow changing"
3437 " the secondary while doing a primary"
3438 " node disk replacement")
3439 self.tgt_node = instance.primary_node
3440 self.oth_node = instance.secondary_nodes[0]
3441 elif self.op.mode == constants.REPLACE_DISK_SEC:
3442 self.new_node = remote_node # this can be None, in which case
3443 # we don't change the secondary
3444 self.tgt_node = instance.secondary_nodes[0]
3445 self.oth_node = instance.primary_node
3447 raise errors.ProgrammerError("Unhandled disk replace mode")
3449 for name in self.op.disks:
3450 if instance.FindDisk(name) is None:
3451 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3452 (name, instance.name))
3453 self.op.remote_node = remote_node
3455 def _ExecRR1(self, feedback_fn):
3456 """Replace the disks of an instance.
3459 instance = self.instance
3462 if self.op.remote_node is None:
3463 remote_node = self.sec_node
3465 remote_node = self.op.remote_node
3467 for dev in instance.disks:
3469 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3470 names = _GenerateUniqueNames(cfg, lv_names)
3471 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3472 remote_node, size, names)
3473 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3474 logger.Info("adding new mirror component on secondary for %s" %
3477 if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3478 new_drbd, False,
3479 _GetInstanceInfoText(instance)):
3480 raise errors.OpExecError("Failed to create new component on"
3481 " secondary node %s\n"
3482 "Full abort, cleanup manually!" %
3485 logger.Info("adding new mirror component on primary")
3487 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3488 instance, new_drbd,
3489 _GetInstanceInfoText(instance)):
3490 # remove secondary dev
3491 cfg.SetDiskID(new_drbd, remote_node)
3492 rpc.call_blockdev_remove(remote_node, new_drbd)
3493 raise errors.OpExecError("Failed to create volume on primary!\n"
3494 "Full abort, cleanup manually!!")
3496 # the device exists now
3497 # call the primary node to add the mirror to md
3498 logger.Info("adding new mirror component to md")
3499 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3500 new_drbd):
3501 logger.Error("Can't add mirror component to md!")
3502 cfg.SetDiskID(new_drbd, remote_node)
3503 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3504 logger.Error("Can't rollback on secondary")
3505 cfg.SetDiskID(new_drbd, instance.primary_node)
3506 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3507 logger.Error("Can't rollback on primary")
3508 raise errors.OpExecError("Full abort, cleanup manually!!")
3510 dev.children.append(new_drbd)
3511 cfg.AddInstance(instance)
3513 # this can fail as the old devices are degraded and _WaitForSync
3514 # does a combined result over all disks, so we don't check its
3516 _WaitForSync(cfg, instance, unlock=True)
3518 # so check manually all the devices
3519 for name in iv_names:
3520 dev, child, new_drbd = iv_names[name]
3521 cfg.SetDiskID(dev, instance.primary_node)
3522 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3523 if is_degr:
3524 raise errors.OpExecError("MD device %s is degraded!" % name)
3525 cfg.SetDiskID(new_drbd, instance.primary_node)
3526 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3527 if is_degr:
3528 raise errors.OpExecError("New drbd device %s is degraded!" % name)
3530 for name in iv_names:
3531 dev, child, new_drbd = iv_names[name]
3532 logger.Info("remove mirror %s component" % name)
3533 cfg.SetDiskID(dev, instance.primary_node)
3534 if not rpc.call_blockdev_removechildren(instance.primary_node,
3535 dev, [child]):
3536 logger.Error("Can't remove child from mirror, aborting"
3537 " *this device cleanup*.\nYou need to clean up manually!!")
3538 continue
3540 for node in child.logical_id[:2]:
3541 logger.Info("remove child device on %s" % node)
3542 cfg.SetDiskID(child, node)
3543 if not rpc.call_blockdev_remove(node, child):
3544 logger.Error("Warning: failed to remove device from node %s,"
3545 " continuing operation." % node)
3547 dev.children.remove(child)
3549 cfg.AddInstance(instance)
3551 def _ExecD8DiskOnly(self, feedback_fn):
3552 """Replace a disk on the primary or secondary for dbrd8.
3554 The algorithm for replace is quite complicated:
3555 - for each disk to be replaced:
3556 - create new LVs on the target node with unique names
3557 - detach old LVs from the drbd device
3558 - rename old LVs to name_replaced.<time_t>
3559 - rename new LVs to old LVs
3560 - attach the new LVs (with the old names now) to the drbd device
3561 - wait for sync across all devices
3562 - for each modified disk:
3563 - remove old LVs (which have the name name_replaces.<time_t>)
3565 Failures are not very well handled.
3567 """
3568 steps_total = 6
3569 warning, info = (self.processor.LogWarning, self.processor.LogInfo)
3570 instance = self.instance
3571 iv_names = {}
3572 vgname = self.cfg.GetVGName()
3573 cfg = self.cfg
3575 tgt_node = self.tgt_node
3576 oth_node = self.oth_node
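3577 # each of the six steps below is reported through the processor's LogStep calls so progress can be followed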
3578 # Step: check device activation
3579 self.processor.LogStep(1, steps_total, "check device existence")
3580 info("checking volume groups")
3581 my_vg = cfg.GetVGName()
3582 results = rpc.call_vg_list([oth_node, tgt_node])
3583 if not results:
3584 raise errors.OpExecError("Can't list volume groups on the nodes")
3585 for node in oth_node, tgt_node:
3586 res = results.get(node, False)
3587 if not res or my_vg not in res:
3588 raise errors.OpExecError("Volume group '%s' not found on %s" %
3589 (my_vg, node))
3590 for dev in instance.disks:
3591 if not dev.iv_name in self.op.disks:
3592 continue
3593 for node in tgt_node, oth_node:
3594 info("checking %s on %s" % (dev.iv_name, node))
3595 cfg.SetDiskID(dev, node)
3596 if not rpc.call_blockdev_find(node, dev):
3597 raise errors.OpExecError("Can't find device %s on node %s" %
3598 (dev.iv_name, node))
3600 # Step: check other node consistency
3601 self.processor.LogStep(2, steps_total, "check peer consistency")
3602 for dev in instance.disks:
3603 if not dev.iv_name in self.op.disks:
3604 continue
3605 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3606 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3607 oth_node==instance.primary_node):
3608 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3609 " to replace disks on this node (%s)" %
3610 (oth_node, tgt_node))
3612 # Step: create new storage
3613 self.processor.LogStep(3, steps_total, "allocate new storage")
3614 for dev in instance.disks:
3615 if not dev.iv_name in self.op.disks:
3616 continue
3617 size = dev.size
3618 cfg.SetDiskID(dev, tgt_node)
3619 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3620 names = _GenerateUniqueNames(cfg, lv_names)
3621 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3622 logical_id=(vgname, names[0]))
3623 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3624 logical_id=(vgname, names[1]))
3625 new_lvs = [lv_data, lv_meta]
3626 old_lvs = dev.children
3627 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3628 info("creating new local storage on %s for %s" %
3629 (tgt_node, dev.iv_name))
3630 # since we *always* want to create this LV, we use the
3631 # _Create...OnPrimary (which forces the creation), even if we
3632 # are talking about the secondary node
3633 for new_lv in new_lvs:
3634 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3635 _GetInstanceInfoText(instance)):
3636 raise errors.OpExecError("Failed to create new LV named '%s' on"
3637 " node '%s'" %
3638 (new_lv.logical_id[1], tgt_node))
3640 # Step: for each lv, detach+rename*2+attach
3641 self.processor.LogStep(4, steps_total, "change drbd configuration")
3642 for dev, old_lvs, new_lvs in iv_names.itervalues():
3643 info("detaching %s drbd from local storage" % dev.iv_name)
3644 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3645 raise errors.OpExecError("Can't detach drbd from local storage on node"
3646 " %s for device %s" % (tgt_node, dev.iv_name))
3648 #cfg.Update(instance)
3650 # ok, we created the new LVs, so now we know we have the needed
3651 # storage; as such, we proceed on the target node to rename
3652 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3653 # using the assumption than logical_id == physical_id (which in
3654 # turn is the unique_id on that node)
3656 # FIXME(iustin): use a better name for the replaced LVs
3657 temp_suffix = int(time.time())
3658 ren_fn = lambda d, suff: (d.physical_id[0],
3659 d.physical_id[1] + "_replaced-%s" % suff)
3660 # build the rename list based on what LVs exist on the node
3661 rlist = []
3662 for to_ren in old_lvs:
3663 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3664 if find_res is not None: # device exists
3665 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3667 info("renaming the old LVs on the target node")
3668 if not rpc.call_blockdev_rename(tgt_node, rlist):
3669 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3670 # now we rename the new LVs to the old LVs
3671 info("renaming the new LVs on the target node")
3672 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3673 if not rpc.call_blockdev_rename(tgt_node, rlist):
3674 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3676 for old, new in zip(old_lvs, new_lvs):
3677 new.logical_id = old.logical_id
3678 cfg.SetDiskID(new, tgt_node)
3680 for disk in old_lvs:
3681 disk.logical_id = ren_fn(disk, temp_suffix)
3682 cfg.SetDiskID(disk, tgt_node)
3684 # now that the new lvs have the old name, we can add them to the device
3685 info("adding new mirror component on %s" % tgt_node)
3686 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3687 for new_lv in new_lvs:
3688 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3689 warning("Can't rollback device %s", "manually cleanup unused"
3691 raise errors.OpExecError("Can't add local storage to drbd")
3693 dev.children = new_lvs
3694 cfg.Update(instance)
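3695 # the drbd device is now backed by the renamed (new) LVs; the old ones are removed only after a successful sync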
3696 # Step: wait for sync
3698 # this can fail as the old devices are degraded and _WaitForSync
3699 # does a combined result over all disks, so we don't check its
3701 self.processor.LogStep(5, steps_total, "sync devices")
3702 _WaitForSync(cfg, instance, unlock=True)
3704 # so check manually all the devices
3705 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3706 cfg.SetDiskID(dev, instance.primary_node)
3707 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3708 if is_degr:
3709 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3711 # Step: remove old storage
3712 self.processor.LogStep(6, steps_total, "removing old storage")
3713 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3714 info("remove logical volumes for %s" % name)
3715 for lv in old_lvs:
3716 cfg.SetDiskID(lv, tgt_node)
3717 if not rpc.call_blockdev_remove(tgt_node, lv):
3718 warning("Can't remove old LV", "manually remove unused LVs")
3721 def _ExecD8Secondary(self, feedback_fn):
3722 """Replace the secondary node for drbd8.
3724 The algorithm for replace is quite complicated:
3725 - for all disks of the instance:
3726 - create new LVs on the new node with same names
3727 - shutdown the drbd device on the old secondary
3728 - disconnect the drbd network on the primary
3729 - create the drbd device on the new secondary
3730 - network attach the drbd on the primary, using an artifice:
3731 the drbd code for Attach() will connect to the network if it
3732 finds a device which is connected to the good local disks but
3734 - wait for sync across all devices
3735 - remove all disks from the old secondary
3737 Failures are not very well handled.
3739 """
3740 steps_total = 6
3741 warning, info = (self.processor.LogWarning, self.processor.LogInfo)
3742 instance = self.instance
3743 iv_names = {}
3744 vgname = self.cfg.GetVGName()
3745 cfg = self.cfg
3747 old_node = self.tgt_node
3748 new_node = self.new_node
3749 pri_node = instance.primary_node
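3750 # tgt_node is the old secondary here; its storage is only removed in the final step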
3751 # Step: check device activation
3752 self.processor.LogStep(1, steps_total, "check device existence")
3753 info("checking volume groups")
3754 my_vg = cfg.GetVGName()
3755 results = rpc.call_vg_list([pri_node, new_node])
3756 if not results:
3757 raise errors.OpExecError("Can't list volume groups on the nodes")
3758 for node in pri_node, new_node:
3759 res = results.get(node, False)
3760 if not res or my_vg not in res:
3761 raise errors.OpExecError("Volume group '%s' not found on %s" %
3762 (my_vg, node))
3763 for dev in instance.disks:
3764 if not dev.iv_name in self.op.disks:
3765 continue
3766 info("checking %s on %s" % (dev.iv_name, pri_node))
3767 cfg.SetDiskID(dev, pri_node)
3768 if not rpc.call_blockdev_find(pri_node, dev):
3769 raise errors.OpExecError("Can't find device %s on node %s" %
3770 (dev.iv_name, pri_node))
3772 # Step: check other node consistency
3773 self.processor.LogStep(2, steps_total, "check peer consistency")
3774 for dev in instance.disks:
3775 if not dev.iv_name in self.op.disks:
3776 continue
3777 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3778 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3779 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3780 " unsafe to replace the secondary" %
3783 # Step: create new storage
3784 self.processor.LogStep(3, steps_total, "allocate new storage")
3785 for dev in instance.disks:
3787 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3788 # since we *always* want to create this LV, we use the
3789 # _Create...OnPrimary (which forces the creation), even if we
3790 # are talking about the secondary node
3791 for new_lv in dev.children:
3792 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3793 _GetInstanceInfoText(instance)):
3794 raise errors.OpExecError("Failed to create new LV named '%s' on"
3795 " node '%s'" %
3796 (new_lv.logical_id[1], new_node))
3798 iv_names[dev.iv_name] = (dev, dev.children)
3800 self.processor.LogStep(4, steps_total, "changing drbd configuration")
3801 for dev in instance.disks:
3803 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3804 # create new devices on new_node
3805 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3806 logical_id=(pri_node, new_node,
3807 dev.logical_id[2]),
3808 children=dev.children)
3809 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3810 new_drbd, False,
3811 _GetInstanceInfoText(instance)):
3812 raise errors.OpExecError("Failed to create new DRBD on"
3813 " node '%s'" % new_node)
3815 for dev in instance.disks:
3816 # we have new devices, shutdown the drbd on the old secondary
3817 info("shutting down drbd for %s on old node" % dev.iv_name)
3818 cfg.SetDiskID(dev, old_node)
3819 if not rpc.call_blockdev_shutdown(old_node, dev):
3820 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3821 "Please cleanup this device manuall as soon as possible")
3823 # we have new storage, we 'rename' the network on the primary
3824 info("switching primary drbd for %s to new secondary node" % dev.iv_name)
3825 cfg.SetDiskID(dev, pri_node)
3826 # rename to the ip of the new node
3827 new_uid = list(dev.physical_id)
3828 new_uid[2] = self.remote_node_info.secondary_ip
3829 rlist = [(dev, tuple(new_uid))]
3830 if not rpc.call_blockdev_rename(pri_node, rlist):
3831 raise errors.OpExecError("Can't detach & re-attach drbd %s on node"
3832 " %s from %s to %s" %
3833 (dev.iv_name, pri_node, old_node, new_node))
3834 dev.logical_id = (pri_node, new_node, dev.logical_id[2])
3835 cfg.SetDiskID(dev, pri_node)
3836 cfg.Update(instance)
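3837 # the primary reconnects to the new secondary via the Attach() behaviour described in the docstring above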
3839 # this can fail as the old devices are degraded and _WaitForSync
3840 # does a combined result over all disks, so we don't check its
3842 self.processor.LogStep(5, steps_total, "sync devices")
3843 _WaitForSync(cfg, instance, unlock=True)
3845 # so check manually all the devices
3846 for name, (dev, old_lvs) in iv_names.iteritems():
3847 cfg.SetDiskID(dev, pri_node)
3848 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3849 if is_degr:
3850 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3852 self.processor.LogStep(6, steps_total, "removing old storage")
3853 for name, (dev, old_lvs) in iv_names.iteritems():
3854 info("remove logical volumes for %s" % name)
3855 for lv in old_lvs:
3856 cfg.SetDiskID(lv, old_node)
3857 if not rpc.call_blockdev_remove(old_node, lv):
3858 warning("Can't remove LV on old secondary",
3859 "Cleanup stale volumes by hand")
3861 def Exec(self, feedback_fn):
3862 """Execute disk replacement.
3864 This dispatches the disk replacement to the appropriate handler.
3867 instance = self.instance
3868 if instance.disk_template == constants.DT_REMOTE_RAID1:
3869 fn = self._ExecRR1
3870 elif instance.disk_template == constants.DT_DRBD8:
3871 if self.op.remote_node is None:
3872 fn = self._ExecD8DiskOnly
3873 else:
3874 fn = self._ExecD8Secondary
3875 else:
3876 raise errors.ProgrammerError("Unhandled disk replacement case")
3877 return fn(feedback_fn)
3880 class LUQueryInstanceData(NoHooksLU):
3881 """Query runtime instance data.
3884 _OP_REQP = ["instances"]
3886 def CheckPrereq(self):
3887 """Check prerequisites.
3889 This only checks the optional instance list against the existing names.
3892 if not isinstance(self.op.instances, list):
3893 raise errors.OpPrereqError("Invalid argument type 'instances'")
3894 if self.op.instances:
3895 self.wanted_instances = []
3896 names = self.op.instances
3897 for name in names:
3898 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3899 if instance is None:
3900 raise errors.OpPrereqError("No such instance name '%s'" % name)
3901 self.wanted_instances.append(instance)
3902 else:
3903 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3904 in self.cfg.GetInstanceList()]
3908 def _ComputeDiskStatus(self, instance, snode, dev):
3909 """Compute block device status.
3912 self.cfg.SetDiskID(dev, instance.primary_node)
3913 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3914 if dev.dev_type in constants.LDS_DRBD:
3915 # we change the snode then (otherwise we use the one passed in)
3916 if dev.logical_id[0] == instance.primary_node:
3917 snode = dev.logical_id[1]
3918 else:
3919 snode = dev.logical_id[0]
3921 if snode:
3922 self.cfg.SetDiskID(dev, snode)
3923 dev_sstatus = rpc.call_blockdev_find(snode, dev)
3924 else:
3925 dev_sstatus = None
3927 if dev.children:
3928 dev_children = [self._ComputeDiskStatus(instance, snode, child)
3929 for child in dev.children]
3930 else:
3931 dev_children = []
3933 data = {
3934 "iv_name": dev.iv_name,
3935 "dev_type": dev.dev_type,
3936 "logical_id": dev.logical_id,
3937 "physical_id": dev.physical_id,
3938 "pstatus": dev_pstatus,
3939 "sstatus": dev_sstatus,
3940 "children": dev_children,
3941 }
3943 return data
3945 def Exec(self, feedback_fn):
3946 """Gather and return data"""
3947 result = {}
3948 for instance in self.wanted_instances:
3949 remote_info = rpc.call_instance_info(instance.primary_node,
3950 instance.name)
3951 if remote_info and "state" in remote_info:
3952 remote_state = "up"
3953 else:
3954 remote_state = "down"
3955 if instance.status == "down":
3956 config_state = "down"
3957 else:
3958 config_state = "up"
3960 disks = [self._ComputeDiskStatus(instance, None, device)
3961 for device in instance.disks]
3963 idict = {
3964 "name": instance.name,
3965 "config_state": config_state,
3966 "run_state": remote_state,
3967 "pnode": instance.primary_node,
3968 "snodes": instance.secondary_nodes,
3969 "os": instance.os,
3970 "memory": instance.memory,
3971 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3972 "disks": disks,
3973 "vcpus": instance.vcpus,
3974 }
3976 result[instance.name] = idict
3978 return result
3981 class LUSetInstanceParms(LogicalUnit):
3982 """Modifies an instances's parameters.
3985 HPATH = "instance-modify"
3986 HTYPE = constants.HTYPE_INSTANCE
3987 _OP_REQP = ["instance_name"]
3989 def BuildHooksEnv(self):
3990 """Build hooks env.
3992 This runs on the master, primary and secondaries.
3994 """
3995 args = dict()
3996 if self.mem:
3997 args['memory'] = self.mem
3998 if self.vcpus:
3999 args['vcpus'] = self.vcpus
4000 if self.do_ip or self.do_bridge:
4001 if self.do_ip:
4002 ip = self.ip
4003 else:
4004 ip = self.instance.nics[0].ip
4005 if self.bridge:
4006 bridge = self.bridge
4007 else:
4008 bridge = self.instance.nics[0].bridge
4009 args['nics'] = [(ip, bridge)]
4010 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4011 nl = [self.sstore.GetMasterNode(),
4012 self.instance.primary_node] + list(self.instance.secondary_nodes)
4013 return env, nl, nl
4015 def CheckPrereq(self):
4016 """Check prerequisites.
4018 This only checks the instance list against the existing names.
4021 self.mem = getattr(self.op, "mem", None)
4022 self.vcpus = getattr(self.op, "vcpus", None)
4023 self.ip = getattr(self.op, "ip", None)
4024 self.bridge = getattr(self.op, "bridge", None)
4025 if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
4026 raise errors.OpPrereqError("No changes submitted")
4027 if self.mem is not None:
4029 self.mem = int(self.mem)
4030 except ValueError, err:
4031 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4032 if self.vcpus is not None:
4034 self.vcpus = int(self.vcpus)
4035 except ValueError, err:
4036 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4037 if self.ip is not None:
4039 if self.ip.lower() == "none":
4042 if not utils.IsValidIP(self.ip):
4043 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4046 self.do_bridge = (self.bridge is not None)
4048 instance = self.cfg.GetInstanceInfo(
4049 self.cfg.ExpandInstanceName(self.op.instance_name))
4050 if instance is None:
4051 raise errors.OpPrereqError("No such instance name '%s'" %
4052 self.op.instance_name)
4053 self.op.instance_name = instance.name
4054 self.instance = instance
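# Hedged examples of how the checks above behave (parameter values are made
# up): mem="256" is accepted and coerced to the integer 256, mem="lots" raises
# OpPrereqError from the int() conversion, ip="none" clears the address
# (self.ip becomes None while self.do_ip stays True), and an address such as
# "300.1.1.1" would be expected to fail the utils.IsValidIP check.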
4057 def Exec(self, feedback_fn):
4058 """Modifies an instance.
4060 All parameters take effect only at the next restart of the instance.
4061 """
4062 result = []
4063 instance = self.instance
4064 if self.mem:
4065 instance.memory = self.mem
4066 result.append(("mem", self.mem))
4067 if self.vcpus:
4068 instance.vcpus = self.vcpus
4069 result.append(("vcpus", self.vcpus))
4070 if self.do_ip:
4071 instance.nics[0].ip = self.ip
4072 result.append(("ip", self.ip))
4073 if self.bridge:
4074 instance.nics[0].bridge = self.bridge
4075 result.append(("bridge", self.bridge))
4077 self.cfg.AddInstance(instance)
4079 return result
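# The list returned above is a sequence of (parameter, new value) pairs, e.g.
# [("mem", 512), ("bridge", "xen-br1")] (values illustrative), giving the
# caller a record of exactly which fields were changed.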
4082 class LUQueryExports(NoHooksLU):
4083 """Query the exports list
4088 def CheckPrereq(self):
4089 """Check that the nodelist contains only existing nodes.
4092 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4094 def Exec(self, feedback_fn):
4095 """Compute the list of all the exported system images.
4097 Returns:
4098 a dictionary with the structure node->(export-list)
4099 where export-list is a list of the instances exported on
4100 that node.
4102 """
4103 return rpc.call_export_list(self.nodes)
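# Illustrative result (node and instance names are made up):
#   {"node1.example.com": ["inst1.example.com", "inst2.example.com"]}
# i.e. one entry per queried node, listing the exports found there.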
4106 class LUExportInstance(LogicalUnit):
4107 """Export an instance to an image in the cluster.
4110 HPATH = "instance-export"
4111 HTYPE = constants.HTYPE_INSTANCE
4112 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4114 def BuildHooksEnv(self):
4115 """Build hooks env.
4117 This will run on the master, primary node and target node.
4119 """
4120 env = {
4121 "EXPORT_NODE": self.op.target_node,
4122 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4124 env.update(_BuildInstanceHookEnvByObject(self.instance))
4125 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4126 self.op.target_node]
4127 return env, nl, nl
4129 def CheckPrereq(self):
4130 """Check prerequisites.
4132 This checks that the instance name is a valid one.
4134 """
4135 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4136 self.instance = self.cfg.GetInstanceInfo(instance_name)
4137 if self.instance is None:
4138 raise errors.OpPrereqError("Instance '%s' not found" %
4139 self.op.instance_name)
4142 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4143 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4145 if self.dst_node is None:
4146 raise errors.OpPrereqError("Destination node '%s' is unknown." %
4147 self.op.target_node)
4148 self.op.target_node = self.dst_node.name
4150 def Exec(self, feedback_fn):
4151 """Export an instance to an image in the cluster.
4154 instance = self.instance
4155 dst_node = self.dst_node
4156 src_node = instance.primary_node
4157 # shutdown the instance, unless requested not to do so
4158 if self.op.shutdown:
4159 op = opcodes.OpShutdownInstance(instance_name=instance.name)
4160 self.processor.ChainOpCode(op)
4162 vgname = self.cfg.GetVGName()
4164 snap_disks = []
4166 try:
4167 for disk in instance.disks:
4168 if disk.iv_name == "sda":
4169 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4170 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4172 if not new_dev_name:
4173 logger.Error("could not snapshot block device %s on node %s" %
4174 (disk.logical_id[1], src_node))
4175 else:
4176 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4177 logical_id=(vgname, new_dev_name),
4178 physical_id=(vgname, new_dev_name),
4179 iv_name=disk.iv_name)
4180 snap_disks.append(new_dev)
4182 finally:
4183 if self.op.shutdown:
4184 op = opcodes.OpStartupInstance(instance_name=instance.name,
4185 force=False)
4186 self.processor.ChainOpCode(op)
4188 # TODO: check for size
4190 for dev in snap_disks:
4191 if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4192 instance):
4193 logger.Error("could not export block device %s from node"
4194 " %s to node %s" %
4195 (dev.logical_id[1], src_node, dst_node.name))
4196 if not rpc.call_blockdev_remove(src_node, dev):
4197 logger.Error("could not remove snapshot block device %s from"
4198 " node %s" % (dev.logical_id[1], src_node))
4200 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4201 logger.Error("could not finalize export for instance %s on node %s" %
4202 (instance.name, dst_node.name))
4204 nodelist = self.cfg.GetNodeList()
4205 nodelist.remove(dst_node.name)
4207 # on one-node clusters nodelist will be empty after the removal
4208 # if we proceed the backup would be removed because OpQueryExports
4209 # substitutes an empty list with the full cluster node list.
4210 if nodelist:
4211 op = opcodes.OpQueryExports(nodes=nodelist)
4212 exportlist = self.processor.ChainOpCode(op)
4213 for node in exportlist:
4214 if instance.name in exportlist[node]:
4215 if not rpc.call_export_remove(node, instance.name):
4216 logger.Error("could not remove older export for instance %s"
4217 " on node %s" % (instance.name, node))
4220 class TagsLU(NoHooksLU):
4221 """Generic tags LU.
4223 This is an abstract class which is the parent of all the other tags LUs.
4225 """
4226 def CheckPrereq(self):
4227 """Check prerequisites.
4230 if self.op.kind == constants.TAG_CLUSTER:
4231 self.target = self.cfg.GetClusterInfo()
4232 elif self.op.kind == constants.TAG_NODE:
4233 name = self.cfg.ExpandNodeName(self.op.name)
4235 raise errors.OpPrereqError("Invalid node name (%s)" %
4238 self.target = self.cfg.GetNodeInfo(name)
4239 elif self.op.kind == constants.TAG_INSTANCE:
4240 name = self.cfg.ExpandInstanceName(self.op.name)
4242 raise errors.OpPrereqError("Invalid instance name (%s)" %
4245 self.target = self.cfg.GetInstanceInfo(name)
4247 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4251 class LUGetTags(TagsLU):
4252 """Returns the tags of a given object.
4255 _OP_REQP = ["kind", "name"]
4257 def Exec(self, feedback_fn):
4258 """Returns the tag list.
4261 return self.target.GetTags()
4264 class LUSearchTags(NoHooksLU):
4265 """Searches the tags for a given pattern.
4268 _OP_REQP = ["pattern"]
4270 def CheckPrereq(self):
4271 """Check prerequisites.
4273 This checks the pattern passed for validity by compiling it.
4275 """
4276 try:
4277 self.re = re.compile(self.op.pattern)
4278 except re.error, err:
4279 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4280 (self.op.pattern, err))
4282 def Exec(self, feedback_fn):
4283 """Returns the tag list.
4287 tgts = [("/cluster", cfg.GetClusterInfo())]
4288 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4289 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4290 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4291 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4292 results = []
4293 for path, target in tgts:
4294 for tag in target.GetTags():
4295 if self.re.search(tag):
4296 results.append((path, tag))
4297 return results
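# Illustrative output (paths and tag values are made up): a search for the
# pattern "^web" could return
#   [("/instances/inst1.example.com", "webserver"),
#    ("/nodes/node1.example.com", "web-zone")]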
4300 class LUAddTags(TagsLU):
4301 """Sets a tag on a given object.
4304 _OP_REQP = ["kind", "name", "tags"]
4306 def CheckPrereq(self):
4307 """Check prerequisites.
4309 This checks the type and length of the tag name and value.
4311 """
4312 TagsLU.CheckPrereq(self)
4313 for tag in self.op.tags:
4314 objects.TaggableObject.ValidateTag(tag)
4316 def Exec(self, feedback_fn):
4317 """Sets the tag.
4319 """
4320 try:
4321 for tag in self.op.tags:
4322 self.target.AddTag(tag)
4323 except errors.TagError, err:
4324 raise errors.OpExecError("Error while setting tag: %s" % str(err))
4325 try:
4326 self.cfg.Update(self.target)
4327 except errors.ConfigurationError:
4328 raise errors.OpRetryError("There has been a modification to the"
4329 " config file and the operation has been"
4330 " aborted. Please retry.")
4333 class LUDelTags(TagsLU):
4334 """Delete a list of tags from a given object.
4337 _OP_REQP = ["kind", "name", "tags"]
4339 def CheckPrereq(self):
4340 """Check prerequisites.
4342 This checks that we have the given tag.
4344 """
4345 TagsLU.CheckPrereq(self)
4346 for tag in self.op.tags:
4347 objects.TaggableObject.ValidateTag(tag)
4348 del_tags = frozenset(self.op.tags)
4349 cur_tags = self.target.GetTags()
4350 if not del_tags <= cur_tags:
4351 diff_tags = del_tags - cur_tags
4352 diff_names = ["'%s'" % tag for tag in diff_tags]
4354 raise errors.OpPrereqError("Tag(s) %s not found" %
4355 (",".join(diff_names)))
4357 def Exec(self, feedback_fn):
4358 """Remove the tag from the object.
4361 for tag in self.op.tags:
4362 self.target.RemoveTag(tag)
4363 try:
4364 self.cfg.Update(self.target)
4365 except errors.ConfigurationError:
4366 raise errors.OpRetryError("There has been a modification to the"
4367 " config file and the operation has been"
4368 " aborted. Please retry.")