# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.processor = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a
    dict containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in them,
    as it will be added by the hooks runner in case this LU requires a
    cluster to run on (otherwise we don't have a node list). Empty node
    lists should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return {}, [], []


def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])

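# Illustrative sketch (editor's example, not from the original module;
# hostname and address are made up): for a host "node1.example.com"
# resolving to 192.0.2.10, the wrapper above maintains an /etc/hosts
# line of the form
#
#   192.0.2.10 node1.example.com node1

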
def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: list of node names (strings); an empty list means all
      cluster nodes

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []
    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)
  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)

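# Illustrative usage (editor's example; "lu" is a hypothetical LogicalUnit
# instance and the names are made up):
#
#   _GetWantedNodes(lu, ["node1", "node2"])  # -> sorted, expanded FQDNs
#   _GetWantedNodes(lu, [])                  # -> all cluster nodes, sorted

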
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: list of instance names (strings); an empty list means
      all cluster instances

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))

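# Illustrative sketch of the check above (editor's example; field names
# are made up):
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "mfree"])   # passes silently
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "bogus"])   # raises OpPrereqError

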
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings

  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env

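# Illustrative sketch of the resulting environment (editor's example;
# all values are made up): an instance with one NIC yields a dict along
# the lines of
#
#   {"INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1.example.com",
#    "INSTANCE_SECONDARIES": "node2.example.com",
#    "INSTANCE_OS_TYPE": "debian-etch",
#    "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 512,
#    "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC0_IP": "192.0.2.20",
#    "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33",
#    "INSTANCE_NIC_COUNT": 1}
#
# The hooks runner later adds the "GANETI_" prefix to each key, which is
# why the keys here must not carry it already.

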
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)

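# Illustrative usage (editor's example): callers that need to report a
# state differing from the configuration can override single entries:
#
#   env = _BuildInstanceHookEnvByObject(instance, override={"status": "up"})

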
def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
  else:
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  for rawline in f:
    logger.Debug('read %s' % (repr(rawline),))

    parts = rawline.rstrip('\r\n').split()

    # Ignore unwanted lines
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
      fields = parts[0].split(',')
      key = parts[2]

      haveall = True
      havesome = False
      for spec in [ ip, fullnode ]:
        if spec not in fields:
          haveall = False
        if spec in fields:
          havesome = True

      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
      if haveall and key == pubkey:
        inthere = True
        save_lines.append(rawline)
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
        continue

      if havesome and (not haveall or key != pubkey):
        removed = True
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
        continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
                                   constants.DATA_DIR)
    newfile = os.fdopen(fd, 'w')
    try:
      newfile.write(''.join(save_lines))
    finally:
      newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()

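# Illustrative note (editor's example; data is made up): the entry
# written above uses the standard known_hosts format
# "<name>,<ip> <keytype> <key>", e.g.
#
#   node1.example.com,192.0.2.10 ssh-rsa AAAAB3NzaC1yc2EA...

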
def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None

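# Illustrative usage, mirroring the call in LUInitCluster.CheckPrereq
# below:
#
#   vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
#   if vgstatus:
#     raise errors.OpPrereqError("Error: %s" % vgstatus)

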
def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or /etc/hosts." %
                                 (hostname.ip,))

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
                         constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
                           constants.DEFAULT_NODED_PORT))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)

    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)

    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G
    bad = False
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    if 'filelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn(" - ERROR: communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn(" - ERROR: instance %s not in instance list %s" %
                  (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if not instance in node_instance[node_current]:
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result = self._VerifyInstance(instance, node_volume, node_instance,
                                    feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result

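# Illustrative note on the value returned by Exec() above: it is the
# 4-tuple built at the top of the method, i.e. roughly (made-up data)
#
#   (["node3"],                            # nodes that failed to answer
#    {"node2": "error text from lvs"},     # per-node LVM errors
#    ["inst1"],                            # instances with offline LVs
#    {"inst2": [("node1", "xenvg/lv1")]})  # missing LVs per instance

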
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    # modify the sstore
    ss.SetKey(ss.SS_MASTER_IP, ip)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

    # Distribute updated ss config to all nodes
    myself = self.cfg.GetNodeInfo(master)
    dist_nodes = self.cfg.GetNodeList()
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying updated ssconf data to all nodes")
    for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
      fname = ss.KeyToFilename(keyname)
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    if not rpc.call_node_start_master(master):
      logger.Error("Could not re-enable the master role on the master,"
                   " please restart manually.")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the node exists in the configuration
    - it does not have primary or secondary instances
    - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output

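# Illustrative sketch (editor's example; data is made up): for
# output_fields ["name", "pinst_cnt", "mfree"], Exec() returns one row
# per node, e.g.
#
#   [["node1.example.com", 2, 1024],
#    ["node2.example.com", 0, 2048]]

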
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                break
            else:
              inst = None
            val = inst and inst.name or '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the new node is not already in the config
    - it is resolvable
    - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"
                                 % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(utils.HostInfo().name,
                         primary_ip,
                         constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(myself.secondary_ip,
                           secondary_ip,
                           constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy the file from the master to the nodes computed in CheckPrereq.

    The master itself is skipped.

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    data = []
    for node in self.nodes:
      result = ssh.SSHCall(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
    succeeded with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  for inst_disk in instance.disks:
    master_result = None
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      is_primary = node == instance.primary_node
      result = rpc.call_blockdev_assemble(node, node_disk,
                                          instance.name, is_primary)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=%s)" %
                     (inst_disk.iv_name, node, is_primary))
        if is_primary or not ignore_secondaries:
          disks_ok = False
      if is_primary:
        master_result = result
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        master_result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info

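# Illustrative note (editor's example; data is made up): on success the
# second return value maps instance devices to the primary node's view
# of them, e.g.
#
#   (True, [("node1.example.com", "sda", "/dev/drbd0"),
#           ("node1.example.com", "sdb", "/dev/drbd1")])

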
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  Errors on the primary node are ignored only if ignore_primary is
  true.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))

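# Illustrative usage, mirroring the call in LUStartupInstance.CheckPrereq
# below:
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)

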
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")

    self.cfg.MarkInstanceUp(instance.name)


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    self.cfg.MarkInstanceDown(instance.name)
    _ShutdownInstanceDisks(instance, self.cfg)


2176 class LUReinstallInstance(LogicalUnit):
2177 """Reinstall an instance.
2180 HPATH = "instance-reinstall"
2181 HTYPE = constants.HTYPE_INSTANCE
2182 _OP_REQP = ["instance_name"]
2184 def BuildHooksEnv(self):
2187 This runs on master, primary and secondary nodes of the instance.
2190 env = _BuildInstanceHookEnvByObject(self.instance)
2191 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2192 list(self.instance.secondary_nodes))
2195 def CheckPrereq(self):
2196 """Check prerequisites.
2198 This checks that the instance is in the cluster and is not running.
2201 instance = self.cfg.GetInstanceInfo(
2202 self.cfg.ExpandInstanceName(self.op.instance_name))
2203 if instance is None:
2204 raise errors.OpPrereqError("Instance '%s' not known" %
2205 self.op.instance_name)
2206 if instance.disk_template == constants.DT_DISKLESS:
2207 raise errors.OpPrereqError("Instance '%s' has no disks" %
2208 self.op.instance_name)
2209 if instance.status != "down":
2210 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2211 self.op.instance_name)
2212 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2213 if remote_info:
2214 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2215 (self.op.instance_name,
2216 instance.primary_node))
2218 self.op.os_type = getattr(self.op, "os_type", None)
2219 if self.op.os_type is not None:
2221 pnode = self.cfg.GetNodeInfo(
2222 self.cfg.ExpandNodeName(instance.primary_node))
2223 if pnode is None:
2224 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2225 instance.primary_node)
2226 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2227 if not os_obj:
2228 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2229 " primary node" % self.op.os_type)
2231 self.instance = instance
2233 def Exec(self, feedback_fn):
2234 """Reinstall the instance.
2237 inst = self.instance
2239 if self.op.os_type is not None:
2240 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2241 inst.os = self.op.os_type
2242 self.cfg.AddInstance(inst)
2244 _StartInstanceDisks(self.cfg, inst, None)
2246 feedback_fn("Running the instance OS create scripts...")
2247 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2248 raise errors.OpExecError("Could not install OS for instance %s"
2249 " on node %s" %
2250 (inst.name, inst.primary_node))
2252 _ShutdownInstanceDisks(inst, self.cfg)
2255 class LURenameInstance(LogicalUnit):
2256 """Rename an instance.
2259 HPATH = "instance-rename"
2260 HTYPE = constants.HTYPE_INSTANCE
2261 _OP_REQP = ["instance_name", "new_name"]
2263 def BuildHooksEnv(self):
2266 This runs on master, primary and secondary nodes of the instance.
2269 env = _BuildInstanceHookEnvByObject(self.instance)
2270 env["INSTANCE_NEW_NAME"] = self.op.new_name
2271 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2272 list(self.instance.secondary_nodes))
2273 return env, nl, nl
2275 def CheckPrereq(self):
2276 """Check prerequisites.
2278 This checks that the instance is in the cluster and is not running.
2281 instance = self.cfg.GetInstanceInfo(
2282 self.cfg.ExpandInstanceName(self.op.instance_name))
2283 if instance is None:
2284 raise errors.OpPrereqError("Instance '%s' not known" %
2285 self.op.instance_name)
2286 if instance.status != "down":
2287 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2288 self.op.instance_name)
2289 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2290 if remote_info:
2291 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2292 (self.op.instance_name,
2293 instance.primary_node))
2294 self.instance = instance
2296 # new name verification
2297 name_info = utils.HostInfo(self.op.new_name)
2299 self.op.new_name = new_name = name_info.name
2300 if not getattr(self.op, "ignore_ip", False):
2301 command = ["fping", "-q", name_info.ip]
2302 result = utils.RunCmd(command)
2303 if not result.failed:
2304 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2305 (name_info.ip, new_name))
2308 def Exec(self, feedback_fn):
2309 """Rename the instance.
2312 inst = self.instance
2313 old_name = inst.name
2315 self.cfg.RenameInstance(inst.name, self.op.new_name)
2317 # re-read the instance from the configuration after rename
2318 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2320 _StartInstanceDisks(self.cfg, inst, None)
2322 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2323 "sda", "sdb"):
2324 msg = ("Could not run OS rename script for instance %s on node %s (but"
2325 " the instance has been renamed in Ganeti)" %
2326 (inst.name, inst.primary_node))
2327 logger.Error(msg)
2328 feedback_fn(msg)
2329 _ShutdownInstanceDisks(inst, self.cfg)
2332 class LURemoveInstance(LogicalUnit):
2333 """Remove an instance.
2336 HPATH = "instance-remove"
2337 HTYPE = constants.HTYPE_INSTANCE
2338 _OP_REQP = ["instance_name"]
2340 def BuildHooksEnv(self):
2343 This runs on master, primary and secondary nodes of the instance.
2346 env = _BuildInstanceHookEnvByObject(self.instance)
2347 nl = [self.sstore.GetMasterNode()]
2348 return env, nl, nl
2350 def CheckPrereq(self):
2351 """Check prerequisites.
2353 This checks that the instance is in the cluster.
2356 instance = self.cfg.GetInstanceInfo(
2357 self.cfg.ExpandInstanceName(self.op.instance_name))
2358 if instance is None:
2359 raise errors.OpPrereqError("Instance '%s' not known" %
2360 self.op.instance_name)
2361 self.instance = instance
2363 def Exec(self, feedback_fn):
2364 """Remove the instance.
2367 instance = self.instance
2368 logger.Info("shutting down instance %s on node %s" %
2369 (instance.name, instance.primary_node))
2371 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2372 if self.op.ignore_failures:
2373 feedback_fn("Warning: can't shutdown instance")
2374 else:
2375 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2376 (instance.name, instance.primary_node))
2378 logger.Info("removing block devices for instance %s" % instance.name)
2380 if not _RemoveDisks(instance, self.cfg):
2381 if self.op.ignore_failures:
2382 feedback_fn("Warning: can't remove instance's disks")
2383 else:
2384 raise errors.OpExecError("Can't remove instance's disks")
2386 logger.Info("removing instance %s out of cluster config" % instance.name)
2388 self.cfg.RemoveInstance(instance.name)
2391 class LUQueryInstances(NoHooksLU):
2392 """Logical unit for querying instances.
2395 _OP_REQP = ["output_fields", "names"]
2397 def CheckPrereq(self):
2398 """Check prerequisites.
2400 This checks that the fields required are valid output fields.
2403 self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2404 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2405 "admin_state", "admin_ram",
2406 "disk_template", "ip", "mac", "bridge",
2407 "sda_size", "sdb_size", "vcpus"],
2408 dynamic=self.dynamic_fields,
2409 selected=self.op.output_fields)
2411 self.wanted = _GetWantedInstances(self, self.op.names)
2413 def Exec(self, feedback_fn):
2414 """Computes the list of nodes and their attributes.
2417 instance_names = self.wanted
2418 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2419 in instance_names]
2421 # begin data gathering
2423 nodes = frozenset([inst.primary_node for inst in instance_list])
2425 bad_nodes = []
2426 if self.dynamic_fields.intersection(self.op.output_fields):
2427 live_data = {}
2428 node_data = rpc.call_all_instances_info(nodes)
2429 for name in nodes:
2430 result = node_data[name]
2431 if result:
2432 live_data.update(result)
2433 elif result == False:
2434 bad_nodes.append(name)
2435 # else no instance is alive
2436 else:
2437 live_data = dict([(name, {}) for name in instance_names])
2439 # end data gathering
2441 output = []
2442 for instance in instance_list:
2443 iout = []
2444 for field in self.op.output_fields:
2445 if field == "name":
2446 val = instance.name
2447 elif field == "os":
2448 val = instance.os
2449 elif field == "pnode":
2450 val = instance.primary_node
2451 elif field == "snodes":
2452 val = list(instance.secondary_nodes)
2453 elif field == "admin_state":
2454 val = (instance.status != "down")
2455 elif field == "oper_state":
2456 if instance.primary_node in bad_nodes:
2457 val = None
2458 else:
2459 val = bool(live_data.get(instance.name))
2460 elif field == "admin_ram":
2461 val = instance.memory
2462 elif field == "oper_ram":
2463 if instance.primary_node in bad_nodes:
2464 val = None
2465 elif instance.name in live_data:
2466 val = live_data[instance.name].get("memory", "?")
2467 else:
2468 val = "-"
2469 elif field == "disk_template":
2470 val = instance.disk_template
2471 elif field == "ip":
2472 val = instance.nics[0].ip
2473 elif field == "bridge":
2474 val = instance.nics[0].bridge
2475 elif field == "mac":
2476 val = instance.nics[0].mac
2477 elif field == "sda_size" or field == "sdb_size":
2478 disk = instance.FindDisk(field[:3])
2479 if disk is None:
2480 val = None
2481 else:
2482 val = disk.size
2483 elif field == "vcpus":
2484 val = instance.vcpus
2485 else:
2486 raise errors.ParameterError(field)
2487 iout.append(val)
2488 output.append(iout)
2490 return output
2493 class LUFailoverInstance(LogicalUnit):
2494 """Failover an instance.
2497 HPATH = "instance-failover"
2498 HTYPE = constants.HTYPE_INSTANCE
2499 _OP_REQP = ["instance_name", "ignore_consistency"]
2501 def BuildHooksEnv(self):
2504 This runs on master, primary and secondary nodes of the instance.
2505 """
2507 env = {
2508 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2509 }
2510 env.update(_BuildInstanceHookEnvByObject(self.instance))
2511 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2512 return env, nl, nl
2514 def CheckPrereq(self):
2515 """Check prerequisites.
2517 This checks that the instance is in the cluster.
2520 instance = self.cfg.GetInstanceInfo(
2521 self.cfg.ExpandInstanceName(self.op.instance_name))
2522 if instance is None:
2523 raise errors.OpPrereqError("Instance '%s' not known" %
2524 self.op.instance_name)
2526 if instance.disk_template not in constants.DTS_NET_MIRROR:
2527 raise errors.OpPrereqError("Instance's disk layout is not"
2528 " network mirrored, cannot failover.")
2530 secondary_nodes = instance.secondary_nodes
2531 if not secondary_nodes:
2532 raise errors.ProgrammerError("no secondary node but using "
2533 "DT_REMOTE_RAID1 template")
2535 target_node = secondary_nodes[0]
2536 # check memory requirements on the secondary node
2537 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2538 instance.name, instance.memory)
2540 # check bridge existence
2541 brlist = [nic.bridge for nic in instance.nics]
2542 if not rpc.call_bridges_exist(target_node, brlist):
2543 raise errors.OpPrereqError("One or more target bridges %s do not"
2544 " exist on destination node '%s'" %
2545 (brlist, target_node))
2547 self.instance = instance
2549 def Exec(self, feedback_fn):
2550 """Failover an instance.
2552 The failover is done by shutting it down on its present node and
2553 starting it on the secondary.
2556 instance = self.instance
2558 source_node = instance.primary_node
2559 target_node = instance.secondary_nodes[0]
2561 feedback_fn("* checking disk consistency between source and target")
2562 for dev in instance.disks:
2563 # for remote_raid1, these are md over drbd
2564 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2565 if not self.op.ignore_consistency:
2566 raise errors.OpExecError("Disk %s is degraded on target node,"
2567 " aborting failover." % dev.iv_name)
2569 feedback_fn("* shutting down instance on source node")
2570 logger.Info("Shutting down instance %s on node %s" %
2571 (instance.name, source_node))
2573 if not rpc.call_instance_shutdown(source_node, instance):
2574 if self.op.ignore_consistency:
2575 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2576 " anyway. Please make sure node %s is down" %
2577 (instance.name, source_node, source_node))
2578 else:
2579 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2580 (instance.name, source_node))
2582 feedback_fn("* deactivating the instance's disks on source node")
2583 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2584 raise errors.OpExecError("Can't shut down the instance's disks.")
2586 instance.primary_node = target_node
2587 # distribute new instance config to the other nodes
2588 self.cfg.AddInstance(instance)
2590 feedback_fn("* activating the instance's disks on target node")
2591 logger.Info("Starting instance %s on node %s" %
2592 (instance.name, target_node))
2594 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2595 ignore_secondaries=True)
2596 if not disks_ok:
2597 _ShutdownInstanceDisks(instance, self.cfg)
2598 raise errors.OpExecError("Can't activate the instance's disks")
2600 feedback_fn("* starting the instance on the target node")
2601 if not rpc.call_instance_start(target_node, instance, None):
2602 _ShutdownInstanceDisks(instance, self.cfg)
2603 raise errors.OpExecError("Could not start instance %s on node %s." %
2604 (instance.name, target_node))
2607 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2608 """Create a tree of block devices on the primary node.
2610 This always creates all devices.
2613 if device.children:
2614 for child in device.children:
2615 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2616 return False
2618 cfg.SetDiskID(device, node)
2619 new_id = rpc.call_blockdev_create(node, device, device.size,
2620 instance.name, True, info)
2621 if not new_id:
2622 return False
2623 if device.physical_id is None:
2624 device.physical_id = new_id
2625 return True
2628 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2629 """Create a tree of block devices on a secondary node.
2631 If this device type has to be created on secondaries, create it and
2634 If not, just recurse to children keeping the same 'force' value.
2637 if device.CreateOnSecondary():
2638 force = True
2639 if device.children:
2640 for child in device.children:
2641 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2642 child, force, info):
2643 return False
2645 if not force:
2646 return True
2647 cfg.SetDiskID(device, node)
2648 new_id = rpc.call_blockdev_create(node, device, device.size,
2649 instance.name, False, info)
2650 if not new_id:
2651 return False
2652 if device.physical_id is None:
2653 device.physical_id = new_id
2654 return True
2657 def _GenerateUniqueNames(cfg, exts):
2658 """Generate a set of suitable LV names.
2660 This will generate unique logical volume names, one for each of the
2661 given extensions.
2662 """
2663 results = []
2664 for val in exts:
2665 new_id = cfg.GenerateUniqueID()
2666 results.append("%s%s" % (new_id, val))
2668 return results
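# Example of the shape of the result (each name gets its own fresh id from
# cfg.GenerateUniqueID()):
#   _GenerateUniqueNames(cfg, [".sda", ".sdb"]) -> ["<id1>.sda", "<id2>.sdb"]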
2670 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2671 """Generate a drbd device complete with its children.
2674 port = cfg.AllocatePort()
2675 vgname = cfg.GetVGName()
2676 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2677 logical_id=(vgname, names[0]))
2678 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2679 logical_id=(vgname, names[1]))
2680 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2681 logical_id = (primary, secondary, port),
2682 children = [dev_data, dev_meta])
2683 return drbd_dev
2686 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2687 """Generate a drbd8 device complete with its children.
2690 port = cfg.AllocatePort()
2691 vgname = cfg.GetVGName()
2692 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2693 logical_id=(vgname, names[0]))
2694 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2695 logical_id=(vgname, names[1]))
2696 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2697 logical_id = (primary, secondary, port),
2698 children = [dev_data, dev_meta],
2699 iv_name=iv_name)
2700 return drbd_dev
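# The device tree built by one such call, per instance disk (sizes in MB;
# the drbd network endpoint is the (primary, secondary, port) logical_id):
#
#   drbd8 (size)
#     +- LV names[0] (size)  data
#     +- LV names[1] (128)   metadata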
2702 def _GenerateDiskTemplate(cfg, template_name,
2703 instance_name, primary_node,
2704 secondary_nodes, disk_sz, swap_sz):
2705 """Generate the entire disk layout for a given template type.
2708 #TODO: compute space requirements
2710 vgname = cfg.GetVGName()
2711 if template_name == "diskless":
2712 disks = []
2713 elif template_name == "plain":
2714 if len(secondary_nodes) != 0:
2715 raise errors.ProgrammerError("Wrong template configuration")
2717 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2718 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2719 logical_id=(vgname, names[0]),
2720 iv_name = "sda")
2721 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2722 logical_id=(vgname, names[1]),
2723 iv_name = "sdb")
2724 disks = [sda_dev, sdb_dev]
2725 elif template_name == "local_raid1":
2726 if len(secondary_nodes) != 0:
2727 raise errors.ProgrammerError("Wrong template configuration")
2730 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2731 ".sdb_m1", ".sdb_m2"])
2732 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2733 logical_id=(vgname, names[0]))
2734 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2735 logical_id=(vgname, names[1]))
2736 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2737 size=disk_sz,
2738 children = [sda_dev_m1, sda_dev_m2])
2739 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2740 logical_id=(vgname, names[2]))
2741 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2742 logical_id=(vgname, names[3]))
2743 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2744 size=swap_sz,
2745 children = [sdb_dev_m1, sdb_dev_m2])
2746 disks = [md_sda_dev, md_sdb_dev]
2747 elif template_name == constants.DT_REMOTE_RAID1:
2748 if len(secondary_nodes) != 1:
2749 raise errors.ProgrammerError("Wrong template configuration")
2750 remote_node = secondary_nodes[0]
2751 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2752 ".sdb_data", ".sdb_meta"])
2753 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2754 disk_sz, names[0:2])
2755 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2756 children = [drbd_sda_dev], size=disk_sz)
2757 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2758 swap_sz, names[2:4])
2759 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2760 children = [drbd_sdb_dev], size=swap_sz)
2761 disks = [md_sda_dev, md_sdb_dev]
2762 elif template_name == constants.DT_DRBD8:
2763 if len(secondary_nodes) != 1:
2764 raise errors.ProgrammerError("Wrong template configuration")
2765 remote_node = secondary_nodes[0]
2766 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2767 ".sdb_data", ".sdb_meta"])
2768 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2769 disk_sz, names[0:2], "sda")
2770 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2771 swap_sz, names[2:4], "sdb")
2772 disks = [drbd_sda_dev, drbd_sdb_dev]
2773 else:
2774 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2776 return disks
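# Summary of the layouts generated above, per instance device:
#   diskless     -> no block devices at all
#   plain        -> one LV on the primary node
#   local_raid1  -> md raid1 over two LVs, both on the primary node
#   remote_raid1 -> md raid1 over a drbd7 device mirrored to the secondary
#   drbd8        -> drbd8 over data and 128MB meta LVs on both nodes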
2778 def _GetInstanceInfoText(instance):
2779 """Compute the text that should be added to the disk's metadata.
2782 return "originstname+%s" % instance.name
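# e.g. for a (hypothetical) instance named "inst1.example.com" this yields
# "originstname+inst1.example.com"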
2785 def _CreateDisks(cfg, instance):
2786 """Create all disks for an instance.
2788 This abstracts away some work from AddInstance.
2791 instance: the instance object
2794 True or False showing the success of the creation process
2797 info = _GetInstanceInfoText(instance)
2799 for device in instance.disks:
2800 logger.Info("creating volume %s for instance %s" %
2801 (device.iv_name, instance.name))
2803 for secondary_node in instance.secondary_nodes:
2804 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2805 device, False, info):
2806 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2807 (device.iv_name, device, secondary_node))
2808 return False
2810 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2811 instance, device, info):
2812 logger.Error("failed to create volume %s on primary!" %
2813 device.iv_name)
2814 return False
2816 return True
2818 def _RemoveDisks(instance, cfg):
2819 """Remove all disks for an instance.
2821 This abstracts away some work from `AddInstance()` and
2822 `RemoveInstance()`. Note that in case some of the devices couldn't
2823 be removed, the removal will continue with the other ones (compare
2824 with `_CreateDisks()`).
2827 instance: the instance object
2830 True or False showing the success of the removal process
2833 logger.Info("removing block devices for instance %s" % instance.name)
2835 result = True
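# ComputeNodeTree walks the whole device tree and yields (node, device)
# pairs, so for stacked layouts (e.g. md over drbd over LVs) every
# component is removed on every node that hosts part of it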
2836 for device in instance.disks:
2837 for node, disk in device.ComputeNodeTree(instance.primary_node):
2838 cfg.SetDiskID(disk, node)
2839 if not rpc.call_blockdev_remove(node, disk):
2840 logger.Error("could not remove block device %s on node %s,"
2841 " continuing anyway" %
2842 (device.iv_name, node))
2843 result = False
2845 return result
2847 class LUCreateInstance(LogicalUnit):
2848 """Create an instance.
2851 HPATH = "instance-add"
2852 HTYPE = constants.HTYPE_INSTANCE
2853 _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2854 "disk_template", "swap_size", "mode", "start", "vcpus",
2855 "wait_for_sync", "ip_check", "mac"]
2857 def BuildHooksEnv(self):
2860 This runs on master, primary and secondary nodes of the instance.
2861 """
2863 env = {
2864 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2865 "INSTANCE_DISK_SIZE": self.op.disk_size,
2866 "INSTANCE_SWAP_SIZE": self.op.swap_size,
2867 "INSTANCE_ADD_MODE": self.op.mode,
2868 }
2869 if self.op.mode == constants.INSTANCE_IMPORT:
2870 env["INSTANCE_SRC_NODE"] = self.op.src_node
2871 env["INSTANCE_SRC_PATH"] = self.op.src_path
2872 env["INSTANCE_SRC_IMAGE"] = self.src_image
2874 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2875 primary_node=self.op.pnode,
2876 secondary_nodes=self.secondaries,
2877 status=self.instance_status,
2878 os_type=self.op.os_type,
2879 memory=self.op.mem_size,
2880 vcpus=self.op.vcpus,
2881 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2884 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2885 self.secondaries)
2886 return env, nl, nl
2889 def CheckPrereq(self):
2890 """Check prerequisites.
2893 for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2894 if not hasattr(self.op, attr):
2895 setattr(self.op, attr, None)
2897 if self.op.mode not in (constants.INSTANCE_CREATE,
2898 constants.INSTANCE_IMPORT):
2899 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2900 self.op.mode)
2902 if self.op.mode == constants.INSTANCE_IMPORT:
2903 src_node = getattr(self.op, "src_node", None)
2904 src_path = getattr(self.op, "src_path", None)
2905 if src_node is None or src_path is None:
2906 raise errors.OpPrereqError("Importing an instance requires source"
2907 " node and path options")
2908 src_node_full = self.cfg.ExpandNodeName(src_node)
2909 if src_node_full is None:
2910 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2911 self.op.src_node = src_node = src_node_full
2913 if not os.path.isabs(src_path):
2914 raise errors.OpPrereqError("The source path must be absolute")
2916 export_info = rpc.call_export_info(src_node, src_path)
2918 if not export_info:
2919 raise errors.OpPrereqError("No export found in dir %s" % src_path)
2921 if not export_info.has_section(constants.INISECT_EXP):
2922 raise errors.ProgrammerError("Corrupted export config")
2924 ei_version = export_info.get(constants.INISECT_EXP, 'version')
2925 if (int(ei_version) != constants.EXPORT_VERSION):
2926 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2927 (ei_version, constants.EXPORT_VERSION))
2929 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2930 raise errors.OpPrereqError("Can't import instance with more than"
2931 " one data disk")
2933 # FIXME: are the old os-es, disk sizes, etc. useful?
2934 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2935 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2937 self.src_image = diskimage
2938 else: # INSTANCE_CREATE
2939 if getattr(self.op, "os_type", None) is None:
2940 raise errors.OpPrereqError("No guest OS specified")
2942 # check primary node
2943 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2944 if pnode is None:
2945 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2946 self.op.pnode)
2947 self.op.pnode = pnode.name
2949 self.secondaries = []
2950 # disk template and mirror node verification
2951 if self.op.disk_template not in constants.DISK_TEMPLATES:
2952 raise errors.OpPrereqError("Invalid disk template name")
2954 if self.op.disk_template in constants.DTS_NET_MIRROR:
2955 if getattr(self.op, "snode", None) is None:
2956 raise errors.OpPrereqError("The networked disk templates need"
2957 " a mirror node")
2959 snode_name = self.cfg.ExpandNodeName(self.op.snode)
2960 if snode_name is None:
2961 raise errors.OpPrereqError("Unknown secondary node '%s'" %
2962 self.op.snode)
2963 elif snode_name == pnode.name:
2964 raise errors.OpPrereqError("The secondary node cannot be"
2965 " the primary node.")
2966 self.secondaries.append(snode_name)
2968 # Required free disk space as a function of disk and swap space
2969 req_size_dict = {
2970 constants.DT_DISKLESS: None,
2971 constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2972 constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2973 # 256 MB are added for drbd metadata, 128MB for each drbd device
2974 constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2975 constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2976 }
2978 if self.op.disk_template not in req_size_dict:
2979 raise errors.ProgrammerError("Disk template '%s' size requirement"
2980 " is unknown" % self.op.disk_template)
2982 req_size = req_size_dict[self.op.disk_template]
2984 # Check lv size requirements
2985 if req_size is not None:
2986 nodenames = [pnode.name] + self.secondaries
2987 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2988 for node in nodenames:
2989 info = nodeinfo.get(node, None)
2990 if not info:
2991 raise errors.OpPrereqError("Cannot get current information"
2992 " from node '%s'" % node)
2993 vg_free = info.get('vg_free', None)
2994 if not isinstance(vg_free, int):
2995 raise errors.OpPrereqError("Can't compute free disk space on"
2996 " node %s" % node)
2997 if req_size > info['vg_free']:
2998 raise errors.OpPrereqError("Not enough disk space on target node %s."
2999 " %d MB available, %d MB required" %
3000 (node, info['vg_free'], req_size))
3003 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3004 if not os_obj:
3005 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3006 " primary node" % self.op.os_type)
3008 if self.op.kernel_path == constants.VALUE_NONE:
3009 raise errors.OpPrereqError("Can't set instance kernel to none")
3011 # instance verification
3012 hostname1 = utils.HostInfo(self.op.instance_name)
3014 self.op.instance_name = instance_name = hostname1.name
3015 instance_list = self.cfg.GetInstanceList()
3016 if instance_name in instance_list:
3017 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3018 instance_name)
3020 ip = getattr(self.op, "ip", None)
3021 if ip is None or ip.lower() == "none":
3022 inst_ip = None
3023 elif ip.lower() == "auto":
3024 inst_ip = hostname1.ip
3025 else:
3026 if not utils.IsValidIP(ip):
3027 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3028 " like a valid IP" % ip)
3029 inst_ip = ip
3030 self.inst_ip = inst_ip
3032 if self.op.start and not self.op.ip_check:
3033 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3034 " adding an instance in start mode")
3036 if self.op.ip_check:
3037 if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
3038 constants.DEFAULT_NODED_PORT):
3039 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3040 (hostname1.ip, instance_name))
3042 # MAC address verification
3043 if self.op.mac != "auto":
3044 if not utils.IsValidMac(self.op.mac.lower()):
3045 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3046 self.op.mac)
3048 # bridge verification
3049 bridge = getattr(self.op, "bridge", None)
3050 if bridge is None:
3051 self.op.bridge = self.cfg.GetDefBridge()
3052 else:
3053 self.op.bridge = bridge
3055 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3056 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3057 " destination node '%s'" %
3058 (self.op.bridge, pnode.name))
3060 # boot order verification
3061 if self.op.hvm_boot_order is not None:
3062 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3063 raise errors.OpPrereqError("invalid boot order specified,"
3064 " must be one or more of [acdn]")
3066 if self.op.start:
3067 self.instance_status = 'up'
3068 else:
3069 self.instance_status = 'down'
3071 def Exec(self, feedback_fn):
3072 """Create and add the instance to the cluster.
3075 instance = self.op.instance_name
3076 pnode_name = self.pnode.name
3078 if self.op.mac == "auto":
3079 mac_address = self.cfg.GenerateMAC()
3081 mac_address = self.op.mac
3083 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3084 if self.inst_ip is not None:
3085 nic.ip = self.inst_ip
3087 ht_kind = self.sstore.GetHypervisorType()
3088 if ht_kind in constants.HTS_REQ_PORT:
3089 network_port = self.cfg.AllocatePort()
3090 else:
3091 network_port = None
3093 disks = _GenerateDiskTemplate(self.cfg,
3094 self.op.disk_template,
3095 instance, pnode_name,
3096 self.secondaries, self.op.disk_size,
3097 self.op.swap_size)
3099 iobj = objects.Instance(name=instance, os=self.op.os_type,
3100 primary_node=pnode_name,
3101 memory=self.op.mem_size,
3102 vcpus=self.op.vcpus,
3103 nics=[nic], disks=disks,
3104 disk_template=self.op.disk_template,
3105 status=self.instance_status,
3106 network_port=network_port,
3107 kernel_path=self.op.kernel_path,
3108 initrd_path=self.op.initrd_path,
3109 hvm_boot_order=self.op.hvm_boot_order,
3110 )
3112 feedback_fn("* creating instance disks...")
3113 if not _CreateDisks(self.cfg, iobj):
3114 _RemoveDisks(iobj, self.cfg)
3115 raise errors.OpExecError("Device creation failed, reverting...")
3117 feedback_fn("adding instance %s to cluster config" % instance)
3119 self.cfg.AddInstance(iobj)
3121 if self.op.wait_for_sync:
3122 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3123 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3124 # make sure the disks are not degraded (still sync-ing is ok)
3125 time.sleep(15)
3126 feedback_fn("* checking mirrors status")
3127 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3128 else:
3129 disk_abort = False
3131 if disk_abort:
3132 _RemoveDisks(iobj, self.cfg)
3133 self.cfg.RemoveInstance(iobj.name)
3134 raise errors.OpExecError("There are some degraded disks for"
3135 " this instance")
3137 feedback_fn("creating os for instance %s on node %s" %
3138 (instance, pnode_name))
3140 if iobj.disk_template != constants.DT_DISKLESS:
3141 if self.op.mode == constants.INSTANCE_CREATE:
3142 feedback_fn("* running the instance OS create scripts...")
3143 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3144 raise errors.OpExecError("could not add os for instance %s"
3145 " on node %s" %
3146 (instance, pnode_name))
3148 elif self.op.mode == constants.INSTANCE_IMPORT:
3149 feedback_fn("* running the instance OS import scripts...")
3150 src_node = self.op.src_node
3151 src_image = self.src_image
3152 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3153 src_node, src_image):
3154 raise errors.OpExecError("Could not import os for instance"
3155 " %s on node %s" %
3156 (instance, pnode_name))
3157 else:
3158 # also checked in the prereq part
3159 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3160 % self.op.mode)
3162 if self.op.start:
3163 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3164 feedback_fn("* starting instance...")
3165 if not rpc.call_instance_start(pnode_name, iobj, None):
3166 raise errors.OpExecError("Could not start instance")
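# A minimal sketch of the matching opcode submission (hypothetical values;
# the required fields are those listed in _OP_REQP above):
#
#   op = opcodes.OpCreateInstance(instance_name="inst1.example.com",
#                                 mem_size=512, vcpus=1,
#                                 disk_size=2048, swap_size=256,
#                                 disk_template=constants.DT_DRBD8,
#                                 pnode="node1.example.com",
#                                 snode="node2.example.com",
#                                 os_type="debian-etch",
#                                 mode=constants.INSTANCE_CREATE,
#                                 start=True, wait_for_sync=True,
#                                 ip_check=True, mac="auto")
#   proc.ExecOpCode(op)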
3169 class LUConnectConsole(NoHooksLU):
3170 """Connect to an instance's console.
3172 This is somewhat special in that it returns the command line that
3173 you need to run on the master node in order to connect to the
3174 console.
3175 """
3177 _OP_REQP = ["instance_name"]
3179 def CheckPrereq(self):
3180 """Check prerequisites.
3182 This checks that the instance is in the cluster.
3185 instance = self.cfg.GetInstanceInfo(
3186 self.cfg.ExpandInstanceName(self.op.instance_name))
3187 if instance is None:
3188 raise errors.OpPrereqError("Instance '%s' not known" %
3189 self.op.instance_name)
3190 self.instance = instance
3192 def Exec(self, feedback_fn):
3193 """Connect to the console of an instance
3196 instance = self.instance
3197 node = instance.primary_node
3199 node_insts = rpc.call_instance_list([node])[node]
3200 if node_insts is False:
3201 raise errors.OpExecError("Can't connect to node %s." % node)
3203 if instance.name not in node_insts:
3204 raise errors.OpExecError("Instance %s is not running." % instance.name)
3206 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3208 hyper = hypervisor.GetHypervisor()
3209 console_cmd = hyper.GetShellCommandForConsole(instance)
3211 argv = ["ssh", "-q", "-t"]
3212 argv.extend(ssh.KNOWN_HOSTS_OPTS)
3213 argv.extend(ssh.BATCH_MODE_OPTS)
3215 argv.append(console_cmd)
3219 class LUAddMDDRBDComponent(LogicalUnit):
3220 """Add a new mirror member to an instance's disk.
3223 HPATH = "mirror-add"
3224 HTYPE = constants.HTYPE_INSTANCE
3225 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3227 def BuildHooksEnv(self):
3230 This runs on the master, the primary and all the secondaries.
3231 """
3233 env = {
3234 "NEW_SECONDARY": self.op.remote_node,
3235 "DISK_NAME": self.op.disk_name,
3236 }
3237 env.update(_BuildInstanceHookEnvByObject(self.instance))
3238 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3239 self.op.remote_node,] + list(self.instance.secondary_nodes)
3240 return env, nl, nl
3242 def CheckPrereq(self):
3243 """Check prerequisites.
3245 This checks that the instance is in the cluster.
3248 instance = self.cfg.GetInstanceInfo(
3249 self.cfg.ExpandInstanceName(self.op.instance_name))
3250 if instance is None:
3251 raise errors.OpPrereqError("Instance '%s' not known" %
3252 self.op.instance_name)
3253 self.instance = instance
3255 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3256 if remote_node is None:
3257 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3258 self.remote_node = remote_node
3260 if remote_node == instance.primary_node:
3261 raise errors.OpPrereqError("The specified node is the primary node of"
3262 " the instance.")
3264 if instance.disk_template != constants.DT_REMOTE_RAID1:
3265 raise errors.OpPrereqError("Instance's disk layout is not"
3266 " remote_raid1.")
3267 for disk in instance.disks:
3268 if disk.iv_name == self.op.disk_name:
3269 break
3270 else:
3271 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3272 " instance." % self.op.disk_name)
3273 if len(disk.children) > 1:
3274 raise errors.OpPrereqError("The device already has two slave devices."
3275 " This would create a 3-disk raid1 which we"
3276 " don't allow.")
3277 self.disk = disk
3279 def Exec(self, feedback_fn):
3280 """Add the mirror component
3284 instance = self.instance
3285 disk = self.disk
3286 remote_node = self.remote_node
3287 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3288 names = _GenerateUniqueNames(self.cfg, lv_names)
3289 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3290 remote_node, disk.size, names)
3292 logger.Info("adding new mirror component on secondary")
3294 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3295 new_drbd, False,
3296 _GetInstanceInfoText(instance)):
3297 raise errors.OpExecError("Failed to create new component on secondary"
3298 " node %s" % remote_node)
3300 logger.Info("adding new mirror component on primary")
3302 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3303 instance, new_drbd,
3304 _GetInstanceInfoText(instance)):
3305 # remove secondary dev
3306 self.cfg.SetDiskID(new_drbd, remote_node)
3307 rpc.call_blockdev_remove(remote_node, new_drbd)
3308 raise errors.OpExecError("Failed to create volume on primary")
3310 # the device exists now
3311 # call the primary node to add the mirror to md
3312 logger.Info("adding new mirror component to md")
3313 if not rpc.call_blockdev_addchildren(instance.primary_node,
3314 disk, [new_drbd]):
3315 logger.Error("Can't add mirror component to md!")
3316 self.cfg.SetDiskID(new_drbd, remote_node)
3317 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3318 logger.Error("Can't rollback on secondary")
3319 self.cfg.SetDiskID(new_drbd, instance.primary_node)
3320 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3321 logger.Error("Can't rollback on primary")
3322 raise errors.OpExecError("Can't add mirror component to md array")
3324 disk.children.append(new_drbd)
3326 self.cfg.AddInstance(instance)
3328 _WaitForSync(self.cfg, instance, self.proc)
3333 class LURemoveMDDRBDComponent(LogicalUnit):
3334 """Remove a component from a remote_raid1 disk.
3337 HPATH = "mirror-remove"
3338 HTYPE = constants.HTYPE_INSTANCE
3339 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3341 def BuildHooksEnv(self):
3344 This runs on the master, the primary and all the secondaries.
3345 """
3347 env = {
3348 "DISK_NAME": self.op.disk_name,
3349 "DISK_ID": self.op.disk_id,
3350 "OLD_SECONDARY": self.old_secondary,
3351 }
3352 env.update(_BuildInstanceHookEnvByObject(self.instance))
3353 nl = [self.sstore.GetMasterNode(),
3354 self.instance.primary_node] + list(self.instance.secondary_nodes)
3355 return env, nl, nl
3357 def CheckPrereq(self):
3358 """Check prerequisites.
3360 This checks that the instance is in the cluster.
3363 instance = self.cfg.GetInstanceInfo(
3364 self.cfg.ExpandInstanceName(self.op.instance_name))
3365 if instance is None:
3366 raise errors.OpPrereqError("Instance '%s' not known" %
3367 self.op.instance_name)
3368 self.instance = instance
3370 if instance.disk_template != constants.DT_REMOTE_RAID1:
3371 raise errors.OpPrereqError("Instance's disk layout is not"
3372 " remote_raid1.")
3373 for disk in instance.disks:
3374 if disk.iv_name == self.op.disk_name:
3375 break
3376 else:
3377 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3378 " instance." % self.op.disk_name)
3379 for child in disk.children:
3380 if (child.dev_type == constants.LD_DRBD7 and
3381 child.logical_id[2] == self.op.disk_id):
3382 break
3383 else:
3384 raise errors.OpPrereqError("Can't find the device with this port.")
3386 if len(disk.children) < 2:
3387 raise errors.OpPrereqError("Cannot remove the last component from"
3388 " a mirror.")
3389 self.disk = disk
3390 self.child = child
3391 if self.child.logical_id[0] == instance.primary_node:
3392 oid = 1
3393 else:
3394 oid = 0
3395 self.old_secondary = self.child.logical_id[oid]
3397 def Exec(self, feedback_fn):
3398 """Remove the mirror component
3401 instance = self.instance
3402 disk = self.disk
3403 child = self.child
3404 logger.Info("remove mirror component")
3405 self.cfg.SetDiskID(disk, instance.primary_node)
3406 if not rpc.call_blockdev_removechildren(instance.primary_node,
3407 disk, [child]):
3408 raise errors.OpExecError("Can't remove child from mirror.")
3410 for node in child.logical_id[:2]:
3411 self.cfg.SetDiskID(child, node)
3412 if not rpc.call_blockdev_remove(node, child):
3413 logger.Error("Warning: failed to remove device from node %s,"
3414 " continuing operation." % node)
3416 disk.children.remove(child)
3417 self.cfg.AddInstance(instance)
3420 class LUReplaceDisks(LogicalUnit):
3421 """Replace the disks of an instance.
3424 HPATH = "mirrors-replace"
3425 HTYPE = constants.HTYPE_INSTANCE
3426 _OP_REQP = ["instance_name", "mode", "disks"]
3428 def BuildHooksEnv(self):
3431 This runs on the master, the primary and all the secondaries.
3432 """
3434 env = {
3435 "MODE": self.op.mode,
3436 "NEW_SECONDARY": self.op.remote_node,
3437 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3438 }
3439 env.update(_BuildInstanceHookEnvByObject(self.instance))
3440 nl = [
3441 self.sstore.GetMasterNode(),
3442 self.instance.primary_node,
3443 ]
3444 if self.op.remote_node is not None:
3445 nl.append(self.op.remote_node)
3446 return env, nl, nl
3448 def CheckPrereq(self):
3449 """Check prerequisites.
3451 This checks that the instance is in the cluster.
3454 instance = self.cfg.GetInstanceInfo(
3455 self.cfg.ExpandInstanceName(self.op.instance_name))
3456 if instance is None:
3457 raise errors.OpPrereqError("Instance '%s' not known" %
3458 self.op.instance_name)
3459 self.instance = instance
3460 self.op.instance_name = instance.name
3462 if instance.disk_template not in constants.DTS_NET_MIRROR:
3463 raise errors.OpPrereqError("Instance's disk layout is not"
3464 " network mirrored.")
3466 if len(instance.secondary_nodes) != 1:
3467 raise errors.OpPrereqError("The instance has a strange layout,"
3468 " expected one secondary but found %d" %
3469 len(instance.secondary_nodes))
3471 self.sec_node = instance.secondary_nodes[0]
3473 remote_node = getattr(self.op, "remote_node", None)
3474 if remote_node is not None:
3475 remote_node = self.cfg.ExpandNodeName(remote_node)
3476 if remote_node is None:
3477 raise errors.OpPrereqError("Node '%s' not known" %
3478 self.op.remote_node)
3479 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3480 else:
3481 self.remote_node_info = None
3482 if remote_node == instance.primary_node:
3483 raise errors.OpPrereqError("The specified node is the primary node of"
3484 " the instance.")
3485 elif remote_node == self.sec_node:
3486 if self.op.mode == constants.REPLACE_DISK_SEC:
3487 # this is for DRBD8, where we can't execute the same mode of
3488 # replacement as for drbd7 (no different port allocated)
3489 raise errors.OpPrereqError("Same secondary given, cannot execute"
3490 " replacement")
3491 # the user gave the current secondary, switch to
3492 # 'no-replace-secondary' mode for drbd7
3493 remote_node = None
3494 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3495 self.op.mode != constants.REPLACE_DISK_ALL):
3496 raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3497 " disks replacement, not individual ones")
3498 if instance.disk_template == constants.DT_DRBD8:
3499 if (self.op.mode == constants.REPLACE_DISK_ALL and
3500 remote_node is not None):
3501 # switch to replace secondary mode
3502 self.op.mode = constants.REPLACE_DISK_SEC
3504 if self.op.mode == constants.REPLACE_DISK_ALL:
3505 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3506 " secondary disk replacement, not"
3507 " both at once")
3508 elif self.op.mode == constants.REPLACE_DISK_PRI:
3509 if remote_node is not None:
3510 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3511 " the secondary while doing a primary"
3512 " node disk replacement")
3513 self.tgt_node = instance.primary_node
3514 self.oth_node = instance.secondary_nodes[0]
3515 elif self.op.mode == constants.REPLACE_DISK_SEC:
3516 self.new_node = remote_node # this can be None, in which case
3517 # we don't change the secondary
3518 self.tgt_node = instance.secondary_nodes[0]
3519 self.oth_node = instance.primary_node
3521 raise errors.ProgrammerError("Unhandled disk replace mode")
3523 for name in self.op.disks:
3524 if instance.FindDisk(name) is None:
3525 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3526 (name, instance.name))
3527 self.op.remote_node = remote_node
3529 def _ExecRR1(self, feedback_fn):
3530 """Replace the disks of an instance.
3533 instance = self.instance
3534 iv_names = {}
3535 cfg = self.cfg
3536 if self.op.remote_node is None:
3537 remote_node = self.sec_node
3538 else:
3539 remote_node = self.op.remote_node
3541 for dev in instance.disks:
3542 size = dev.size
3543 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3544 names = _GenerateUniqueNames(cfg, lv_names)
3545 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3546 remote_node, size, names)
3547 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3548 logger.Info("adding new mirror component on secondary for %s" %
3549 dev.iv_name)
3551 if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3552 new_drbd, False,
3553 _GetInstanceInfoText(instance)):
3554 raise errors.OpExecError("Failed to create new component on secondary"
3555 " node %s. Full abort, cleanup manually!" %
3556 remote_node)
3558 logger.Info("adding new mirror component on primary")
3560 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3561 instance, new_drbd,
3562 _GetInstanceInfoText(instance)):
3563 # remove secondary dev
3564 cfg.SetDiskID(new_drbd, remote_node)
3565 rpc.call_blockdev_remove(remote_node, new_drbd)
3566 raise errors.OpExecError("Failed to create volume on primary!"
3567 " Full abort, cleanup manually!!")
3569 # the device exists now
3570 # call the primary node to add the mirror to md
3571 logger.Info("adding new mirror component to md")
3572 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3573 [new_drbd]):
3574 logger.Error("Can't add mirror component to md!")
3575 cfg.SetDiskID(new_drbd, remote_node)
3576 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3577 logger.Error("Can't rollback on secondary")
3578 cfg.SetDiskID(new_drbd, instance.primary_node)
3579 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3580 logger.Error("Can't rollback on primary")
3581 raise errors.OpExecError("Full abort, cleanup manually!!")
3583 dev.children.append(new_drbd)
3584 cfg.AddInstance(instance)
3586 # this can fail as the old devices are degraded and _WaitForSync
3587 # does a combined result over all disks, so we don't check its
3588 # return value
3589 _WaitForSync(cfg, instance, self.proc, unlock=True)
3591 # so check manually all the devices
3592 for name in iv_names:
3593 dev, child, new_drbd = iv_names[name]
3594 cfg.SetDiskID(dev, instance.primary_node)
3595 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3596 if is_degr:
3597 raise errors.OpExecError("MD device %s is degraded!" % name)
3598 cfg.SetDiskID(new_drbd, instance.primary_node)
3599 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3600 if is_degr:
3601 raise errors.OpExecError("New drbd device %s is degraded!" % name)
3603 for name in iv_names:
3604 dev, child, new_drbd = iv_names[name]
3605 logger.Info("remove mirror %s component" % name)
3606 cfg.SetDiskID(dev, instance.primary_node)
3607 if not rpc.call_blockdev_removechildren(instance.primary_node,
3608 dev, [child]):
3609 logger.Error("Can't remove child from mirror, aborting"
3610 " *this device cleanup*.\nYou need to cleanup manually!!")
3611 continue
3613 for node in child.logical_id[:2]:
3614 logger.Info("remove child device on %s" % node)
3615 cfg.SetDiskID(child, node)
3616 if not rpc.call_blockdev_remove(node, child):
3617 logger.Error("Warning: failed to remove device from node %s,"
3618 " continuing operation." % node)
3620 dev.children.remove(child)
3622 cfg.AddInstance(instance)
3624 def _ExecD8DiskOnly(self, feedback_fn):
3625 """Replace a disk on the primary or secondary for drbd8.
3627 The algorithm for replace is quite complicated:
3628 - for each disk to be replaced:
3629 - create new LVs on the target node with unique names
3630 - detach old LVs from the drbd device
3631 - rename old LVs to name_replaced.<time_t>
3632 - rename new LVs to old LVs
3633 - attach the new LVs (with the old names now) to the drbd device
3634 - wait for sync across all devices
3635 - for each modified disk:
3636 - remove old LVs (which have the name name_replaced.<time_t>)
3638 Failures are not very well handled.
3641 steps_total = 6
3642 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3643 instance = self.instance
3644 iv_names = {}
3645 vgname = self.cfg.GetVGName()
3646 # start of work
3647 cfg = self.cfg
3648 tgt_node = self.tgt_node
3649 oth_node = self.oth_node
3651 # Step: check device activation
3652 self.proc.LogStep(1, steps_total, "check device existence")
3653 info("checking volume groups")
3654 my_vg = cfg.GetVGName()
3655 results = rpc.call_vg_list([oth_node, tgt_node])
3656 if not results:
3657 raise errors.OpExecError("Can't list volume groups on the nodes")
3658 for node in oth_node, tgt_node:
3659 res = results.get(node, False)
3660 if not res or my_vg not in res:
3661 raise errors.OpExecError("Volume group '%s' not found on %s" %
3663 for dev in instance.disks:
3664 if not dev.iv_name in self.op.disks:
3665 continue
3666 for node in tgt_node, oth_node:
3667 info("checking %s on %s" % (dev.iv_name, node))
3668 cfg.SetDiskID(dev, node)
3669 if not rpc.call_blockdev_find(node, dev):
3670 raise errors.OpExecError("Can't find device %s on node %s" %
3671 (dev.iv_name, node))
3673 # Step: check other node consistency
3674 self.proc.LogStep(2, steps_total, "check peer consistency")
3675 for dev in instance.disks:
3676 if not dev.iv_name in self.op.disks:
3677 continue
3678 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3679 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3680 oth_node==instance.primary_node):
3681 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3682 " to replace disks on this node (%s)" %
3683 (oth_node, tgt_node))
3685 # Step: create new storage
3686 self.proc.LogStep(3, steps_total, "allocate new storage")
3687 for dev in instance.disks:
3688 if not dev.iv_name in self.op.disks:
3689 continue
3690 size = dev.size
3691 cfg.SetDiskID(dev, tgt_node)
3692 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3693 names = _GenerateUniqueNames(cfg, lv_names)
3694 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3695 logical_id=(vgname, names[0]))
3696 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3697 logical_id=(vgname, names[1]))
3698 new_lvs = [lv_data, lv_meta]
3699 old_lvs = dev.children
3700 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3701 info("creating new local storage on %s for %s" %
3702 (tgt_node, dev.iv_name))
3703 # since we *always* want to create this LV, we use the
3704 # _Create...OnPrimary (which forces the creation), even if we
3705 # are talking about the secondary node
3706 for new_lv in new_lvs:
3707 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3708 _GetInstanceInfoText(instance)):
3709 raise errors.OpExecError("Failed to create new LV named '%s' on"
3710 " node %s" %
3711 (new_lv.logical_id[1], tgt_node))
3713 # Step: for each lv, detach+rename*2+attach
3714 self.proc.LogStep(4, steps_total, "change drbd configuration")
3715 for dev, old_lvs, new_lvs in iv_names.itervalues():
3716 info("detaching %s drbd from local storage" % dev.iv_name)
3717 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3718 raise errors.OpExecError("Can't detach drbd from local storage on node"
3719 " %s for device %s" % (tgt_node, dev.iv_name))
3721 #cfg.Update(instance)
3723 # ok, we created the new LVs, so now we know we have the needed
3724 # storage; as such, we proceed on the target node to rename
3725 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3726 # using the assumption than logical_id == physical_id (which in
3727 # turn is the unique_id on that node)
3729 # FIXME(iustin): use a better name for the replaced LVs
3730 temp_suffix = int(time.time())
3731 ren_fn = lambda d, suff: (d.physical_id[0],
3732 d.physical_id[1] + "_replaced-%s" % suff)
3733 # build the rename list based on what LVs exist on the node
3734 rlist = []
3735 for to_ren in old_lvs:
3736 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3737 if find_res is not None: # device exists
3738 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3740 info("renaming the old LVs on the target node")
3741 if not rpc.call_blockdev_rename(tgt_node, rlist):
3742 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3743 # now we rename the new LVs to the old LVs
3744 info("renaming the new LVs on the target node")
3745 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3746 if not rpc.call_blockdev_rename(tgt_node, rlist):
3747 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3749 for old, new in zip(old_lvs, new_lvs):
3750 new.logical_id = old.logical_id
3751 cfg.SetDiskID(new, tgt_node)
3753 for disk in old_lvs:
3754 disk.logical_id = ren_fn(disk, temp_suffix)
3755 cfg.SetDiskID(disk, tgt_node)
3757 # now that the new lvs have the old name, we can add them to the device
3758 info("adding new mirror component on %s" % tgt_node)
3759 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3760 for new_lv in new_lvs:
3761 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3762 warning("Can't rollback device %s", hint="manually cleanup unused"
3763 " logical volumes")
3764 raise errors.OpExecError("Can't add local storage to drbd")
3766 dev.children = new_lvs
3767 cfg.Update(instance)
3769 # Step: wait for sync
3771 # this can fail as the old devices are degraded and _WaitForSync
3772 # does a combined result over all disks, so we don't check its
3773 # return value
3774 self.proc.LogStep(5, steps_total, "sync devices")
3775 _WaitForSync(cfg, instance, self.proc, unlock=True)
3777 # so check manually all the devices
3778 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3779 cfg.SetDiskID(dev, instance.primary_node)
3780 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3781 if is_degr:
3782 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3784 # Step: remove old storage
3785 self.proc.LogStep(6, steps_total, "removing old storage")
3786 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3787 info("remove logical volumes for %s" % name)
3788 for lv in old_lvs:
3789 cfg.SetDiskID(lv, tgt_node)
3790 if not rpc.call_blockdev_remove(tgt_node, lv):
3791 warning("Can't remove old LV", hint="manually remove unused LVs")
3794 def _ExecD8Secondary(self, feedback_fn):
3795 """Replace the secondary node for drbd8.
3797 The algorithm for replace is quite complicated:
3798 - for all disks of the instance:
3799 - create new LVs on the new node with same names
3800 - shutdown the drbd device on the old secondary
3801 - disconnect the drbd network on the primary
3802 - create the drbd device on the new secondary
3803 - network attach the drbd on the primary, using an artifice:
3804 the drbd code for Attach() will connect to the network if it
3805 finds a device which is connected to the good local disks but
3807 - wait for sync across all devices
3808 - remove all disks from the old secondary
3810 Failures are not very well handled.
3813 steps_total = 6
3814 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3815 instance = self.instance
3816 iv_names = {}
3817 vgname = self.cfg.GetVGName()
3818 # start of work
3819 cfg = self.cfg
3820 old_node = self.tgt_node
3821 new_node = self.new_node
3822 pri_node = instance.primary_node
3824 # Step: check device activation
3825 self.proc.LogStep(1, steps_total, "check device existence")
3826 info("checking volume groups")
3827 my_vg = cfg.GetVGName()
3828 results = rpc.call_vg_list([pri_node, new_node])
3829 if not results:
3830 raise errors.OpExecError("Can't list volume groups on the nodes")
3831 for node in pri_node, new_node:
3832 res = results.get(node, False)
3833 if not res or my_vg not in res:
3834 raise errors.OpExecError("Volume group '%s' not found on %s" %
3836 for dev in instance.disks:
3837 if not dev.iv_name in self.op.disks:
3838 continue
3839 info("checking %s on %s" % (dev.iv_name, pri_node))
3840 cfg.SetDiskID(dev, pri_node)
3841 if not rpc.call_blockdev_find(pri_node, dev):
3842 raise errors.OpExecError("Can't find device %s on node %s" %
3843 (dev.iv_name, pri_node))
3845 # Step: check other node consistency
3846 self.proc.LogStep(2, steps_total, "check peer consistency")
3847 for dev in instance.disks:
3848 if not dev.iv_name in self.op.disks:
3849 continue
3850 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3851 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3852 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3853 " unsafe to replace the secondary" %
3854 pri_node)
3856 # Step: create new storage
3857 self.proc.LogStep(3, steps_total, "allocate new storage")
3858 for dev in instance.disks:
3860 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3861 # since we *always* want to create this LV, we use the
3862 # _Create...OnPrimary (which forces the creation), even if we
3863 # are talking about the secondary node
3864 for new_lv in dev.children:
3865 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3866 _GetInstanceInfoText(instance)):
3867 raise errors.OpExecError("Failed to create new LV named '%s' on"
3868 " node %s" %
3869 (new_lv.logical_id[1], new_node))
3871 iv_names[dev.iv_name] = (dev, dev.children)
3873 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3874 for dev in instance.disks:
3876 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3877 # create new devices on new_node
3878 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3879 logical_id=(pri_node, new_node,
3880 dev.logical_id[2]),
3881 children=dev.children)
3882 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3883 new_drbd, False,
3884 _GetInstanceInfoText(instance)):
3885 raise errors.OpExecError("Failed to create new DRBD on"
3886 " node '%s'" % new_node)
3888 for dev in instance.disks:
3889 # we have new devices, shutdown the drbd on the old secondary
3890 info("shutting down drbd for %s on old node" % dev.iv_name)
3891 cfg.SetDiskID(dev, old_node)
3892 if not rpc.call_blockdev_shutdown(old_node, dev):
3893 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3894 hint="Please cleanup this device manually as soon as possible")
3896 info("detaching primary drbds from the network (=> standalone)")
3897 done = 0
3898 for dev in instance.disks:
3899 cfg.SetDiskID(dev, pri_node)
3900 # set the physical (unique in bdev terms) id to None, meaning
3901 # detach from network
3902 dev.physical_id = (None,) * len(dev.physical_id)
3903 # and 'find' the device, which will 'fix' it to match the
3904 # standalone state
3905 if rpc.call_blockdev_find(pri_node, dev):
3906 done += 1
3907 else:
3908 warning("Failed to detach drbd %s from network, unusual case" %
3909 dev.iv_name)
3911 if not done:
3912 # no detaches succeeded (very unlikely)
3913 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3915 # if we managed to detach at least one, we update all the disks of
3916 # the instance to point to the new secondary
3917 info("updating instance configuration")
3918 for dev in instance.disks:
3919 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3920 cfg.SetDiskID(dev, pri_node)
3921 cfg.Update(instance)
3923 # and now perform the drbd attach
3924 info("attaching primary drbds to new secondary (standalone => connected)")
3926 for dev in instance.disks:
3927 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3928 # since the attach is smart, it's enough to 'find' the device,
3929 # it will automatically activate the network, if the physical_id
3931 cfg.SetDiskID(dev, pri_node)
3932 if not rpc.call_blockdev_find(pri_node, dev):
3933 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3934 "please do a gnt-instance info to see the status of disks")
3936 # this can fail as the old devices are degraded and _WaitForSync
3937 # does a combined result over all disks, so we don't check its
3938 # return value
3939 self.proc.LogStep(5, steps_total, "sync devices")
3940 _WaitForSync(cfg, instance, self.proc, unlock=True)
3942 # so check manually all the devices
3943 for name, (dev, old_lvs) in iv_names.iteritems():
3944 cfg.SetDiskID(dev, pri_node)
3945 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3946 if is_degr:
3947 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3949 self.proc.LogStep(6, steps_total, "removing old storage")
3950 for name, (dev, old_lvs) in iv_names.iteritems():
3951 info("remove logical volumes for %s" % name)
3952 for lv in old_lvs:
3953 cfg.SetDiskID(lv, old_node)
3954 if not rpc.call_blockdev_remove(old_node, lv):
3955 warning("Can't remove LV on old secondary",
3956 hint="Cleanup stale volumes by hand")
  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance
    if instance.disk_template == constants.DT_REMOTE_RAID1:
      fn = self._ExecRR1
    elif instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
    return fn(feedback_fn)

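# Illustrative sketch (an assumption, not part of the original code): for a
# DT_DRBD8 disk the logical_id tuple appears to be laid out as
# (node_a, node_b, port), which is why _ExecD8Secondary can re-point the
# secondary with a simple tuple rebuild:
#   dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
# e.g. ("node1.example.com", "node2.example.com", 11000)
#   -> ("node1.example.com", "node3.example.com", 11000)
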

class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

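  # Example (hypothetical values) of the recursive structure built above for
  # a DRBD8 disk backed by two logical volumes; "pstatus"/"sstatus" hold
  # whatever call_blockdev_find returns for the respective node:
  #   {"iv_name": "sda", "dev_type": "drbd8", ...,
  #    "children": [{"dev_type": "lvm", "children": [], ...},
  #                 {"dev_type": "lvm", "children": [], ...}]}
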
  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "network_port": instance.network_port,
        "vcpus": instance.vcpus,
        "kernel_path": instance.kernel_path,
        "initrd_path": instance.initrd_path,
        "hvm_boot_order": instance.hvm_boot_order,
        }

      result[instance.name] = idict

    return result

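  # Minimal usage sketch (an assumption: it presumes an OpQueryInstanceData
  # opcode run through the standard processor); the result maps instance
  # names to the idict built above:
  #   result = proc.ExecOpCode(opcodes.OpQueryInstanceData(instances=[]))
  #   for name, data in result.iteritems():
  #     print "%s: %s/%s" % (name, data["config_state"], data["run_state"])
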

class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      args['nics'] = [(ip, bridge)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError("MAC address %s already in use in cluster" %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError("Invalid MAC address %s" % self.mac)
    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False
    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

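    # Note on the check above: str.strip("acdn") removes only leading and
    # trailing characters from that set, so the result is empty exactly when
    # every character of the string is one of a/c/d/n; a stray character
    # anywhere survives the strip and the order is rejected, e.g.:
    #   "cdn".strip("acdn") -> ""   (accepted)
    #   "cxd".strip("acdn") -> "x"  (rejected)
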
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)

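  # Sketch of the RPC result shape, with made-up names (an illustration,
  # not an output captured from a real cluster):
  #   {"node1.example.com": ["web1.example.com"],
  #    "node2.example.com": []}
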

class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and destination node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

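    # Illustrative (hypothetical names): ExpandNodeName canonicalises a short
    # name, e.g. "node2" -> "node2.example.com", and returns None when the
    # name matches no cluster node, which is what the check above relies on.
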
  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      # note: only the first disk ("sda") is snapshotted and exported
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)
    finally:
      # restart the instance if we had shut it down for the snapshot
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.proc.ChainOpCode(op)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))

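  # Hedged usage note: this LU typically sits behind the command-line export,
  # along the lines of (node and instance names are placeholders):
  #   gnt-backup export -n node2.example.com web1.example.com
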

class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results

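  # Example result shape with hypothetical tags: searching for the pattern
  # "^prod" might return
  #   [("/cluster", "production"),
  #    ("/instances/web1.example.com", "prod-db")]
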

class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

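    # The subset test above relies on the set ordering operators; a quick
    # sketch with made-up tags:
    #   frozenset(["a"]) <= set(["a", "b"])  -> True  (all tags present)
    #   frozenset(["c"]) <= set(["a", "b"])  -> False (diff_tags == {"c"})
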
  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")