# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""
# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
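    # Every attribute named in _OP_REQP must have been set on the opcode
    # (even if only to None); a missing attribute aborts the LU before
    # any work is done.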
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this
    prefix will be added by the hooks runner. Also note that additional
    keys will be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). If there are no nodes, an empty list should be returned
    (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return {}, [], []


def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])


def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of node names (strings); an empty list means all nodes

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []
    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)
  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instance names (strings); an empty list means all
      instances

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings

  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }
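
  # Each NIC is exposed as a numbered set of variables; hooks can iterate
  # over them using INSTANCE_NIC_COUNT.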
  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip - IPv4 address of host (str)
    pubkey - the public key of the cluster

  """
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
  else:
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
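
  # Scan the existing file: keep entries that already carry the right key
  # for this host, drop entries that mention the host but with a stale key,
  # and keep everything else untouched.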
  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  for rawline in f:
    logger.Debug('read %s' % (repr(rawline),))

    parts = rawline.rstrip('\r\n').split()

    # Ignore unwanted lines
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
      fields = parts[0].split(',')
      key = parts[2]

      haveall = True
      havesome = False
      for spec in [ ip, fullnode ]:
        if spec not in fields:
          haveall = False
        else:
          havesome = True

      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
      if haveall and key == pubkey:
        inthere = True
        save_lines.append(rawline)
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
        continue

      if havesome and (not haveall or key != pubkey):
        removed = True
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
        continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
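
  # A full rewrite is only needed when stale entries were dropped; pure
  # additions can simply be appended to the already-open file.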
  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
                                   constants.DATA_DIR)
    newfile = os.fdopen(fd, 'w')
    try:
      newfile.write(''.join(save_lines))
    finally:
      newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None


def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)
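
  # Generate a self-signed certificate valid for five years; the same file
  # serves as both the private key and the certificate.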
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
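
    # The name this host resolves to must answer on the node daemon port
    # locally; otherwise DNS points the hostname at some other machine.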
    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
                         constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
                           constants.DEFAULT_NODED_PORT))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)

    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)

    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G
    bad = False
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    if 'filelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn(" - ERROR: communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if instance not in instancelist:
      feedback_fn(" - ERROR: instance %s not in instance list %s" %
                  (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if instance not in node_instance[node_current]:
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if not isinstance(nodeinstance, list):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result = self._VerifyInstance(instance, node_volume, node_instance,
                                    feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]
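
    # Build a map of every logical volume that ought to exist for running,
    # network-mirrored instances, keyed by (node, volume) for fast lookup.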
    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)
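
  # Poll the mirror status until every device reports completion; the sleep
  # below is sized from the slowest device's own time estimate.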
  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
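  # rpc.call_blockdev_find returns a device status tuple; the field at
  # index 5 is the overall is_degraded flag and the one at index 6 the
  # local-disk status, so pick the index to test accordingly.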
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data is False:
      raise errors.OpExecError("Can't gather the list of OSes")

    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})
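
    # Precompute node -> instance-name maps so the pinst_*/sinst_* fields
    # can be answered from the configuration without further RPCs.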
    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"
                                 % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(utils.HostInfo().name,
                         primary_ip,
                         constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(myself.secondary_ip,
                           secondary_ip,
                           constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name
    ss = self.sstore

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
    Opts used:
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    data = []
    for node in self.nodes:
      result = ssh.SSHCall(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
      device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not isinstance(ins_l, list):
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  Errors on the primary node are ignored only if ignore_primary is
  true.

  """
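  # Walk the full device tree of every disk and shut down each top-level
  # device on every node it lives on; a failure counts against the result
  # unless it happens on the primary node and ignore_primary is set.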
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
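  # Query the node's live resource state; the RPC returns a dict keyed by
  # node name, so a missing or malformed answer means the node is
  # unreachable or misbehaving.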
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")

    self.cfg.MarkInstanceUp(instance.name)


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
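
    # Soft and hard reboots are delegated to the hypervisor on the primary
    # node; a full reboot is emulated with a shutdown followed by a cold
    # start, recreating the block devices on the way.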
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    self.cfg.MarkInstanceDown(instance.name)
    _ShutdownInstanceDisks(instance, self.cfg)
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
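
# Example output (added; values are hypothetical): with
#   output_fields = ["name", "pnode", "oper_state"]
# Exec returns one row per instance, e.g.
#   [["web1.example.com", "node1.example.com", True],
#    ["db1.example.com", "node2.example.com", False]]
# Static fields come from the configuration alone; the dynamic fields
# (oper_state, oper_ram, status) are the ones that needed the live
# rpc.call_all_instances_info query above.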
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using"
                                   " DT_REMOTE_RAID1 template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down" %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    feedback_fn("* activating the instance's disks on target node")
    logger.Info("Starting instance %s on node %s" %
                (instance.name, target_node))

    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                             ignore_secondaries=True)
    if not disks_ok:
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Can't activate the instance's disks")

    feedback_fn("* starting the instance on the target node")
    if not rpc.call_instance_start(target_node, instance, None):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance %s on node %s." %
                               (instance.name, target_node))
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True
def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names.

  This will generate a logical volume name for each of the given
  suffixes.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
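
# For illustration (added; the UUIDs are made-up examples): a call such as
#   _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta"])
# returns one fresh ID per suffix, e.g.
#   ["02b53f02-a7b1-4f3a-91a6-d73b4b4f6e1b.sda_data",
#    "7c1d9bb6-0c42-4e84-8a3d-1f0e5c2d9a44.sda_meta"]
# The suffix is the only caller-controlled part of the name.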
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                          logical_id=(primary, secondary, port),
                          children=[dev_data, dev_meta])
  return drbd_dev
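
# The resulting device tree (added sketch; "xenvg" is just an example
# volume group name):
#
#   drbd7, logical_id=(primary, secondary, port)
#     +- LV data, size MB   e.g. xenvg/<uuid>.sda_data
#     +- LV meta, 128 MB    e.g. xenvg/<uuid>.sda_meta
#
# The remote_raid1 template then wraps one such branch per disk inside an
# md raid1 device; see _GenerateDiskTemplate below.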
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == "diskless":
    disks = []
  elif template_name == "plain":
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name="sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name="sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == "local_raid1":
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
                                       ".sdb_m1", ".sdb_m2"])
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[0]))
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[1]))
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
                              size=disk_sz,
                              children=[sda_dev_m1, sda_dev_m2])
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[2]))
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[3]))
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
                              size=swap_sz,
                              children=[sdb_dev_m1, sdb_dev_m2])
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_REMOTE_RAID1:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2])
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
                              children=[drbd_sda_dev], size=disk_sz)
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4])
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
                              children=[drbd_sdb_dev], size=swap_sz)
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
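
# A hedged usage sketch (added; the node and size values are hypothetical):
#
#   disks = _GenerateDiskTemplate(cfg, constants.DT_DRBD8,
#                                 "web1.example.com", "node1.example.com",
#                                 ["node2.example.com"], 10240, 4096)
#
# would return two drbd8 branches with iv_names "sda" (10240 MB data) and
# "sdb" (4096 MB swap), each backed by a data LV and a 128 MB metadata LV.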
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name
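
# Example (added): for an instance named "web1.example.com" this returns
# "originstname+web1.example.com"; the text is stored in each block
# device's metadata so stray volumes can be traced back to their instance.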
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False
  return True
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False
  return result
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
                                     primary_node=self.op.pnode,
                                     secondary_nodes=self.secondaries,
                                     status=self.instance_status,
                                     os_type=self.op.os_type,
                                     memory=self.op.mem_size,
                                     vcpus=self.op.vcpus,
                                     nics=[(self.inst_ip, self.op.bridge,
                                            self.op.mac)],
                                     ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl
  def CheckPrereq(self):
    """Check prerequisites.

    """
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old OSes, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    # Required free disk space as a function of disk and swap space
    req_size_dict = {
      constants.DT_DISKLESS: None,
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
      # 256 MB are added for drbd metadata, 128 MB for each drbd device
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
      }

    if self.op.disk_template not in req_size_dict:
      raise errors.ProgrammerError("Disk template '%s' size requirement"
                                   " is unknown" % self.op.disk_template)

    req_size = req_size_dict[self.op.disk_template]

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > vg_free:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, vg_free, req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # instance verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
                       constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'
  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")
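
# A hedged sketch (added; not original code) of the opcode a client would
# submit to reach the Exec above. The field names follow _OP_REQP but may
# not match every opcodes.py revision, and all values are hypothetical:
#
#   op = opcodes.OpCreateInstance(instance_name="web1.example.com",
#                                 mem_size=512, disk_size=10240,
#                                 swap_size=4096, vcpus=1,
#                                 os_type="debian-etch",
#                                 pnode="node1.example.com",
#                                 snode="node2.example.com",
#                                 disk_template=constants.DT_DRBD8,
#                                 mode=constants.INSTANCE_CREATE,
#                                 start=True, ip_check=True, ip="auto",
#                                 mac="auto", wait_for_sync=True)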
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)
    # build ssh cmdline
    argv = ["ssh", "-q", "-t"]
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
    argv.extend(ssh.BATCH_MODE_OPTS)
    argv.append(node)
    argv.append(console_cmd)
    return "ssh", argv
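
# For illustration (added; the exact option lists live in ssh.py and the
# console command depends on the hypervisor): for a Xen instance the
# returned value resembles
#   ("ssh", ["ssh", "-q", "-t", <known-hosts options>,
#            <batch-mode options>, "node1.example.com",
#            "xm console web1.example.com"])
# which the CLI then executes on the master node.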
class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError("The device already has two slave devices."
                                 " This would create a 3-disk raid1 which we"
                                 " don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
    names = _GenerateUniqueNames(self.cfg, lv_names)
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
                                     remote_node, disk.size, names)

    logger.Info("adding new mirror component on secondary")
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
                                      new_drbd, False,
                                      _GetInstanceInfoText(instance)):
      raise errors.OpExecError("Failed to create new component on secondary"
                               " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
                                    instance, new_drbd,
                                    _GetInstanceInfoText(instance)):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchildren(instance.primary_node,
                                         disk, [new_drbd]):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance, self.proc)
class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if (child.dev_type == constants.LD_DRBD7 and
          child.logical_id[2] == self.op.disk_id):
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechildren(instance.primary_node,
                                            disk, [child]):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replace secondary")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node
  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    cfg = self.cfg
    iv_names = {}
    if self.op.remote_node is None:
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!"
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
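
  # Illustration of step 4 above (added comment; the LV names are made up):
  # with old data LV "xenvg/aaaa.sda_data" and newly created
  # "xenvg/bbbb.sda_data", the two rename passes perform
  #   aaaa.sda_data -> aaaa.sda_data_replaced-1177000000   (free the name)
  #   bbbb.sda_data -> aaaa.sda_data                       (take it over)
  # so the drbd device gets its local storage back under the original
  # name, and step 6 later removes the *_replaced-<time_t> leftovers.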
3832 def _ExecD8Secondary(self, feedback_fn):
3833 """Replace the secondary node for drbd8.
3835 The algorithm for replace is quite complicated:
3836 - for all disks of the instance:
3837 - create new LVs on the new node with same names
3838 - shutdown the drbd device on the old secondary
3839 - disconnect the drbd network on the primary
3840 - create the drbd device on the new secondary
3841 - network attach the drbd on the primary, using an artifice:
3842 the drbd code for Attach() will connect to the network if it
3843 finds a device which is connected to the good local disks but
3845 - wait for sync across all devices
3846 - remove all disks from the old secondary
3848 Failures are not very well handled.
3852 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3853 instance = self.instance
3855 vgname = self.cfg.GetVGName()
3858 old_node = self.tgt_node
3859 new_node = self.new_node
3860 pri_node = instance.primary_node
3862 # Step: check device activation
3863 self.proc.LogStep(1, steps_total, "check device existence")
3864 info("checking volume groups")
3865 my_vg = cfg.GetVGName()
3866 results = rpc.call_vg_list([pri_node, new_node])
3868 raise errors.OpExecError("Can't list volume groups on the nodes")
3869 for node in pri_node, new_node:
3870 res = results.get(node, False)
3871 if not res or my_vg not in res:
3872 raise errors.OpExecError("Volume group '%s' not found on %s" %
3874 for dev in instance.disks:
3875 if not dev.iv_name in self.op.disks:
3877 info("checking %s on %s" % (dev.iv_name, pri_node))
3878 cfg.SetDiskID(dev, pri_node)
3879 if not rpc.call_blockdev_find(pri_node, dev):
3880 raise errors.OpExecError("Can't find device %s on node %s" %
3881 (dev.iv_name, pri_node))
3883 # Step: check other node consistency
3884 self.proc.LogStep(2, steps_total, "check peer consistency")
3885 for dev in instance.disks:
3886 if not dev.iv_name in self.op.disks:
3888 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3889 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3890 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3891 " unsafe to replace the secondary" %
3894 # Step: create new storage
3895 self.proc.LogStep(3, steps_total, "allocate new storage")
3896 for dev in instance.disks:
3898 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3899 # since we *always* want to create this LV, we use the
3900 # _Create...OnPrimary (which forces the creation), even if we
3901 # are talking about the secondary node
3902 for new_lv in dev.children:
3903 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3904 _GetInstanceInfoText(instance)):
3905 raise errors.OpExecError("Failed to create new LV named '%s' on"
3907 (new_lv.logical_id[1], new_node))
3909 iv_names[dev.iv_name] = (dev, dev.children)
3911 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3912 for dev in instance.disks:
3914 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3915 # create new devices on new_node
3916 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3917 logical_id=(pri_node, new_node,
3919 children=dev.children)
3920 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3922 _GetInstanceInfoText(instance)):
3923 raise errors.OpExecError("Failed to create new DRBD on"
3924 " node '%s'" % new_node)
3926 for dev in instance.disks:
3927 # we have new devices, shutdown the drbd on the old secondary
3928 info("shutting down drbd for %s on old node" % dev.iv_name)
3929 cfg.SetDiskID(dev, old_node)
3930 if not rpc.call_blockdev_shutdown(old_node, dev):
3931 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3932 hint="Please cleanup this device manually as soon as possible")
3934 info("detaching primary drbds from the network (=> standalone)")
3936 for dev in instance.disks:
3937 cfg.SetDiskID(dev, pri_node)
3938 # set the physical (unique in bdev terms) id to None, meaning
3939 # detach from network
3940 dev.physical_id = (None,) * len(dev.physical_id)
3941 # and 'find' the device, which will 'fix' it to match the
3943 if rpc.call_blockdev_find(pri_node, dev):
3946 warning("Failed to detach drbd %s from network, unusual case" %
3950 # no detaches succeeded (very unlikely)
3951 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3953 # if we managed to detach at least one, we update all the disks of
3954 # the instance to point to the new secondary
3955 info("updating instance configuration")
3956 for dev in instance.disks:
3957 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3958 cfg.SetDiskID(dev, pri_node)
3959 cfg.Update(instance)
3961 # and now perform the drbd attach
3962 info("attaching primary drbds to new secondary (standalone => connected)")
3964 for dev in instance.disks:
3965 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3966 # since the attach is smart, it's enough to 'find' the device,
3967 # it will automatically activate the network, if the physical_id
3969 cfg.SetDiskID(dev, pri_node)
3970 if not rpc.call_blockdev_find(pri_node, dev):
3971 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3972 "please do a gnt-instance info to see the status of disks")
3974 # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
3977 self.proc.LogStep(5, steps_total, "sync devices")
3978 _WaitForSync(cfg, instance, self.proc, unlock=True)
3980 # so check manually all the devices
3981 for name, (dev, old_lvs) in iv_names.iteritems():
3982 cfg.SetDiskID(dev, pri_node)
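      # the sixth field of the call_blockdev_find result tuple is the
      # mirror's is_degraded flag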
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3987 self.proc.LogStep(6, steps_total, "removing old storage")
3988 for name, (dev, old_lvs) in iv_names.iteritems():
3989 info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
3992 if not rpc.call_blockdev_remove(old_node, lv):
3993 warning("Can't remove LV on old secondary",
3994 hint="Cleanup stale volumes by hand")
3996 def Exec(self, feedback_fn):
3997 """Execute disk replacement.
    This dispatches the disk replacement to the appropriate handler.

    """
4002 instance = self.instance
    if instance.disk_template == constants.DT_REMOTE_RAID1:
      fn = self._ExecRR1
    elif instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
4012 return fn(feedback_fn)
4015 class LUQueryInstanceData(NoHooksLU):
4016 """Query runtime instance data.
4019 _OP_REQP = ["instances"]
4021 def CheckPrereq(self):
4022 """Check prerequisites.
    This only checks the optional instance list against the existing names.

    """
4027 if not isinstance(self.op.instances, list):
4028 raise errors.OpPrereqError("Invalid argument type 'instances'")
4029 if self.op.instances:
4030 self.wanted_instances = []
4031 names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4034 if instance is None:
4035 raise errors.OpPrereqError("No such instance name '%s'" % name)
4036 self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4039 in self.cfg.GetInstanceList()]
4043 def _ComputeDiskStatus(self, instance, snode, dev):
4044 """Compute block device status.
4047 self.cfg.SetDiskID(dev, instance.primary_node)
4048 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4049 if dev.dev_type in constants.LDS_DRBD:
4050 # we change the snode then (otherwise we use the one passed in)
4051 if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None
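    # child devices (e.g. the LVs backing a DRBD disk) are reported
    # recursively, so callers receive the whole device tree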
    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
4069 "iv_name": dev.iv_name,
4070 "dev_type": dev.dev_type,
4071 "logical_id": dev.logical_id,
4072 "physical_id": dev.physical_id,
4073 "pstatus": dev_pstatus,
4074 "sstatus": dev_sstatus,
4075 "children": dev_children,
4080 def Exec(self, feedback_fn):
4081 """Gather and return data"""
4083 for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
4086 if remote_info and "state" in remote_info:
4089 remote_state = "down"
4090 if instance.status == "down":
4091 config_state = "down"
4095 disks = [self._ComputeDiskStatus(instance, None, device)
4096 for device in instance.disks]
4099 "name": instance.name,
4100 "config_state": config_state,
4101 "run_state": remote_state,
4102 "pnode": instance.primary_node,
4103 "snodes": instance.secondary_nodes,
4105 "memory": instance.memory,
4106 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4108 "network_port": instance.network_port,
4109 "vcpus": instance.vcpus,
4110 "kernel_path": instance.kernel_path,
4111 "initrd_path": instance.initrd_path,
4112 "hvm_boot_order": instance.hvm_boot_order,
      result[instance.name] = idict

    return result

4120 class LUSetInstanceParms(LogicalUnit):
4121 """Modifies an instances's parameters.
4124 HPATH = "instance-modify"
4125 HTYPE = constants.HTYPE_INSTANCE
4126 _OP_REQP = ["instance_name"]
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
4153 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4154 nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl
4158 def CheckPrereq(self):
4159 """Check prerequisites.
    This only checks the instance list against the existing names.

    """
4164 self.mem = getattr(self.op, "mem", None)
4165 self.vcpus = getattr(self.op, "vcpus", None)
4166 self.ip = getattr(self.op, "ip", None)
4167 self.mac = getattr(self.op, "mac", None)
4168 self.bridge = getattr(self.op, "bridge", None)
4169 self.kernel_path = getattr(self.op, "kernel_path", None)
4170 self.initrd_path = getattr(self.op, "initrd_path", None)
4171 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4172 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4173 self.kernel_path, self.initrd_path, self.hvm_boot_order]
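    # if every entry is None, no parameter at all was passed in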
4174 if all_parms.count(None) == len(all_parms):
4175 raise errors.OpPrereqError("No changes submitted")
4176 if self.mem is not None:
      try:
        self.mem = int(self.mem)
4179 except ValueError, err:
4180 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4181 if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
4184 except ValueError, err:
4185 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
4195 self.do_bridge = (self.bridge is not None)
4196 if self.mac is not None:
4197 if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
4200 if not utils.IsValidMac(self.mac):
4201 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4203 if self.kernel_path is not None:
4204 self.do_kernel_path = True
4205 if self.kernel_path == constants.VALUE_NONE:
4206 raise errors.OpPrereqError("Can't set instance to no kernel")
4208 if self.kernel_path != constants.VALUE_DEFAULT:
4209 if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False
4215 if self.initrd_path is not None:
4216 self.do_initrd_path = True
4217 if self.initrd_path not in (constants.VALUE_NONE,
4218 constants.VALUE_DEFAULT):
4219 if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False
4225 # boot order verification
4226 if self.hvm_boot_order is not None:
4227 if self.hvm_boot_order != constants.VALUE_DEFAULT:
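        # strip("acdn") deletes every valid letter, so any character left
        # over means an invalid boot device was specified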
4228 if len(self.hvm_boot_order.strip("acdn")) != 0:
4229 raise errors.OpPrereqError("invalid boot order specified,"
4230 " must be one or more of [acdn]"
4233 instance = self.cfg.GetInstanceInfo(
4234 self.cfg.ExpandInstanceName(self.op.instance_name))
4235 if instance is None:
4236 raise errors.OpPrereqError("No such instance name '%s'" %
4237 self.op.instance_name)
4238 self.op.instance_name = instance.name
4239 self.instance = instance
4242 def Exec(self, feedback_fn):
4243 """Modifies an instance.
    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
4264 if self.do_kernel_path:
4265 instance.kernel_path = self.kernel_path
4266 result.append(("kernel_path", self.kernel_path))
4267 if self.do_initrd_path:
4268 instance.initrd_path = self.initrd_path
4269 result.append(("initrd_path", self.initrd_path))
4270 if self.hvm_boot_order:
4271 if self.hvm_boot_order == constants.VALUE_DEFAULT:
4272 instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
4275 result.append(("hvm_boot_order", self.hvm_boot_order))
    self.cfg.AddInstance(instance)

    return result

4282 class LUQueryExports(NoHooksLU):
4283 """Query the exports list
4288 def CheckPrereq(self):
4289 """Check that the nodelist contains only existing nodes.
4292 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4294 def Exec(self, feedback_fn):
4295 """Compute the list of all the exported system images.
4298 a dictionary with the structure node->(export-list)
4299 where export-list is a list of the instances exported on
4303 return rpc.call_export_list(self.nodes)
4306 class LUExportInstance(LogicalUnit):
4307 """Export an instance to an image in the cluster.
4310 HPATH = "instance-export"
4311 HTYPE = constants.HTYPE_INSTANCE
4312 _OP_REQP = ["instance_name", "target_node", "shutdown"]
  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
4324 env.update(_BuildInstanceHookEnvByObject(self.instance))
4325 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl
4329 def CheckPrereq(self):
4330 """Check prerequisites.
    This checks that the instance name is a valid one.

    """
4335 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4336 self.instance = self.cfg.GetInstanceInfo(instance_name)
4337 if self.instance is None:
4338 raise errors.OpPrereqError("Instance '%s' not found" %
4339 self.op.instance_name)
4342 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4343 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4345 if self.dst_node is None:
4346 raise errors.OpPrereqError("Destination node '%s' is unknown." %
4347 self.op.target_node)
4348 self.op.target_node = self.dst_node.name
4350 def Exec(self, feedback_fn):
4351 """Export an instance to an image in the cluster.
4354 instance = self.instance
4355 dst_node = self.dst_node
4356 src_node = instance.primary_node
4357 # shutdown the instance, unless requested not to do so
4358 if self.op.shutdown:
4359 op = opcodes.OpShutdownInstance(instance_name=instance.name)
4360 self.proc.ChainOpCode(op)
    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
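        # only the first disk (sda) is snapshotted and exported; any other
        # disks (e.g. the swap volume) are skipped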
4368 if disk.iv_name == "sda":
4369 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4370 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4372 if not new_dev_name:
4373 logger.Error("could not snapshot block device %s on node %s" %
4374 (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4377 logical_id=(vgname, new_dev_name),
4378 physical_id=(vgname, new_dev_name),
4379 iv_name=disk.iv_name)
4380 snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.proc.ChainOpCode(op)
4388 # TODO: check for size
4390 for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
4396 if not rpc.call_blockdev_remove(src_node, dev):
4397 logger.Error("could not remove snapshot block device %s from"
4398 " node %s" % (dev.logical_id[1], src_node))
4400 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4401 logger.Error("could not finalize export for instance %s on node %s" %
4402 (instance.name, dst_node.name))
4404 nodelist = self.cfg.GetNodeList()
4405 nodelist.remove(dst_node.name)
4407 # on one-node clusters nodelist will be empty after the removal
4408 # if we proceed the backup would be removed because OpQueryExports
4409 # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
4412 exportlist = self.proc.ChainOpCode(op)
4413 for node in exportlist:
4414 if instance.name in exportlist[node]:
4415 if not rpc.call_export_remove(node, instance.name):
4416 logger.Error("could not remove older export for instance %s"
4417 " on node %s" % (instance.name, node))
4420 class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
4426 def CheckPrereq(self):
4427 """Check prerequisites.
4430 if self.op.kind == constants.TAG_CLUSTER:
4431 self.target = self.cfg.GetClusterInfo()
4432 elif self.op.kind == constants.TAG_NODE:
4433 name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
4438 self.target = self.cfg.GetNodeInfo(name)
4439 elif self.op.kind == constants.TAG_INSTANCE:
4440 name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
4445 self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))
4451 class LUGetTags(TagsLU):
4452 """Returns the tags of a given object.
4455 _OP_REQP = ["kind", "name"]
4457 def Exec(self, feedback_fn):
4458 """Returns the tag list.
4461 return self.target.GetTags()
4464 class LUSearchTags(NoHooksLU):
4465 """Searches the tags for a given pattern.
4468 _OP_REQP = ["pattern"]
4470 def CheckPrereq(self):
4471 """Check prerequisites.
    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
4478 except re.error, err:
4479 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4480 (self.op.pattern, err))
4482 def Exec(self, feedback_fn):
4483 """Returns the tag list.
4487 tgts = [("/cluster", cfg.GetClusterInfo())]
4488 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4489 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4490 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4491 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
4494 for tag in target.GetTags():
4495 if self.re.search(tag):
          results.append((path, tag))

    return results

4500 class LUAddTags(TagsLU):
4501 """Sets a tag on a given object.
4504 _OP_REQP = ["kind", "name", "tags"]
4506 def CheckPrereq(self):
4507 """Check prerequisites.
    This checks the type and length of the tag name and value.

    """
4512 TagsLU.CheckPrereq(self)
4513 for tag in self.op.tags:
4514 objects.TaggableObject.ValidateTag(tag)
  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
4521 for tag in self.op.tags:
4522 self.target.AddTag(tag)
4523 except errors.TagError, err:
4524 raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
4527 except errors.ConfigurationError:
4528 raise errors.OpRetryError("There has been a modification to the"
4529 " config file and the operation has been"
4530 " aborted. Please retry.")
4533 class LUDelTags(TagsLU):
4534 """Delete a list of tags from a given object.
4537 _OP_REQP = ["kind", "name", "tags"]
4539 def CheckPrereq(self):
4540 """Check prerequisites.
    This checks that we have the given tag.

    """
4545 TagsLU.CheckPrereq(self)
4546 for tag in self.op.tags:
4547 objects.TaggableObject.ValidateTag(tag)
4548 del_tags = frozenset(self.op.tags)
4549 cur_tags = self.target.GetTags()
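    # "<=" between sets is the subset test: every tag to be removed must
    # currently be present on the target object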
4550 if not del_tags <= cur_tags:
4551 diff_tags = del_tags - cur_tags
4552 diff_names = ["'%s'" % tag for tag in diff_tags]
4554 raise errors.OpPrereqError("Tag(s) %s not found" %
4555 (",".join(diff_names)))
4557 def Exec(self, feedback_fn):
4558 """Remove the tag from the object.
4561 for tag in self.op.tags:
4562 self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
4565 except errors.ConfigurationError:
4566 raise errors.OpRetryError("There has been a modification to the"
4567 " config file and the operation has been"
4568 " aborted. Please retry.")
4570 class LUTestDelay(NoHooksLU):
4571 """Sleep for a specified amount of time.
  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
4577 _OP_REQP = ["duration", "on_master", "on_nodes"]
4579 def CheckPrereq(self):
4580 """Check prerequisites.
    This checks that we have a good list of nodes and/or the duration
    is valid.

    """
4587 if self.op.on_nodes:
4588 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4590 def Exec(self, feedback_fn):
4591 """Do the actual sleep.
4594 if self.op.on_master:
4595 if not utils.TestDelay(self.op.duration):
4596 raise errors.OpExecError("Error during master delay test")
4597 if self.op.on_nodes:
4598 result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
4601 for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))