# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). "No nodes" should be returned as an empty list (and not
    None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
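
# Illustrative only: a hypothetical node-level LU could build its hooks
# environment as below (the names here are invented for the example and
# are not part of this module):
#
#   def BuildHooksEnv(self):
#     env = {"OP_TARGET": self.op.node_name, "NODE_NAME": self.op.node_name}
#     # no pre-hooks, post-hooks on the target node only
#     return env, [], [self.op.node_name]
#
# The hooks runner then prefixes each key with GANETI_ and adds the
# master node to the node lists, as described in the docstring above.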


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return {}, [], []


def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])


def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings); an empty list means all nodes

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)
  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings); an empty list means all instances

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)
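
# For illustration (values invented, not from this module): with
# op.nodes == ["node1"], _GetWantedNodes(lu, op.nodes) expands the short
# name via cfg.ExpandNodeName and returns the nice-sorted FQDN list,
# e.g. ["node1.example.com"]; an empty list selects every node in the
# configuration. _GetWantedInstances behaves the same way for instance
# names.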


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
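
# Example usage (values invented for illustration): with static=["name"]
# and dynamic=["mfree"], a selection of ["name", "mfree"] passes
# silently, while ["name", "bogus"] raises OpPrereqError naming the
# unknown field "bogus".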


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings

  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
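
# As an illustration, a single-NIC call such as (all values invented):
#
#   _BuildInstanceHookEnv("inst1", "node1", ["node2"], "debian-etch",
#                         "up", 128, 1,
#                         [("198.51.100.11", "xen-br0", "aa:00:00:12:34:56")])
#
# yields INSTANCE_NAME=inst1, INSTANCE_PRIMARY=node1,
# INSTANCE_SECONDARIES="node2", INSTANCE_NIC0_IP=198.51.100.11,
# INSTANCE_NIC0_BRIDGE=xen-br0, INSTANCE_NIC0_HWADDR=aa:00:00:12:34:56
# and INSTANCE_NIC_COUNT=1, plus the OS/status/memory/vcpus keys.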


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
  else:
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  for rawline in f:
    logger.Debug('read %s' % (repr(rawline),))

    parts = rawline.rstrip('\r\n').split()

    # Ignore unwanted lines
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
      fields = parts[0].split(',')
      key = parts[2]

      haveall = True
      havesome = False
      for spec in [ ip, fullnode ]:
        if spec not in fields:
          haveall = False
        if spec in fields:
          havesome = True

      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
      if haveall and key == pubkey:
        inthere = True
        save_lines.append(rawline)
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
        continue

      if havesome and (not haveall or key != pubkey):
        removed = True
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
        continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
                                   constants.DATA_DIR)
    newfile = os.fdopen(fd, 'w')
    try:
      newfile.write(''.join(save_lines))
    finally:
      newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()
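
# The entries managed above follow the standard OpenSSH known_hosts
# format, one host per line, for example (values invented):
#
#   node1.example.com,192.0.2.10 ssh-rsa AAAAB3NzaC1yc2E...
#
# i.e. a comma-separated hostname/IP field, the key type, and the key;
# a line is kept only when both specs match and the key equals the
# cluster public key.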


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None
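
# Return convention, with invented values: _HasValidVG({"xenvg": 102400},
# "xenvg") returns None (valid); _HasValidVG({"xenvg": 10240}, "xenvg")
# returns the "too small" message; a name absent from vglist returns the
# "missing" error string.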


def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or /etc/hosts." %
                                 hostname.ip)

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
                         constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
                           constants.DEFAULT_NODED_PORT))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)

    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)

    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G
    bad = False
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    if 'filelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    # checks ssh to other nodes
    if 'nodelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn(" - ERROR: communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn(" - ERROR: instance %s not in instance list %s" %
                  (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if not instance in node_instance[node_current]:
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if not isinstance(nodeinstance, list):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result = self._VerifyInstance(instance, node_volume, node_instance,
                                    feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
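
# The tuple returned by LUVerifyDisks.Exec thus has four parts; an
# example shape, with invented names:
#
#   res_nodes     -> ["node3"]                      # unreachable/bad nodes
#   res_nlvm      -> {"node2": "lvs error text"}    # per-node LVM errors
#   res_instances -> ["inst1"]                      # instances with offline LVs
#   res_missing   -> {"inst2": [("node1", "lv1")]}  # LVs missing entirely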


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
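
# Each per-disk mirror status consumed above is, per the unpacking in
# this function, a 4-tuple (perc_done, est_time, is_degraded, ldisk);
# for example (invented values) (87.5, 130, True, False) would mean a
# degraded device that is 87.5% synced with roughly 130 seconds left.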


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")

    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
      - the node exists in the configuration
      - it does not have primary or secondary instances
      - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                break
            else:
              inst = None
            val = inst and inst.name or '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
      - the new node is not already in the config
      - it is resolvable
      - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"
                                 % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(utils.HostInfo().name,
                         primary_ip,
                         constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(myself.secondary_ip,
                           secondary_ip,
                           constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
    Opts used:
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    data = []
    for node in self.nodes:
      result = ssh.SSHCall(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
    succeeded with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
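
# A successful return could look like (values invented for the example):
#
#   (True, [("node1.example.com", "sda", "/dev/drbd0"),
#           ("node1.example.com", "sdb", "/dev/drbd1")])
#
# i.e. the disks_ok flag plus one (node, instance-visible name,
# node-visible name) triple per disk on the primary node.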


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not isinstance(ins_l, list):
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
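
# Typical use, mirroring the call in LUStartupInstance below: verify that
# an instance's memory fits on its primary node before starting it, e.g.
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)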


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")

    self.cfg.MarkInstanceUp(instance.name)


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
2149 class LUShutdownInstance(LogicalUnit):
2150 """Shutdown an instance.
2153 HPATH = "instance-stop"
2154 HTYPE = constants.HTYPE_INSTANCE
2155 _OP_REQP = ["instance_name"]
2157 def BuildHooksEnv(self):
2160 This runs on master, primary and secondary nodes of the instance.
2163 env = _BuildInstanceHookEnvByObject(self.instance)
2164 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2165 list(self.instance.secondary_nodes))

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    self.cfg.MarkInstanceDown(instance.name)
    _ShutdownInstanceDisks(instance, self.cfg)
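
    # Note: the instance is marked down in the configuration and its disks
    # are deactivated even if the shutdown RPC fails (only an error is
    # logged), so the cluster state still converges to "down".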


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    if not getattr(self.op, "ignore_ip", False):
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)
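
    # Note: output_fields may freely mix the static fields above (answered
    # from the configuration) with the dynamic ones; only dynamic fields
    # trigger a live query of the nodes in Exec below.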

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
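
# Illustrative output shape for LUQueryInstances.Exec (made-up names): with
# output_fields ["name", "oper_ram"] it returns one row per instance, e.g.
# [["inst1.example.com", 512], ["inst2.example.com", "-"]], where "-" means
# no live data was available for the instance.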


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "DT_REMOTE_RAID1 template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down" %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    feedback_fn("* activating the instance's disks on target node")
    logger.Info("Starting instance %s on node %s" %
                (instance.name, target_node))

    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                             ignore_secondaries=True)
    if not disks_ok:
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Can't activate the instance's disks")

    feedback_fn("* starting the instance on the target node")
    if not rpc.call_instance_start(target_node, instance, None):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance %s on node %s." %
                               (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names.

  This will generate one logical volume name per requested extension,
  each based on a fresh, cluster-unique ID.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
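
# Illustrative example (made-up ID): with exts=[".sda", ".sdb"] and generated
# unique IDs of "0bf58a76" and "4cc91d02", the result is
# ["0bf58a76.sda", "4cc91d02.sdb"].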


def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                          logical_id=(primary, secondary, port),
                          children=[dev_data, dev_meta])
  return drbd_dev
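
# The branch generated above is a DRBD7 device whose children are the data
# LV (size MB) and a 128 MB metadata LV; _GenerateDiskTemplate below then
# wraps it in an MD RAID1 device for the remote_raid1 template.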


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == "diskless":
    disks = []
  elif template_name == "plain":
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name="sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name="sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == "local_raid1":
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
                                       ".sdb_m1", ".sdb_m2"])
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[0]))
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[1]))
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
                              size=disk_sz,
                              children=[sda_dev_m1, sda_dev_m2])
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[2]))
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[3]))
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
                              size=swap_sz,
                              children=[sdb_dev_m1, sdb_dev_m2])
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_REMOTE_RAID1:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2])
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
                              children=[drbd_sda_dev], size=disk_sz)
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4])
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
                              children=[drbd_sdb_dev], size=swap_sz)
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
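
# Summary of the layouts generated above:
#   diskless     -> no disks
#   plain        -> one LV per device (sda, sdb)
#   local_raid1  -> MD RAID1 over two local LVs per device
#   remote_raid1 -> MD RAID1 over a DRBD7 device (data+meta LVs per node)
#   drbd8        -> a DRBD8 device (data+meta LVs per node), no MD layer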


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False
  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False
  return result


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old OSes, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    # Required free disk space as a function of disk and swap space
    req_size_dict = {
      constants.DT_DISKLESS: None,
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
      # 256 MB are added for drbd metadata, 128MB for each drbd device
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
      }

    if self.op.disk_template not in req_size_dict:
      raise errors.ProgrammerError("Disk template '%s' size requirement"
                                   " is unknown" % self.op.disk_template)

    req_size = req_size_dict[self.op.disk_template]

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # instance verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
                       constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)
    # build ssh cmdline
    argv = ["ssh", "-q", "-t"]
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
    argv.extend(ssh.BATCH_MODE_OPTS)
    argv.append(node)
    argv.append(console_cmd)
    return "ssh", argv
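
# Illustrative result (made-up names): for instance "inst1" running under Xen
# on node "node2", Exec would return something like ("ssh", ["ssh", "-q",
# "-t", <known-hosts opts>, <batch-mode opts>, "node2", "xm console inst1"]),
# which, per the class docstring, the client runs on the master node.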


class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError("The device already has two slave devices."
                                 " This would create a 3-disk raid1 which we"
                                 " don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component

    """
    instance = self.instance
    disk = self.disk
    remote_node = self.remote_node
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
    names = _GenerateUniqueNames(self.cfg, lv_names)
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
                                     remote_node, disk.size, names)

    logger.Info("adding new mirror component on secondary")
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
                                      new_drbd, False,
                                      _GetInstanceInfoText(instance)):
      raise errors.OpExecError("Failed to create new component on secondary"
                               " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
                                    instance, new_drbd,
                                    _GetInstanceInfoText(instance)):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchildren(instance.primary_node,
                                         disk, [new_drbd]):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance, self.proc)

    return 0


class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if (child.dev_type == constants.LD_DRBD7 and
          child.logical_id[2] == self.op.disk_id):
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechildren(instance.primary_node,
                                            disk, [child]):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node  # this can be None, in which case
                                     # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    if self.op.remote_node is None:
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!"
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
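      # Illustrative example (made-up values): ren_fn(d, 1184161076) maps a
      # physical_id of ('xenvg', 'abc.sda_data') to
      # ('xenvg', 'abc.sda_data_replaced-1184161076').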
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)
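
    # Illustrative effect of the update above: a DRBD logical_id of
    # (pri_node, old_node, port) becomes (pri_node, new_node, port).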

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance
    if instance.disk_template == constants.DT_REMOTE_RAID1:
      fn = self._ExecRR1
    elif instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
    return fn(feedback_fn)
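
# Dispatch summary for the replace-disks Exec above:
#   remote_raid1                   -> _ExecRR1
#   drbd8 and op.remote_node unset -> _ExecD8DiskOnly (replace on same nodes)
#   drbd8 and op.remote_node set   -> _ExecD8Secondary (move to new secondary)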


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "network_port": instance.network_port,
        "vcpus": instance.vcpus,
        "kernel_path": instance.kernel_path,
        "initrd_path": instance.initrd_path,
        "hvm_boot_order": instance.hvm_boot_order,
        }

      result[instance.name] = idict
    return result
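
  # Illustrative result, assuming a single instance (fields abridged):
  #   {"inst1.example.com": {"config_state": "up", "run_state": "down",
  #                          "pnode": "node1.example.com", ...}}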


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))

    self.cfg.AddInstance(instance)

    return result
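
  # Illustrative: the returned list pairs each changed parameter with its new
  # value, e.g. [("mem", 512), ("vcpus", 2)], ready for the caller to report.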


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)
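
  # Illustrative result, assuming two nodes and one finalized export:
  #   {"node1.example.com": ["inst1.example.com"],
  #    "node2.example.com": []}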


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.proc.ChainOpCode(op)
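
    # Design note: the instance is only kept down long enough to take the LVM
    # snapshot; the export below runs from the snapshot while the instance is
    # already back up.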

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal;
    # if we proceed, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
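
  # Illustrative: searching for the pattern "^web" might return
  #   [("/cluster", "web-farm"), ("/instances/web1.example.com", "webserver")]
  # i.e. (path, matching tag) pairs across cluster, node and instance objects.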


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """
    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))