4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
46 class LogicalUnit(object):
47 """Logical Unit base class.
49 Subclasses must follow these rules:
50 - implement CheckPrereq which also fills in the opcode instance
51 with all the fields (even if as None)
53 - implement BuildHooksEnv
54 - redefine HPATH and HTYPE
55 - optionally redefine their run requirements (REQ_CLUSTER,
56 REQ_MASTER); note that all commands require root permissions
65 def __init__(self, processor, op, cfg, sstore):
66 """Constructor for LogicalUnit.
68 This needs to be overridden in derived classes in order to check op
76 for attr_name in self._OP_REQP:
77 attr_val = getattr(op, attr_name, None)
79 raise errors.OpPrereqError("Required parameter '%s' missing" %
82 if not cfg.IsCluster():
83 raise errors.OpPrereqError("Cluster not initialized yet,"
84 " use 'gnt-cluster init' first.")
86 master = sstore.GetMasterNode()
87 if master != utils.HostInfo().name:
88 raise errors.OpPrereqError("Commands must be run on the master"
91 def CheckPrereq(self):
92 """Check prerequisites for this LU.
94 This method should check that the prerequisites for the execution
95 of this LU are fulfilled. It can do internode communication, but
96 it should be idempotent - no cluster or system changes are
99 The method should raise errors.OpPrereqError in case something is
100 not fulfilled. Its return value is ignored.
102 This method should also update all the parameters of the opcode to
103 their canonical form; e.g. a short node name must be fully
104 expanded after this method has successfully completed (so that
105 hooks, logging, etc. work correctly).
108 raise NotImplementedError
110 def Exec(self, feedback_fn):
113 This method should implement the actual work. It should raise
114 errors.OpExecError for failures that are somewhat dealt with in
118 raise NotImplementedError
120 def BuildHooksEnv(self):
121 """Build hooks environment for this LU.
123 This method should return a three-element tuple consisting of: a dict
124 containing the environment that will be used for running the
125 specific hook for this LU, a list of node names on which the hook
126 should run before the execution, and a list of node names on which
127 the hook should run after the execution.
129 The keys of the dict must not have 'GANETI_' prefixed as this will
130 be handled in the hooks runner. Also note additional keys will be
131 added by the hooks runner. If the LU doesn't define any
132 environment, an empty dict (and not None) should be returned.
134 As for the node lists, the master should not be included in
135 them, as it will be added by the hooks runner in case this LU
136 requires a cluster to run on (otherwise we don't have a node
137 list). If there are no nodes, an empty list (and not None) should be returned.
140 Note that if the HPATH for a LU class is None, this function will not be called.
144 raise NotImplementedError
147 class NoHooksLU(LogicalUnit):
148 """Simple LU which runs no hooks.
150 This LU is intended as a parent for other LogicalUnits which will
151 run no hooks, in order to reduce duplicate code.
157 def BuildHooksEnv(self):
160 This is a no-op, since we don't run hooks.
166 def _AddHostToEtcHosts(hostname):
167 """Wrapper around utils.SetEtcHostsEntry.
170 hi = utils.HostInfo(name=hostname)
171 utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
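# Illustrative sketch (hypothetical values): for a node "node1.example.com"
# resolving to 192.0.2.10, the call above maintains an /etc/hosts entry of
# roughly the form "192.0.2.10 node1.example.com node1".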
174 def _RemoveHostFromEtcHosts(hostname):
175 """Wrapper around utils.RemoveEtcHostsEntry.
178 hi = utils.HostInfo(name=hostname)
179 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
183 def _GetWantedNodes(lu, nodes):
184 """Returns list of checked and expanded node names.
187 nodes: List of node names (strings); an empty list selects all nodes
190 if not isinstance(nodes, list):
191 raise errors.OpPrereqError("Invalid argument type 'nodes'")
197 node = lu.cfg.ExpandNodeName(name)
199 raise errors.OpPrereqError("No such node name '%s'" % name)
203 wanted = lu.cfg.GetNodeList()
204 return utils.NiceSort(wanted)
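# Usage sketch (hypothetical names): _GetWantedNodes(lu, ["node1"]) returns the
# expanded names sorted with utils.NiceSort and raises OpPrereqError for
# unknown names; an empty list selects every node known to the configuration.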
207 def _GetWantedInstances(lu, instances):
208 """Returns list of checked and expanded instance names.
211 instances: List of instance names (strings); an empty list selects all instances
214 if not isinstance(instances, list):
215 raise errors.OpPrereqError("Invalid argument type 'instances'")
220 for name in instances:
221 instance = lu.cfg.ExpandInstanceName(name)
223 raise errors.OpPrereqError("No such instance name '%s'" % name)
224 wanted.append(instance)
227 wanted = lu.cfg.GetInstanceList()
228 return utils.NiceSort(wanted)
231 def _CheckOutputFields(static, dynamic, selected):
232 """Checks whether all selected fields are valid.
235 static: Static fields
236 dynamic: Dynamic fields
239 static_fields = frozenset(static)
240 dynamic_fields = frozenset(dynamic)
242 all_fields = static_fields | dynamic_fields
244 if not all_fields.issuperset(selected):
245 raise errors.OpPrereqError("Unknown output fields selected: %s"
246 % ",".join(frozenset(selected).
247 difference(all_fields)))
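# Usage sketch (hypothetical fields): with static=["name"], dynamic=["mfree"]
# and selected=["name", "bogus"], the check above raises
# OpPrereqError("Unknown output fields selected: bogus").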
250 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251 memory, vcpus, nics):
252 """Builds instance related env variables for hooks from single variables.
255 secondary_nodes: List of secondary nodes as strings
259 "INSTANCE_NAME": name,
260 "INSTANCE_PRIMARY": primary_node,
261 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262 "INSTANCE_OS_TYPE": os_type,
263 "INSTANCE_STATUS": status,
264 "INSTANCE_MEMORY": memory,
265 "INSTANCE_VCPUS": vcpus,
269 nic_count = len(nics)
270 for idx, (ip, bridge) in enumerate(nics):
273 env["INSTANCE_NIC%d_IP" % idx] = ip
274 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
278 env["INSTANCE_NIC_COUNT"] = nic_count
283 def _BuildInstanceHookEnvByObject(instance, override=None):
284 """Builds instance related env variables for hooks from an object.
287 instance: objects.Instance object of instance
288 override: dict of values to override
291 'name': instance.name,
292 'primary_node': instance.primary_node,
293 'secondary_nodes': instance.secondary_nodes,
294 'os_type': instance.os,
295 'status': instance.status,
296 'memory': instance.memory,
297 'vcpus': instance.vcpus,
298 'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
301 args.update(override)
302 return _BuildInstanceHookEnv(**args)
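# Usage sketch: callers may override single entries before the env is built,
# e.g. _BuildInstanceHookEnvByObject(instance, override={"status": "up"})
# (hypothetical value) replaces only the status field.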
305 def _UpdateKnownHosts(fullnode, ip, pubkey):
306 """Ensure a node has a correct known_hosts entry.
309 fullnode - Fully qualified domain name of host. (str)
310 ip - IPv4 address of host (str)
311 pubkey - the public key of the cluster
314 if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
315 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
317 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
326 logger.Debug('read %s' % (repr(rawline),))
328 parts = rawline.rstrip('\r\n').split()
330 # Ignore unwanted lines
331 if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
332 fields = parts[0].split(',')
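# parts[0] holds the comma-separated host specs of the entry (e.g.
# "node1.example.com,192.0.2.10" -- illustrative values), parts[1] the key type
# and parts[2] the key itself; the checks below keep a line only if it covers
# both of our specs and carries the expected cluster key.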
337 for spec in [ ip, fullnode ]:
338 if spec not in fields:
343 logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
344 if haveall and key == pubkey:
346 save_lines.append(rawline)
347 logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
350 if havesome and (not haveall or key != pubkey):
352 logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
355 save_lines.append(rawline)
358 add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
359 logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
362 save_lines = save_lines + add_lines
364 # Write a new file and replace old.
365 fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
367 newfile = os.fdopen(fd, 'w')
369 newfile.write(''.join(save_lines))
372 logger.Debug("Wrote new known_hosts.")
373 os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
376 # Simply appending a new line will do the trick.
378 for add in add_lines:
384 def _HasValidVG(vglist, vgname):
385 """Checks if the volume group list is valid.
387 A non-None return value means there's an error, and the return value
388 is the error message.
391 vgsize = vglist.get(vgname, None)
393 return "volume group '%s' missing" % vgname
395 return ("volume group '%s' too small (20480MiB required, %dMib found)" %
400 def _InitSSHSetup(node):
401 """Setup the SSH configuration for the cluster.
404 This generates a dsa keypair for root, adds the pub key to the
405 permitted hosts and adds the hostkey to its own known hosts.
408 node: the name of this host as a fqdn
411 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
413 for name in priv_key, pub_key:
414 if os.path.exists(name):
415 utils.CreateBackup(name)
416 utils.RemoveFile(name)
418 result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
422 raise errors.OpExecError("Could not generate ssh keypair, error %s" %
425 f = open(pub_key, 'r')
427 utils.AddAuthorizedKey(auth_keys, f.read(8192))
432 def _InitGanetiServerSetup(ss):
433 """Setup the necessary configuration for the initial node daemon.
435 This creates the nodepass file containing the shared password for
436 the cluster and also generates the SSL certificate.
439 # Create pseudo random password
440 randpass = sha.new(os.urandom(64)).hexdigest()
441 # and write it into sstore
442 ss.SetKey(ss.SS_NODED_PASS, randpass)
444 result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
445 "-days", str(365*5), "-nodes", "-x509",
446 "-keyout", constants.SSL_CERT_FILE,
447 "-out", constants.SSL_CERT_FILE, "-batch"])
449 raise errors.OpExecError("could not generate server ssl cert, command"
450 " %s had exitcode %s and error message %s" %
451 (result.cmd, result.exit_code, result.output))
453 os.chmod(constants.SSL_CERT_FILE, 0400)
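# Note: the openssl call above writes the private key and the self-signed
# certificate into the same file (SSL_CERT_FILE), hence the restrictive 0400
# mode set here.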
455 result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
458 raise errors.OpExecError("Could not start the node daemon, command %s"
459 " had exitcode %s and error %s" %
460 (result.cmd, result.exit_code, result.output))
463 def _CheckInstanceBridgesExist(instance):
464 """Check that the brigdes needed by an instance exist.
467 # check bridges existance
468 brlist = [nic.bridge for nic in instance.nics]
469 if not rpc.call_bridges_exist(instance.primary_node, brlist):
470 raise errors.OpPrereqError("one or more target bridges %s does not"
471 " exist on destination node '%s'" %
472 (brlist, instance.primary_node))
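# Call sketch: the startup and reboot LUs call this from their CheckPrereq, so
# a missing bridge is reported before Exec touches any disks.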
475 class LUInitCluster(LogicalUnit):
476 """Initialise the cluster.
479 HPATH = "cluster-init"
480 HTYPE = constants.HTYPE_CLUSTER
481 _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
482 "def_bridge", "master_netdev"]
485 def BuildHooksEnv(self):
488 Notes: Since we don't require a cluster, we must manually add
489 ourselves in the post-run node list.
492 env = {"OP_TARGET": self.op.cluster_name}
493 return env, [], [self.hostname.name]
495 def CheckPrereq(self):
496 """Verify that the passed name is a valid one.
499 if config.ConfigWriter.IsCluster():
500 raise errors.OpPrereqError("Cluster is already initialised")
502 if self.op.hypervisor_type == constants.HT_XEN_HVM31:
503 if not os.path.exists(constants.VNC_PASSWORD_FILE):
504 raise errors.OpPrereqError("Please prepare the cluster VNC"
506 constants.VNC_PASSWORD_FILE)
508 self.hostname = hostname = utils.HostInfo()
510 if hostname.ip.startswith("127."):
511 raise errors.OpPrereqError("This host's IP resolves to the private"
512 " range (%s). Please fix DNS or /etc/hosts." %
515 self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
517 if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
518 constants.DEFAULT_NODED_PORT):
519 raise errors.OpPrereqError("Inconsistency: this host's name resolves"
520 " to %s,\nbut this ip address does not"
521 " belong to this host."
522 " Aborting." % hostname.ip)
524 secondary_ip = getattr(self.op, "secondary_ip", None)
525 if secondary_ip and not utils.IsValidIP(secondary_ip):
526 raise errors.OpPrereqError("Invalid secondary ip given")
528 secondary_ip != hostname.ip and
529 (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
530 constants.DEFAULT_NODED_PORT))):
531 raise errors.OpPrereqError("You gave %s as secondary IP,"
532 " but it does not belong to this host." %
534 self.secondary_ip = secondary_ip
536 # checks presence of the volume group given
537 vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
540 raise errors.OpPrereqError("Error: %s" % vgstatus)
542 if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
544 raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
547 if self.op.hypervisor_type not in constants.HYPER_TYPES:
548 raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
549 self.op.hypervisor_type)
551 result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
553 raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
554 (self.op.master_netdev,
555 result.output.strip()))
557 if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
558 os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
559 raise errors.OpPrereqError("Init.d script '%s' missing or not"
560 " executable." % constants.NODE_INITD_SCRIPT)
562 def Exec(self, feedback_fn):
563 """Initialize the cluster.
566 clustername = self.clustername
567 hostname = self.hostname
569 # set up the simple store
570 self.sstore = ss = ssconf.SimpleStore()
571 ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
572 ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
573 ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
574 ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
575 ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
577 # set up the inter-node password and certificate
578 _InitGanetiServerSetup(ss)
580 # start the master ip
581 rpc.call_node_start_master(hostname.name)
583 # set up ssh config and /etc/hosts
584 f = open(constants.SSH_HOST_RSA_PUB, 'r')
589 sshkey = sshline.split(" ")[1]
591 _AddHostToEtcHosts(hostname.name)
593 _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
595 _InitSSHSetup(hostname.name)
597 # init of cluster config file
598 self.cfg = cfgw = config.ConfigWriter()
599 cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
600 sshkey, self.op.mac_prefix,
601 self.op.vg_name, self.op.def_bridge)
604 class LUDestroyCluster(NoHooksLU):
605 """Logical unit for destroying the cluster.
610 def CheckPrereq(self):
611 """Check prerequisites.
613 This checks whether the cluster is empty.
615 Any errors are signalled by raising errors.OpPrereqError.
618 master = self.sstore.GetMasterNode()
620 nodelist = self.cfg.GetNodeList()
621 if len(nodelist) != 1 or nodelist[0] != master:
622 raise errors.OpPrereqError("There are still %d node(s) in"
623 " this cluster." % (len(nodelist) - 1))
624 instancelist = self.cfg.GetInstanceList()
626 raise errors.OpPrereqError("There are still %d instance(s) in"
627 " this cluster." % len(instancelist))
629 def Exec(self, feedback_fn):
630 """Destroys the cluster.
633 master = self.sstore.GetMasterNode()
634 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
635 utils.CreateBackup(priv_key)
636 utils.CreateBackup(pub_key)
637 rpc.call_node_leave_cluster(master)
640 class LUVerifyCluster(NoHooksLU):
641 """Verifies the cluster status.
646 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
647 remote_version, feedback_fn):
648 """Run multiple tests against a node.
651 - compares ganeti version
652 - checks vg existence and size > 20G
653 - checks config file checksum
654 - checks ssh to other nodes
657 node: name of the node to check
658 file_list: required list of files
659 local_cksum: dictionary of local files and their checksums
662 # compares ganeti version
663 local_version = constants.PROTOCOL_VERSION
664 if not remote_version:
665 feedback_fn(" - ERROR: connection to %s failed" % (node))
668 if local_version != remote_version:
669 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
670 (local_version, node, remote_version))
673 # checks vg existence and size > 20G
677 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
681 vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
683 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
686 # checks config file checksum
689 if 'filelist' not in node_result:
691 feedback_fn(" - ERROR: node hasn't returned file checksum data")
693 remote_cksum = node_result['filelist']
694 for file_name in file_list:
695 if file_name not in remote_cksum:
697 feedback_fn(" - ERROR: file '%s' missing" % file_name)
698 elif remote_cksum[file_name] != local_cksum[file_name]:
700 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
702 if 'nodelist' not in node_result:
704 feedback_fn(" - ERROR: node hasn't returned node connectivity data")
706 if node_result['nodelist']:
708 for node in node_result['nodelist']:
709 feedback_fn(" - ERROR: communication with node '%s': %s" %
710 (node, node_result['nodelist'][node]))
711 hyp_result = node_result.get('hypervisor', None)
712 if hyp_result is not None:
713 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
716 def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
717 """Verify an instance.
719 This function checks to see if the required block devices are
720 available on the instance's node.
725 instancelist = self.cfg.GetInstanceList()
726 if not instance in instancelist:
727 feedback_fn(" - ERROR: instance %s not in instance list %s" %
728 (instance, instancelist))
731 instanceconfig = self.cfg.GetInstanceInfo(instance)
732 node_current = instanceconfig.primary_node
735 instanceconfig.MapLVsByNode(node_vol_should)
737 for node in node_vol_should:
738 for volume in node_vol_should[node]:
739 if node not in node_vol_is or volume not in node_vol_is[node]:
740 feedback_fn(" - ERROR: volume %s missing on node %s" %
744 if instanceconfig.status != 'down':
745 if not instance in node_instance[node_current]:
746 feedback_fn(" - ERROR: instance %s not running on node %s" %
747 (instance, node_current))
750 for node in node_instance:
751 if node != node_current:
752 if instance in node_instance[node]:
753 feedback_fn(" - ERROR: instance %s should not run on node %s" %
759 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
760 """Verify if there are any unknown volumes in the cluster.
762 The .os, .swap and backup volumes are ignored. All other volumes are reported as unknown.
768 for node in node_vol_is:
769 for volume in node_vol_is[node]:
770 if node not in node_vol_should or volume not in node_vol_should[node]:
771 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
776 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
777 """Verify the list of running instances.
779 This checks what instances are running but unknown to the cluster.
783 for node in node_instance:
784 for runninginstance in node_instance[node]:
785 if runninginstance not in instancelist:
786 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
787 (runninginstance, node))
791 def CheckPrereq(self):
792 """Check prerequisites.
794 This has no prerequisites.
799 def Exec(self, feedback_fn):
800 """Verify integrity of cluster, performing various test on nodes.
804 feedback_fn("* Verifying global settings")
805 for msg in self.cfg.VerifyConfig():
806 feedback_fn(" - ERROR: %s" % msg)
808 vg_name = self.cfg.GetVGName()
809 nodelist = utils.NiceSort(self.cfg.GetNodeList())
810 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
814 # FIXME: verify OS list
816 file_names = list(self.sstore.GetFileList())
817 file_names.append(constants.SSL_CERT_FILE)
818 file_names.append(constants.CLUSTER_CONF_FILE)
819 local_checksums = utils.FingerprintFiles(file_names)
821 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
822 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
823 all_instanceinfo = rpc.call_instance_list(nodelist)
824 all_vglist = rpc.call_vg_list(nodelist)
825 node_verify_param = {
826 'filelist': file_names,
827 'nodelist': nodelist,
830 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
831 all_rversion = rpc.call_version(nodelist)
833 for node in nodelist:
834 feedback_fn("* Verifying node %s" % node)
835 result = self._VerifyNode(node, file_names, local_checksums,
836 all_vglist[node], all_nvinfo[node],
837 all_rversion[node], feedback_fn)
841 volumeinfo = all_volumeinfo[node]
843 if type(volumeinfo) != dict:
844 feedback_fn(" - ERROR: connection to %s failed" % (node,))
848 node_volume[node] = volumeinfo
851 nodeinstance = all_instanceinfo[node]
852 if type(nodeinstance) != list:
853 feedback_fn(" - ERROR: connection to %s failed" % (node,))
857 node_instance[node] = nodeinstance
861 for instance in instancelist:
862 feedback_fn("* Verifying instance %s" % instance)
863 result = self._VerifyInstance(instance, node_volume, node_instance,
867 inst_config = self.cfg.GetInstanceInfo(instance)
869 inst_config.MapLVsByNode(node_vol_should)
871 feedback_fn("* Verifying orphan volumes")
872 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
876 feedback_fn("* Verifying remaining instances")
877 result = self._VerifyOrphanInstances(instancelist, node_instance,
884 class LUVerifyDisks(NoHooksLU):
885 """Verifies the cluster disks status.
890 def CheckPrereq(self):
891 """Check prerequisites.
893 This has no prerequisites.
898 def Exec(self, feedback_fn):
899 """Verify integrity of cluster disks.
902 result = res_nodes, res_instances = [], []
904 vg_name = self.cfg.GetVGName()
905 nodes = utils.NiceSort(self.cfg.GetNodeList())
906 instances = [self.cfg.GetInstanceInfo(name)
907 for name in self.cfg.GetInstanceList()]
910 for inst in instances:
912 if (inst.status != "up" or
913 inst.disk_template not in constants.DTS_NET_MIRROR):
915 inst.MapLVsByNode(inst_lvs)
916 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
917 for node, vol_list in inst_lvs.iteritems():
919 nv_dict[(node, vol)] = inst
924 node_lvs = rpc.call_volume_list(nodes, vg_name)
931 if not isinstance(lvs, dict):
932 logger.Info("connection to node %s failed or invalid data returned" %
934 res_nodes.append(node)
937 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
939 inst = nv_dict.get((node, lv_name), None)
940 if inst is not None and inst.name not in res_instances:
941 res_instances.append(inst.name)
946 class LURenameCluster(LogicalUnit):
947 """Rename the cluster.
950 HPATH = "cluster-rename"
951 HTYPE = constants.HTYPE_CLUSTER
954 def BuildHooksEnv(self):
959 "OP_TARGET": self.op.sstore.GetClusterName(),
960 "NEW_NAME": self.op.name,
962 mn = self.sstore.GetMasterNode()
963 return env, [mn], [mn]
965 def CheckPrereq(self):
966 """Verify that the passed name is a valid one.
969 hostname = utils.HostInfo(self.op.name)
971 new_name = hostname.name
972 self.ip = new_ip = hostname.ip
973 old_name = self.sstore.GetClusterName()
974 old_ip = self.sstore.GetMasterIP()
975 if new_name == old_name and new_ip == old_ip:
976 raise errors.OpPrereqError("Neither the name nor the IP address of the"
977 " cluster has changed")
979 result = utils.RunCmd(["fping", "-q", new_ip])
980 if not result.failed:
981 raise errors.OpPrereqError("The given cluster IP address (%s) is"
982 " reachable on the network. Aborting." %
985 self.op.name = new_name
987 def Exec(self, feedback_fn):
988 """Rename the cluster.
991 clustername = self.op.name
995 # shutdown the master IP
996 master = ss.GetMasterNode()
997 if not rpc.call_node_stop_master(master):
998 raise errors.OpExecError("Could not disable the master role")
1002 ss.SetKey(ss.SS_MASTER_IP, ip)
1003 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1005 # Distribute updated ss config to all nodes
1006 myself = self.cfg.GetNodeInfo(master)
1007 dist_nodes = self.cfg.GetNodeList()
1008 if myself.name in dist_nodes:
1009 dist_nodes.remove(myself.name)
1011 logger.Debug("Copying updated ssconf data to all nodes")
1012 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1013 fname = ss.KeyToFilename(keyname)
1014 result = rpc.call_upload_file(dist_nodes, fname)
1015 for to_node in dist_nodes:
1016 if not result[to_node]:
1017 logger.Error("copy of file %s to node %s failed" %
1020 if not rpc.call_node_start_master(master):
1021 logger.Error("Could not re-enable the master role on the master,"
1022 " please restart manually.")
1025 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1026 """Sleep and poll for an instance's disk to sync.
1029 if not instance.disks:
1033 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1035 node = instance.primary_node
1037 for dev in instance.disks:
1038 cfgw.SetDiskID(dev, node)
1044 cumul_degraded = False
1045 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1047 proc.LogWarning("Can't get any data from node %s" % node)
1050 raise errors.RemoteError("Can't contact node %s for mirror data,"
1051 " aborting." % node)
1055 for i in range(len(rstats)):
1058 proc.LogWarning("Can't compute data for node %s/%s" %
1059 (node, instance.disks[i].iv_name))
1061 # we ignore the ldisk parameter
1062 perc_done, est_time, is_degraded, _ = mstat
1063 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1064 if perc_done is not None:
1066 if est_time is not None:
1067 rem_time = "%d estimated seconds remaining" % est_time
1070 rem_time = "no time estimate"
1071 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1072 (instance.disks[i].iv_name, perc_done, rem_time))
1079 time.sleep(min(60, max_time))
1085 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1086 return not cumul_degraded
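# A False return means at least one mirror was still degraded with no progress
# information available; callers typically treat that as a failed sync.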
1089 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1090 """Check that mirrors are not degraded.
1092 The ldisk parameter, if True, will change the test from the
1093 is_degraded attribute (which represents overall non-ok status for
1094 the device(s)) to the ldisk (representing the local storage status).
1097 cfgw.SetDiskID(dev, node)
1104 if on_primary or dev.AssembleOnSecondary():
1105 rstats = rpc.call_blockdev_find(node, dev)
1107 logger.ToStderr("Can't get any data from node %s" % node)
1110 result = result and (not rstats[idx])
1112 for child in dev.children:
1113 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1118 class LUDiagnoseOS(NoHooksLU):
1119 """Logical unit for OS diagnose/query.
1124 def CheckPrereq(self):
1125 """Check prerequisites.
1127 This always succeeds, since this is a pure query LU.
1132 def Exec(self, feedback_fn):
1133 """Compute the list of OSes.
1136 node_list = self.cfg.GetNodeList()
1137 node_data = rpc.call_os_diagnose(node_list)
1138 if node_data == False:
1139 raise errors.OpExecError("Can't gather the list of OSes")
1143 class LURemoveNode(LogicalUnit):
1144 """Logical unit for removing a node.
1147 HPATH = "node-remove"
1148 HTYPE = constants.HTYPE_NODE
1149 _OP_REQP = ["node_name"]
1151 def BuildHooksEnv(self):
1154 This doesn't run on the target node in the pre phase as a failed
1155 node would not allow itself to run.
1159 "OP_TARGET": self.op.node_name,
1160 "NODE_NAME": self.op.node_name,
1162 all_nodes = self.cfg.GetNodeList()
1163 all_nodes.remove(self.op.node_name)
1164 return env, all_nodes, all_nodes
1166 def CheckPrereq(self):
1167 """Check prerequisites.
1170 - the node exists in the configuration
1171 - it does not have primary or secondary instances
1172 - it's not the master
1174 Any errors are signalled by raising errors.OpPrereqError.
1177 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1179 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1181 instance_list = self.cfg.GetInstanceList()
1183 masternode = self.sstore.GetMasterNode()
1184 if node.name == masternode:
1185 raise errors.OpPrereqError("Node is the master node,"
1186 " you need to failover first.")
1188 for instance_name in instance_list:
1189 instance = self.cfg.GetInstanceInfo(instance_name)
1190 if node.name == instance.primary_node:
1191 raise errors.OpPrereqError("Instance %s still running on the node,"
1192 " please remove first." % instance_name)
1193 if node.name in instance.secondary_nodes:
1194 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1195 " please remove first." % instance_name)
1196 self.op.node_name = node.name
1199 def Exec(self, feedback_fn):
1200 """Removes the node from the cluster.
1204 logger.Info("stopping the node daemon and removing configs from node %s" %
1207 rpc.call_node_leave_cluster(node.name)
1209 ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1211 logger.Info("Removing node %s from config" % node.name)
1213 self.cfg.RemoveNode(node.name)
1215 _RemoveHostFromEtcHosts(node.name)
1218 class LUQueryNodes(NoHooksLU):
1219 """Logical unit for querying nodes.
1222 _OP_REQP = ["output_fields", "names"]
1224 def CheckPrereq(self):
1225 """Check prerequisites.
1227 This checks that the fields required are valid output fields.
1230 self.dynamic_fields = frozenset(["dtotal", "dfree",
1231 "mtotal", "mnode", "mfree",
1234 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1235 "pinst_list", "sinst_list",
1237 dynamic=self.dynamic_fields,
1238 selected=self.op.output_fields)
1240 self.wanted = _GetWantedNodes(self, self.op.names)
1242 def Exec(self, feedback_fn):
1243 """Computes the list of nodes and their attributes.
1246 nodenames = self.wanted
1247 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1249 # begin data gathering
1251 if self.dynamic_fields.intersection(self.op.output_fields):
1253 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1254 for name in nodenames:
1255 nodeinfo = node_data.get(name, None)
1258 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1259 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1260 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1261 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1262 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1263 "bootid": nodeinfo['bootid'],
1266 live_data[name] = {}
1268 live_data = dict.fromkeys(nodenames, {})
1270 node_to_primary = dict([(name, set()) for name in nodenames])
1271 node_to_secondary = dict([(name, set()) for name in nodenames])
1273 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1274 "sinst_cnt", "sinst_list"))
1275 if inst_fields & frozenset(self.op.output_fields):
1276 instancelist = self.cfg.GetInstanceList()
1278 for instance_name in instancelist:
1279 inst = self.cfg.GetInstanceInfo(instance_name)
1280 if inst.primary_node in node_to_primary:
1281 node_to_primary[inst.primary_node].add(inst.name)
1282 for secnode in inst.secondary_nodes:
1283 if secnode in node_to_secondary:
1284 node_to_secondary[secnode].add(inst.name)
1286 # end data gathering
1289 for node in nodelist:
1291 for field in self.op.output_fields:
1294 elif field == "pinst_list":
1295 val = list(node_to_primary[node.name])
1296 elif field == "sinst_list":
1297 val = list(node_to_secondary[node.name])
1298 elif field == "pinst_cnt":
1299 val = len(node_to_primary[node.name])
1300 elif field == "sinst_cnt":
1301 val = len(node_to_secondary[node.name])
1302 elif field == "pip":
1303 val = node.primary_ip
1304 elif field == "sip":
1305 val = node.secondary_ip
1306 elif field in self.dynamic_fields:
1307 val = live_data[node.name].get(field, None)
1309 raise errors.ParameterError(field)
1310 node_output.append(val)
1311 output.append(node_output)
1316 class LUQueryNodeVolumes(NoHooksLU):
1317 """Logical unit for getting volumes on node(s).
1320 _OP_REQP = ["nodes", "output_fields"]
1322 def CheckPrereq(self):
1323 """Check prerequisites.
1325 This checks that the fields required are valid output fields.
1328 self.nodes = _GetWantedNodes(self, self.op.nodes)
1330 _CheckOutputFields(static=["node"],
1331 dynamic=["phys", "vg", "name", "size", "instance"],
1332 selected=self.op.output_fields)
1335 def Exec(self, feedback_fn):
1336 """Computes the list of nodes and their attributes.
1339 nodenames = self.nodes
1340 volumes = rpc.call_node_volumes(nodenames)
1342 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1343 in self.cfg.GetInstanceList()]
1345 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1348 for node in nodenames:
1349 if node not in volumes or not volumes[node]:
1352 node_vols = volumes[node][:]
1353 node_vols.sort(key=lambda vol: vol['dev'])
1355 for vol in node_vols:
1357 for field in self.op.output_fields:
1360 elif field == "phys":
1364 elif field == "name":
1366 elif field == "size":
1367 val = int(float(vol['size']))
1368 elif field == "instance":
1370 if node not in lv_by_node[inst]:
1372 if vol['name'] in lv_by_node[inst][node]:
1378 raise errors.ParameterError(field)
1379 node_output.append(str(val))
1381 output.append(node_output)
1386 class LUAddNode(LogicalUnit):
1387 """Logical unit for adding node to the cluster.
1391 HTYPE = constants.HTYPE_NODE
1392 _OP_REQP = ["node_name"]
1394 def BuildHooksEnv(self):
1397 This will run on all nodes before, and on all nodes + the new node after.
1401 "OP_TARGET": self.op.node_name,
1402 "NODE_NAME": self.op.node_name,
1403 "NODE_PIP": self.op.primary_ip,
1404 "NODE_SIP": self.op.secondary_ip,
1406 nodes_0 = self.cfg.GetNodeList()
1407 nodes_1 = nodes_0 + [self.op.node_name, ]
1408 return env, nodes_0, nodes_1
1410 def CheckPrereq(self):
1411 """Check prerequisites.
1414 - the new node is not already in the config
1416 - its parameters (single/dual homed) match the cluster
1418 Any errors are signalled by raising errors.OpPrereqError.
1421 node_name = self.op.node_name
1424 dns_data = utils.HostInfo(node_name)
1426 node = dns_data.name
1427 primary_ip = self.op.primary_ip = dns_data.ip
1428 secondary_ip = getattr(self.op, "secondary_ip", None)
1429 if secondary_ip is None:
1430 secondary_ip = primary_ip
1431 if not utils.IsValidIP(secondary_ip):
1432 raise errors.OpPrereqError("Invalid secondary IP given")
1433 self.op.secondary_ip = secondary_ip
1434 node_list = cfg.GetNodeList()
1435 if node in node_list:
1436 raise errors.OpPrereqError("Node %s is already in the configuration"
1439 for existing_node_name in node_list:
1440 existing_node = cfg.GetNodeInfo(existing_node_name)
1441 if (existing_node.primary_ip == primary_ip or
1442 existing_node.secondary_ip == primary_ip or
1443 existing_node.primary_ip == secondary_ip or
1444 existing_node.secondary_ip == secondary_ip):
1445 raise errors.OpPrereqError("New node ip address(es) conflict with"
1446 " existing node %s" % existing_node.name)
1448 # check that the type of the node (single versus dual homed) is the
1449 # same as for the master
1450 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1451 master_singlehomed = myself.secondary_ip == myself.primary_ip
1452 newbie_singlehomed = secondary_ip == primary_ip
1453 if master_singlehomed != newbie_singlehomed:
1454 if master_singlehomed:
1455 raise errors.OpPrereqError("The master has no private ip but the"
1456 " new node has one")
1458 raise errors.OpPrereqError("The master has a private ip but the"
1459 " new node doesn't have one")
1461 # checks reachability
1462 if not utils.TcpPing(utils.HostInfo().name,
1464 constants.DEFAULT_NODED_PORT):
1465 raise errors.OpPrereqError("Node not reachable by ping")
1467 if not newbie_singlehomed:
1468 # check reachability from my secondary ip to newbie's secondary ip
1469 if not utils.TcpPing(myself.secondary_ip,
1471 constants.DEFAULT_NODED_PORT):
1472 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1473 " based ping to noded port")
1475 self.new_node = objects.Node(name=node,
1476 primary_ip=primary_ip,
1477 secondary_ip=secondary_ip)
1479 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1480 if not os.path.exists(constants.VNC_PASSWORD_FILE):
1481 raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1482 constants.VNC_PASSWORD_FILE)
1484 def Exec(self, feedback_fn):
1485 """Adds the new node to the cluster.
1488 new_node = self.new_node
1489 node = new_node.name
1491 # set up inter-node password and certificate and restarts the node daemon
1492 gntpass = self.sstore.GetNodeDaemonPassword()
1493 if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1494 raise errors.OpExecError("ganeti password corruption detected")
1495 f = open(constants.SSL_CERT_FILE)
1497 gntpem = f.read(8192)
1500 # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1501 # so we use this to detect an invalid certificate; as long as the
1502 # cert doesn't contain this, the here-document will be correctly
1503 # parsed by the shell sequence below
1504 if re.search('^!EOF\.', gntpem, re.MULTILINE):
1505 raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1506 if not gntpem.endswith("\n"):
1507 raise errors.OpExecError("PEM must end with newline")
1508 logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1510 # and then connect with ssh to set password and start ganeti-noded
1511 # note that all the below variables are sanitized at this point,
1512 # either by being constants or by the checks above
1514 mycommand = ("umask 077 && "
1515 "echo '%s' > '%s' && "
1516 "cat > '%s' << '!EOF.' && \n"
1517 "%s!EOF.\n%s restart" %
1518 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1519 constants.SSL_CERT_FILE, gntpem,
1520 constants.NODE_INITD_SCRIPT))
1522 result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1524 raise errors.OpExecError("Remote command on node %s, error: %s,"
1526 (node, result.fail_reason, result.output))
1528 # check connectivity
1531 result = rpc.call_version([node])[node]
1533 if constants.PROTOCOL_VERSION == result:
1534 logger.Info("communication to node %s fine, sw version %s match" %
1537 raise errors.OpExecError("Version mismatch master version %s,"
1538 " node version %s" %
1539 (constants.PROTOCOL_VERSION, result))
1541 raise errors.OpExecError("Cannot get version from the new node")
1544 logger.Info("copy ssh key to node %s" % node)
1545 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1547 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1548 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1554 keyarray.append(f.read())
1558 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1559 keyarray[3], keyarray[4], keyarray[5])
1562 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1564 # Add node to our /etc/hosts, and add key to known_hosts
1565 _AddHostToEtcHosts(new_node.name)
1567 _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1568 self.cfg.GetHostKey())
1570 if new_node.secondary_ip != new_node.primary_ip:
1571 if not rpc.call_node_tcp_ping(new_node.name,
1572 constants.LOCALHOST_IP_ADDRESS,
1573 new_node.secondary_ip,
1574 constants.DEFAULT_NODED_PORT,
1576 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1577 " you gave (%s). Please fix and re-run this"
1578 " command." % new_node.secondary_ip)
1580 success, msg = ssh.VerifyNodeHostname(node)
1582 raise errors.OpExecError("Node '%s' claims it has a different hostname"
1583 " than the one the resolver gives: %s."
1584 " Please fix and re-run this command." %
1587 # Distribute updated /etc/hosts and known_hosts to all nodes,
1588 # including the node just added
1589 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1590 dist_nodes = self.cfg.GetNodeList() + [node]
1591 if myself.name in dist_nodes:
1592 dist_nodes.remove(myself.name)
1594 logger.Debug("Copying hosts and known_hosts to all nodes")
1595 for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1596 result = rpc.call_upload_file(dist_nodes, fname)
1597 for to_node in dist_nodes:
1598 if not result[to_node]:
1599 logger.Error("copy of file %s to node %s failed" %
1602 to_copy = ss.GetFileList()
1603 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1604 to_copy.append(constants.VNC_PASSWORD_FILE)
1605 for fname in to_copy:
1606 if not ssh.CopyFileToNode(node, fname):
1607 logger.Error("could not copy file %s to node %s" % (fname, node))
1609 logger.Info("adding node %s to cluster.conf" % node)
1610 self.cfg.AddNode(new_node)
1613 class LUMasterFailover(LogicalUnit):
1614 """Failover the master node to the current node.
1616 This is a special LU in that it must run on a non-master node.
1619 HPATH = "master-failover"
1620 HTYPE = constants.HTYPE_CLUSTER
1624 def BuildHooksEnv(self):
1627 This will run on the new master only in the pre phase, and on all
1628 the nodes in the post phase.
1632 "OP_TARGET": self.new_master,
1633 "NEW_MASTER": self.new_master,
1634 "OLD_MASTER": self.old_master,
1636 return env, [self.new_master], self.cfg.GetNodeList()
1638 def CheckPrereq(self):
1639 """Check prerequisites.
1641 This checks that we are not already the master.
1644 self.new_master = utils.HostInfo().name
1645 self.old_master = self.sstore.GetMasterNode()
1647 if self.old_master == self.new_master:
1648 raise errors.OpPrereqError("This commands must be run on the node"
1649 " where you want the new master to be."
1650 " %s is already the master" %
1653 def Exec(self, feedback_fn):
1654 """Failover the master node.
1656 This command, when run on a non-master node, will cause the current
1657 master to cease being master, and the non-master to become the new master.
1661 #TODO: do not rely on gethostname returning the FQDN
1662 logger.Info("setting master to %s, old master: %s" %
1663 (self.new_master, self.old_master))
1665 if not rpc.call_node_stop_master(self.old_master):
1666 logger.Error("could disable the master role on the old master"
1667 " %s, please disable manually" % self.old_master)
1670 ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1671 if not rpc.call_upload_file(self.cfg.GetNodeList(),
1672 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1673 logger.Error("could not distribute the new simple store master file"
1674 " to the other nodes, please check.")
1676 if not rpc.call_node_start_master(self.new_master):
1677 logger.Error("could not start the master role on the new master"
1678 " %s, please check" % self.new_master)
1679 feedback_fn("Error in activating the master IP on the new master,"
1680 " please fix manually.")
1684 class LUQueryClusterInfo(NoHooksLU):
1685 """Query cluster configuration.
1691 def CheckPrereq(self):
1692 """No prerequsites needed for this LU.
1697 def Exec(self, feedback_fn):
1698 """Return cluster config.
1702 "name": self.sstore.GetClusterName(),
1703 "software_version": constants.RELEASE_VERSION,
1704 "protocol_version": constants.PROTOCOL_VERSION,
1705 "config_version": constants.CONFIG_VERSION,
1706 "os_api_version": constants.OS_API_VERSION,
1707 "export_version": constants.EXPORT_VERSION,
1708 "master": self.sstore.GetMasterNode(),
1709 "architecture": (platform.architecture()[0], platform.machine()),
1715 class LUClusterCopyFile(NoHooksLU):
1716 """Copy file to cluster.
1719 _OP_REQP = ["nodes", "filename"]
1721 def CheckPrereq(self):
1722 """Check prerequisites.
1724 It should check that the named file exists and that the given list of nodes is valid.
1728 if not os.path.exists(self.op.filename):
1729 raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1731 self.nodes = _GetWantedNodes(self, self.op.nodes)
1733 def Exec(self, feedback_fn):
1734 """Copy a file from master to some nodes.
1737 opts - class with options as members
1738 args - list containing a single element, the file name
1740 nodes - list containing the name of target nodes; if empty, all nodes
1743 filename = self.op.filename
1745 myname = utils.HostInfo().name
1747 for node in self.nodes:
1750 if not ssh.CopyFileToNode(node, filename):
1751 logger.Error("Copy of file %s to node %s failed" % (filename, node))
1754 class LUDumpClusterConfig(NoHooksLU):
1755 """Return a text-representation of the cluster-config.
1760 def CheckPrereq(self):
1761 """No prerequisites.
1766 def Exec(self, feedback_fn):
1767 """Dump a representation of the cluster config to the standard output.
1770 return self.cfg.DumpConfig()
1773 class LURunClusterCommand(NoHooksLU):
1774 """Run a command on some nodes.
1777 _OP_REQP = ["command", "nodes"]
1779 def CheckPrereq(self):
1780 """Check prerequisites.
1782 It checks that the given list of nodes is valid.
1785 self.nodes = _GetWantedNodes(self, self.op.nodes)
1787 def Exec(self, feedback_fn):
1788 """Run a command on some nodes.
1792 for node in self.nodes:
1793 result = ssh.SSHCall(node, "root", self.op.command)
1794 data.append((node, result.output, result.exit_code))
1799 class LUActivateInstanceDisks(NoHooksLU):
1800 """Bring up an instance's disks.
1803 _OP_REQP = ["instance_name"]
1805 def CheckPrereq(self):
1806 """Check prerequisites.
1808 This checks that the instance is in the cluster.
1811 instance = self.cfg.GetInstanceInfo(
1812 self.cfg.ExpandInstanceName(self.op.instance_name))
1813 if instance is None:
1814 raise errors.OpPrereqError("Instance '%s' not known" %
1815 self.op.instance_name)
1816 self.instance = instance
1819 def Exec(self, feedback_fn):
1820 """Activate the disks.
1823 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1825 raise errors.OpExecError("Cannot activate block devices")
1830 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1831 """Prepare the block devices for an instance.
1833 This sets up the block devices on all nodes.
1836 instance: a ganeti.objects.Instance object
1837 ignore_secondaries: if true, errors on secondary nodes won't result
1838 in an error return from the function
1841 false if the operation failed
1842 list of (host, instance_visible_name, node_visible_name) if the operation
1843 succeeded with the mapping from node devices to instance devices
1847 for inst_disk in instance.disks:
1848 master_result = None
1849 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1850 cfg.SetDiskID(node_disk, node)
1851 is_primary = node == instance.primary_node
1852 result = rpc.call_blockdev_assemble(node, node_disk,
1853 instance.name, is_primary)
1855 logger.Error("could not prepare block device %s on node %s"
1856 " (is_primary=%s)" %
1857 (inst_disk.iv_name, node, is_primary))
1858 if is_primary or not ignore_secondaries:
1861 master_result = result
1862 device_info.append((instance.primary_node, inst_disk.iv_name,
1865 # leave the disks configured for the primary node
1866 # this is a workaround that would be fixed better by
1867 # improving the logical/physical id handling
1868 for disk in instance.disks:
1869 cfg.SetDiskID(disk, instance.primary_node)
1871 return disks_ok, device_info
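# Illustrative return (hypothetical values): (True, [("node1.example.com",
# "sda", "/dev/drbd0")]) -- the third element is whatever the primary node
# reported back for the assembled device.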
1874 def _StartInstanceDisks(cfg, instance, force):
1875 """Start the disks of an instance.
1878 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1879 ignore_secondaries=force)
1881 _ShutdownInstanceDisks(instance, cfg)
1882 if force is not None and not force:
1883 logger.Error("If the message above refers to a secondary node,"
1884 " you can retry the operation using '--force'.")
1885 raise errors.OpExecError("Disk consistency error")
1888 class LUDeactivateInstanceDisks(NoHooksLU):
1889 """Shutdown an instance's disks.
1892 _OP_REQP = ["instance_name"]
1894 def CheckPrereq(self):
1895 """Check prerequisites.
1897 This checks that the instance is in the cluster.
1900 instance = self.cfg.GetInstanceInfo(
1901 self.cfg.ExpandInstanceName(self.op.instance_name))
1902 if instance is None:
1903 raise errors.OpPrereqError("Instance '%s' not known" %
1904 self.op.instance_name)
1905 self.instance = instance
1907 def Exec(self, feedback_fn):
1908 """Deactivate the disks
1911 instance = self.instance
1912 ins_l = rpc.call_instance_list([instance.primary_node])
1913 ins_l = ins_l[instance.primary_node]
1914 if not type(ins_l) is list:
1915 raise errors.OpExecError("Can't contact node '%s'" %
1916 instance.primary_node)
1918 if self.instance.name in ins_l:
1919 raise errors.OpExecError("Instance is running, can't shutdown"
1922 _ShutdownInstanceDisks(instance, self.cfg)
1925 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1926 """Shutdown block devices of an instance.
1928 This does the shutdown on all nodes of the instance.
1930 If ignore_primary is false, errors on the primary node are also treated as failures.
1935 for disk in instance.disks:
1936 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1937 cfg.SetDiskID(top_disk, node)
1938 if not rpc.call_blockdev_shutdown(node, top_disk):
1939 logger.Error("could not shutdown block device %s on node %s" %
1940 (disk.iv_name, node))
1941 if not ignore_primary or node != instance.primary_node:
1946 class LUStartupInstance(LogicalUnit):
1947 """Starts an instance.
1950 HPATH = "instance-start"
1951 HTYPE = constants.HTYPE_INSTANCE
1952 _OP_REQP = ["instance_name", "force"]
1954 def BuildHooksEnv(self):
1957 This runs on master, primary and secondary nodes of the instance.
1961 "FORCE": self.op.force,
1963 env.update(_BuildInstanceHookEnvByObject(self.instance))
1964 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1965 list(self.instance.secondary_nodes))
1968 def CheckPrereq(self):
1969 """Check prerequisites.
1971 This checks that the instance is in the cluster.
1974 instance = self.cfg.GetInstanceInfo(
1975 self.cfg.ExpandInstanceName(self.op.instance_name))
1976 if instance is None:
1977 raise errors.OpPrereqError("Instance '%s' not known" %
1978 self.op.instance_name)
1980 # check bridges existance
1981 _CheckInstanceBridgesExist(instance)
1983 self.instance = instance
1984 self.op.instance_name = instance.name
1986 def Exec(self, feedback_fn):
1987 """Start the instance.
1990 instance = self.instance
1991 force = self.op.force
1992 extra_args = getattr(self.op, "extra_args", "")
1994 node_current = instance.primary_node
1996 nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1998 raise errors.OpExecError("Could not contact node %s for infos" %
2001 freememory = nodeinfo[node_current]['memory_free']
2002 memory = instance.memory
2003 if memory > freememory:
2004 raise errors.OpExecError("Not enough memory to start instance"
2006 " needed %s MiB, available %s MiB" %
2007 (instance.name, node_current, memory,
2010 _StartInstanceDisks(self.cfg, instance, force)
2012 if not rpc.call_instance_start(node_current, instance, extra_args):
2013 _ShutdownInstanceDisks(instance, self.cfg)
2014 raise errors.OpExecError("Could not start instance")
2016 self.cfg.MarkInstanceUp(instance.name)
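# MarkInstanceUp records the new run state in the cluster configuration, so
# later cluster verification expects this instance to be running.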
2019 class LURebootInstance(LogicalUnit):
2020 """Reboot an instance.
2023 HPATH = "instance-reboot"
2024 HTYPE = constants.HTYPE_INSTANCE
2025 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2027 def BuildHooksEnv(self):
2030 This runs on master, primary and secondary nodes of the instance.
2034 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2036 env.update(_BuildInstanceHookEnvByObject(self.instance))
2037 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2038 list(self.instance.secondary_nodes))
2041 def CheckPrereq(self):
2042 """Check prerequisites.
2044 This checks that the instance is in the cluster.
2047 instance = self.cfg.GetInstanceInfo(
2048 self.cfg.ExpandInstanceName(self.op.instance_name))
2049 if instance is None:
2050 raise errors.OpPrereqError("Instance '%s' not known" %
2051 self.op.instance_name)
2053 # check bridges existance
2054 _CheckInstanceBridgesExist(instance)
2056 self.instance = instance
2057 self.op.instance_name = instance.name
2059 def Exec(self, feedback_fn):
2060 """Reboot the instance.
2063 instance = self.instance
2064 ignore_secondaries = self.op.ignore_secondaries
2065 reboot_type = self.op.reboot_type
2066 extra_args = getattr(self.op, "extra_args", "")
2068 node_current = instance.primary_node
2070 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2071 constants.INSTANCE_REBOOT_HARD,
2072 constants.INSTANCE_REBOOT_FULL]:
2073 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2074 (constants.INSTANCE_REBOOT_SOFT,
2075 constants.INSTANCE_REBOOT_HARD,
2076 constants.INSTANCE_REBOOT_FULL))
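# Soft and hard reboots below are delegated to the hypervisor layer on the
# primary node; a full reboot instead shuts the instance down, re-assembles
# its disks and starts it again from here.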
2078 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2079 constants.INSTANCE_REBOOT_HARD]:
2080 if not rpc.call_instance_reboot(node_current, instance,
2081 reboot_type, extra_args):
2082 raise errors.OpExecError("Could not reboot instance")
2084 if not rpc.call_instance_shutdown(node_current, instance):
2085 raise errors.OpExecError("could not shutdown instance for full reboot")
2086 _ShutdownInstanceDisks(instance, self.cfg)
2087 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2088 if not rpc.call_instance_start(node_current, instance, extra_args):
2089 _ShutdownInstanceDisks(instance, self.cfg)
2090 raise errors.OpExecError("Could not start instance for full reboot")
2092 self.cfg.MarkInstanceUp(instance.name)
2095 class LUShutdownInstance(LogicalUnit):
2096 """Shutdown an instance.
2099 HPATH = "instance-stop"
2100 HTYPE = constants.HTYPE_INSTANCE
2101 _OP_REQP = ["instance_name"]
2103 def BuildHooksEnv(self):
2106 This runs on master, primary and secondary nodes of the instance.
2109 env = _BuildInstanceHookEnvByObject(self.instance)
2110 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2111 list(self.instance.secondary_nodes))
2114 def CheckPrereq(self):
2115 """Check prerequisites.
2117 This checks that the instance is in the cluster.
2120 instance = self.cfg.GetInstanceInfo(
2121 self.cfg.ExpandInstanceName(self.op.instance_name))
2122 if instance is None:
2123 raise errors.OpPrereqError("Instance '%s' not known" %
2124 self.op.instance_name)
2125 self.instance = instance
2127 def Exec(self, feedback_fn):
2128 """Shutdown the instance.
2131 instance = self.instance
2132 node_current = instance.primary_node
2133 if not rpc.call_instance_shutdown(node_current, instance):
2134 logger.Error("could not shutdown instance")
2136 self.cfg.MarkInstanceDown(instance.name)
2137 _ShutdownInstanceDisks(instance, self.cfg)
2140 class LUReinstallInstance(LogicalUnit):
2141 """Reinstall an instance.
2144 HPATH = "instance-reinstall"
2145 HTYPE = constants.HTYPE_INSTANCE
2146 _OP_REQP = ["instance_name"]
2148 def BuildHooksEnv(self):
2151 This runs on master, primary and secondary nodes of the instance.
2154 env = _BuildInstanceHookEnvByObject(self.instance)
2155 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2156 list(self.instance.secondary_nodes))
2159 def CheckPrereq(self):
2160 """Check prerequisites.
2162 This checks that the instance is in the cluster and is not running.
2165 instance = self.cfg.GetInstanceInfo(
2166 self.cfg.ExpandInstanceName(self.op.instance_name))
2167 if instance is None:
2168 raise errors.OpPrereqError("Instance '%s' not known" %
2169 self.op.instance_name)
2170 if instance.disk_template == constants.DT_DISKLESS:
2171 raise errors.OpPrereqError("Instance '%s' has no disks" %
2172 self.op.instance_name)
2173 if instance.status != "down":
2174 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2175 self.op.instance_name)
2176 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2178 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2179 (self.op.instance_name,
2180 instance.primary_node))
2182 self.op.os_type = getattr(self.op, "os_type", None)
2183 if self.op.os_type is not None:
2185 pnode = self.cfg.GetNodeInfo(
2186 self.cfg.ExpandNodeName(instance.primary_node))
2188 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2190 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2192 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2193 " primary node" % self.op.os_type)
2195 self.instance = instance
2197 def Exec(self, feedback_fn):
2198 """Reinstall the instance.
2201 inst = self.instance
2203 if self.op.os_type is not None:
2204 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2205 inst.os = self.op.os_type
2206 self.cfg.AddInstance(inst)
2208 _StartInstanceDisks(self.cfg, inst, None)
2210 feedback_fn("Running the instance OS create scripts...")
2211 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2212 raise errors.OpExecError("Could not install OS for instance %s"
2214 (inst.name, inst.primary_node))
2216 _ShutdownInstanceDisks(inst, self.cfg)
2219 class LURenameInstance(LogicalUnit):
2220 """Rename an instance.
2223 HPATH = "instance-rename"
2224 HTYPE = constants.HTYPE_INSTANCE
2225 _OP_REQP = ["instance_name", "new_name"]
2227 def BuildHooksEnv(self):
2230 This runs on master, primary and secondary nodes of the instance.
2233 env = _BuildInstanceHookEnvByObject(self.instance)
2234 env["INSTANCE_NEW_NAME"] = self.op.new_name
2235 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2236 list(self.instance.secondary_nodes))
2239 def CheckPrereq(self):
2240 """Check prerequisites.
2242 This checks that the instance is in the cluster and is not running.
2245 instance = self.cfg.GetInstanceInfo(
2246 self.cfg.ExpandInstanceName(self.op.instance_name))
2247 if instance is None:
2248 raise errors.OpPrereqError("Instance '%s' not known" %
2249 self.op.instance_name)
2250 if instance.status != "down":
2251 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2252 self.op.instance_name)
2253 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2255 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2256 (self.op.instance_name,
2257 instance.primary_node))
2258 self.instance = instance
2260 # new name verification
2261 name_info = utils.HostInfo(self.op.new_name)
2263 self.op.new_name = new_name = name_info.name
2264 if not getattr(self.op, "ignore_ip", False):
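# unless ignore_ip was requested, probe the new name's IP with fping; a successful
# ping means the address is already in use and the rename must be refused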
2265 command = ["fping", "-q", name_info.ip]
2266 result = utils.RunCmd(command)
2267 if not result.failed:
2268 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2269 (name_info.ip, new_name))
2272 def Exec(self, feedback_fn):
2273 """Reinstall the instance.
2276 inst = self.instance
2277 old_name = inst.name
2279 self.cfg.RenameInstance(inst.name, self.op.new_name)
2281 # re-read the instance from the configuration after rename
2282 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2284 _StartInstanceDisks(self.cfg, inst, None)
2286 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2288 msg = ("Could run OS rename script for instance %s on node %s (but the"
2289 " instance has been renamed in Ganeti)" %
2290 (inst.name, inst.primary_node))
2293 _ShutdownInstanceDisks(inst, self.cfg)
2296 class LURemoveInstance(LogicalUnit):
2297 """Remove an instance.
2300 HPATH = "instance-remove"
2301 HTYPE = constants.HTYPE_INSTANCE
2302 _OP_REQP = ["instance_name"]
2304 def BuildHooksEnv(self):
2307 This runs on master, primary and secondary nodes of the instance.
2310 env = _BuildInstanceHookEnvByObject(self.instance)
2311 nl = [self.sstore.GetMasterNode()]
2314 def CheckPrereq(self):
2315 """Check prerequisites.
2317 This checks that the instance is in the cluster.
2320 instance = self.cfg.GetInstanceInfo(
2321 self.cfg.ExpandInstanceName(self.op.instance_name))
2322 if instance is None:
2323 raise errors.OpPrereqError("Instance '%s' not known" %
2324 self.op.instance_name)
2325 self.instance = instance
2327 def Exec(self, feedback_fn):
2328 """Remove the instance.
2331 instance = self.instance
2332 logger.Info("shutting down instance %s on node %s" %
2333 (instance.name, instance.primary_node))
2335 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2336 if self.op.ignore_failures:
2337 feedback_fn("Warning: can't shutdown instance")
2339 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2340 (instance.name, instance.primary_node))
2342 logger.Info("removing block devices for instance %s" % instance.name)
2344 if not _RemoveDisks(instance, self.cfg):
2345 if self.op.ignore_failures:
2346 feedback_fn("Warning: can't remove instance's disks")
2348 raise errors.OpExecError("Can't remove instance's disks")
2350 logger.Info("removing instance %s out of cluster config" % instance.name)
2352 self.cfg.RemoveInstance(instance.name)
2355 class LUQueryInstances(NoHooksLU):
2356 """Logical unit for querying instances.
2359 _OP_REQP = ["output_fields", "names"]
2361 def CheckPrereq(self):
2362 """Check prerequisites.
2364 This checks that the fields required are valid output fields.
2367 self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2368 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2369 "admin_state", "admin_ram",
2370 "disk_template", "ip", "mac", "bridge",
2371 "sda_size", "sdb_size"],
2372 dynamic=self.dynamic_fields,
2373 selected=self.op.output_fields)
2375 self.wanted = _GetWantedInstances(self, self.op.names)
2377 def Exec(self, feedback_fn):
2378 """Computes the list of nodes and their attributes.
2381 instance_names = self.wanted
2382 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2385 # begin data gathering
2387 nodes = frozenset([inst.primary_node for inst in instance_list])
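# the dynamic fields (oper_state, oper_ram) need live data, so all primary nodes
# are queried at once and unreachable nodes are remembered as bad; for purely
# static fields no RPC is made at all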
2390 if self.dynamic_fields.intersection(self.op.output_fields):
2392 node_data = rpc.call_all_instances_info(nodes)
2394 result = node_data[name]
2396 live_data.update(result)
2397 elif result == False:
2398 bad_nodes.append(name)
2399 # else no instance is alive
2401 live_data = dict([(name, {}) for name in instance_names])
2403 # end data gathering
2406 for instance in instance_list:
2408 for field in self.op.output_fields:
2413 elif field == "pnode":
2414 val = instance.primary_node
2415 elif field == "snodes":
2416 val = list(instance.secondary_nodes)
2417 elif field == "admin_state":
2418 val = (instance.status != "down")
2419 elif field == "oper_state":
2420 if instance.primary_node in bad_nodes:
2423 val = bool(live_data.get(instance.name))
2424 elif field == "admin_ram":
2425 val = instance.memory
2426 elif field == "oper_ram":
2427 if instance.primary_node in bad_nodes:
2429 elif instance.name in live_data:
2430 val = live_data[instance.name].get("memory", "?")
2433 elif field == "disk_template":
2434 val = instance.disk_template
2436 val = instance.nics[0].ip
2437 elif field == "bridge":
2438 val = instance.nics[0].bridge
2439 elif field == "mac":
2440 val = instance.nics[0].mac
2441 elif field == "sda_size" or field == "sdb_size":
2442 disk = instance.FindDisk(field[:3])
2448 raise errors.ParameterError(field)
2455 class LUFailoverInstance(LogicalUnit):
2456 """Failover an instance.
2459 HPATH = "instance-failover"
2460 HTYPE = constants.HTYPE_INSTANCE
2461 _OP_REQP = ["instance_name", "ignore_consistency"]
2463 def BuildHooksEnv(self):
2466 This runs on master, primary and secondary nodes of the instance.
2470 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2472 env.update(_BuildInstanceHookEnvByObject(self.instance))
2473 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2476 def CheckPrereq(self):
2477 """Check prerequisites.
2479 This checks that the instance is in the cluster.
2482 instance = self.cfg.GetInstanceInfo(
2483 self.cfg.ExpandInstanceName(self.op.instance_name))
2484 if instance is None:
2485 raise errors.OpPrereqError("Instance '%s' not known" %
2486 self.op.instance_name)
2488 if instance.disk_template not in constants.DTS_NET_MIRROR:
2489 raise errors.OpPrereqError("Instance's disk layout is not"
2490 " network mirrored, cannot failover.")
2492 secondary_nodes = instance.secondary_nodes
2493 if not secondary_nodes:
2494 raise errors.ProgrammerError("no secondary node but using "
2495 "DT_REMOTE_RAID1 template")
2497 # check memory requirements on the secondary node
2498 target_node = secondary_nodes[0]
2499 nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2500 info = nodeinfo.get(target_node, None)
2502 raise errors.OpPrereqError("Cannot get current information"
2503 " from node '%s'" % nodeinfo)
2504 if instance.memory > info['memory_free']:
2505 raise errors.OpPrereqError("Not enough memory on target node %s."
2506 " %d MB available, %d MB required" %
2507 (target_node, info['memory_free'],
2510 # check bridge existence
2511 brlist = [nic.bridge for nic in instance.nics]
2512 if not rpc.call_bridges_exist(target_node, brlist):
2513 raise errors.OpPrereqError("One or more target bridges %s does not"
2514 " exist on destination node '%s'" %
2515 (brlist, target_node))
2517 self.instance = instance
2519 def Exec(self, feedback_fn):
2520 """Failover an instance.
2522 The failover is done by shutting it down on its present node and
2523 starting it on the secondary.
2526 instance = self.instance
2528 source_node = instance.primary_node
2529 target_node = instance.secondary_nodes[0]
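# failover sequence: verify the mirrors are consistent on the target node, check
# that it has enough free memory, shut the instance down on the source, flip
# primary_node in the configuration, then activate the disks and start the
# instance on the target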
2531 feedback_fn("* checking disk consistency between source and target")
2532 for dev in instance.disks:
2533 # for remote_raid1, these are md over drbd
2534 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2535 if not self.op.ignore_consistency:
2536 raise errors.OpExecError("Disk %s is degraded on target node,"
2537 " aborting failover." % dev.iv_name)
2539 feedback_fn("* checking target node resource availability")
2540 nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2543 raise errors.OpExecError("Could not contact target node %s." %
2546 free_memory = int(nodeinfo[target_node]['memory_free'])
2547 memory = instance.memory
2548 if memory > free_memory:
2549 raise errors.OpExecError("Not enough memory to create instance %s on"
2550 " node %s. needed %s MiB, available %s MiB" %
2551 (instance.name, target_node, memory,
2554 feedback_fn("* shutting down instance on source node")
2555 logger.Info("Shutting down instance %s on node %s" %
2556 (instance.name, source_node))
2558 if not rpc.call_instance_shutdown(source_node, instance):
2559 if self.op.ignore_consistency:
2560 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2561 " anyway. Please make sure node %s is down" %
2562 (instance.name, source_node, source_node))
2564 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2565 (instance.name, source_node))
2567 feedback_fn("* deactivating the instance's disks on source node")
2568 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2569 raise errors.OpExecError("Can't shut down the instance's disks.")
2571 instance.primary_node = target_node
2572 # distribute new instance config to the other nodes
2573 self.cfg.AddInstance(instance)
2575 feedback_fn("* activating the instance's disks on target node")
2576 logger.Info("Starting instance %s on node %s" %
2577 (instance.name, target_node))
2579 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2580 ignore_secondaries=True)
2582 _ShutdownInstanceDisks(instance, self.cfg)
2583 raise errors.OpExecError("Can't activate the instance's disks")
2585 feedback_fn("* starting the instance on the target node")
2586 if not rpc.call_instance_start(target_node, instance, None):
2587 _ShutdownInstanceDisks(instance, self.cfg)
2588 raise errors.OpExecError("Could not start instance %s on node %s." %
2589 (instance.name, target_node))
2592 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2593 """Create a tree of block devices on the primary node.
2595 This always creates all devices.
2599 for child in device.children:
2600 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2603 cfg.SetDiskID(device, node)
2604 new_id = rpc.call_blockdev_create(node, device, device.size,
2605 instance.name, True, info)
2608 if device.physical_id is None:
2609 device.physical_id = new_id
2613 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2614 """Create a tree of block devices on a secondary node.
2616 If this device type has to be created on secondaries, create it and
2619 If not, just recurse to children keeping the same 'force' value.
2622 if device.CreateOnSecondary():
2625 for child in device.children:
2626 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2627 child, force, info):
2632 cfg.SetDiskID(device, node)
2633 new_id = rpc.call_blockdev_create(node, device, device.size,
2634 instance.name, False, info)
2637 if device.physical_id is None:
2638 device.physical_id = new_id
2642 def _GenerateUniqueNames(cfg, exts):
2643 """Generate a suitable LV name.
2645 This will generate unique logical volume names, one for each of the given suffixes.
2650 new_id = cfg.GenerateUniqueID()
2651 results.append("%s%s" % (new_id, val))
2655 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2656 """Generate a drbd device complete with its children.
2659 port = cfg.AllocatePort()
2660 vgname = cfg.GetVGName()
2661 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2662 logical_id=(vgname, names[0]))
2663 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2664 logical_id=(vgname, names[1]))
2665 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2666 logical_id = (primary, secondary, port),
2667 children = [dev_data, dev_meta])
2671 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2672 """Generate a drbd8 device complete with its children.
2675 port = cfg.AllocatePort()
2676 vgname = cfg.GetVGName()
2677 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2678 logical_id=(vgname, names[0]))
2679 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2680 logical_id=(vgname, names[1]))
2681 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2682 logical_id = (primary, secondary, port),
2683 children = [dev_data, dev_meta],
2687 def _GenerateDiskTemplate(cfg, template_name,
2688 instance_name, primary_node,
2689 secondary_nodes, disk_sz, swap_sz):
2690 """Generate the entire disk layout for a given template type.
2693 #TODO: compute space requirements
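# each template yields a pair of disks, "sda" (data, disk_sz) and "sdb" (swap, swap_sz):
#   diskless     - no disks at all
#   plain        - two plain LVs on the primary node
#   local_raid1  - two md raid1 devices, each over two local LVs
#   remote_raid1 - two md raid1 devices, each over a drbd7 branch mirroring
#                  data/meta LVs between the primary and the secondary
#   drbd8        - two drbd8 devices, each backed by a data LV and a meta LV
#                  on both the primary and the secondary
# e.g. (illustrative only, the node and instance names are made up):
#   _GenerateDiskTemplate(cfg, constants.DT_DRBD8, "inst1", "node1", ["node2"],
#                         10240, 4096)
# would return two drbd8 Disk objects of 10240 and 4096 MB respectively.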
2695 vgname = cfg.GetVGName()
2696 if template_name == "diskless":
2698 elif template_name == "plain":
2699 if len(secondary_nodes) != 0:
2700 raise errors.ProgrammerError("Wrong template configuration")
2702 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2703 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2704 logical_id=(vgname, names[0]),
2706 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2707 logical_id=(vgname, names[1]),
2709 disks = [sda_dev, sdb_dev]
2710 elif template_name == "local_raid1":
2711 if len(secondary_nodes) != 0:
2712 raise errors.ProgrammerError("Wrong template configuration")
2715 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2716 ".sdb_m1", ".sdb_m2"])
2717 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2718 logical_id=(vgname, names[0]))
2719 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2720 logical_id=(vgname, names[1]))
2721 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2723 children = [sda_dev_m1, sda_dev_m2])
2724 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2725 logical_id=(vgname, names[2]))
2726 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2727 logical_id=(vgname, names[3]))
2728 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2730 children = [sdb_dev_m1, sdb_dev_m2])
2731 disks = [md_sda_dev, md_sdb_dev]
2732 elif template_name == constants.DT_REMOTE_RAID1:
2733 if len(secondary_nodes) != 1:
2734 raise errors.ProgrammerError("Wrong template configuration")
2735 remote_node = secondary_nodes[0]
2736 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2737 ".sdb_data", ".sdb_meta"])
2738 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2739 disk_sz, names[0:2])
2740 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2741 children = [drbd_sda_dev], size=disk_sz)
2742 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2743 swap_sz, names[2:4])
2744 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2745 children = [drbd_sdb_dev], size=swap_sz)
2746 disks = [md_sda_dev, md_sdb_dev]
2747 elif template_name == constants.DT_DRBD8:
2748 if len(secondary_nodes) != 1:
2749 raise errors.ProgrammerError("Wrong template configuration")
2750 remote_node = secondary_nodes[0]
2751 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2752 ".sdb_data", ".sdb_meta"])
2753 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2754 disk_sz, names[0:2], "sda")
2755 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2756 swap_sz, names[2:4], "sdb")
2757 disks = [drbd_sda_dev, drbd_sdb_dev]
2759 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2763 def _GetInstanceInfoText(instance):
2764 """Compute that text that should be added to the disk's metadata.
2767 return "originstname+%s" % instance.name
2770 def _CreateDisks(cfg, instance):
2771 """Create all disks for an instance.
2773 This abstracts away some work from AddInstance.
2776 instance: the instance object
2779 True or False showing the success of the creation process
2782 info = _GetInstanceInfoText(instance)
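# for every disk, create it on all secondary nodes first (only the pieces that
# have to live there), then force-create the whole device tree on the primary node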
2784 for device in instance.disks:
2785 logger.Info("creating volume %s for instance %s" %
2786 (device.iv_name, instance.name))
2788 for secondary_node in instance.secondary_nodes:
2789 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2790 device, False, info):
2791 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2792 (device.iv_name, device, secondary_node))
2795 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2796 instance, device, info):
2797 logger.Error("failed to create volume %s on primary!" %
2803 def _RemoveDisks(instance, cfg):
2804 """Remove all disks for an instance.
2806 This abstracts away some work from `AddInstance()` and
2807 `RemoveInstance()`. Note that in case some of the devices couldn't
2808 be removed, the removal will continue with the other ones (compare
2809 with `_CreateDisks()`).
2812 instance: the instance object
2815 True or False showing the success of the removal process
2818 logger.Info("removing block devices for instance %s" % instance.name)
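# walk each disk's device tree over every node it lives on and remove each block
# device; failures are logged but do not stop removal of the remaining devices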
2821 for device in instance.disks:
2822 for node, disk in device.ComputeNodeTree(instance.primary_node):
2823 cfg.SetDiskID(disk, node)
2824 if not rpc.call_blockdev_remove(node, disk):
2825 logger.Error("could not remove block device %s on node %s,"
2826 " continuing anyway" %
2827 (device.iv_name, node))
2832 class LUCreateInstance(LogicalUnit):
2833 """Create an instance.
2836 HPATH = "instance-add"
2837 HTYPE = constants.HTYPE_INSTANCE
2838 _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2839 "disk_template", "swap_size", "mode", "start", "vcpus",
2840 "wait_for_sync", "ip_check", "mac"]
2842 def BuildHooksEnv(self):
2845 This runs on master, primary and secondary nodes of the instance.
2849 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2850 "INSTANCE_DISK_SIZE": self.op.disk_size,
2851 "INSTANCE_SWAP_SIZE": self.op.swap_size,
2852 "INSTANCE_ADD_MODE": self.op.mode,
2854 if self.op.mode == constants.INSTANCE_IMPORT:
2855 env["INSTANCE_SRC_NODE"] = self.op.src_node
2856 env["INSTANCE_SRC_PATH"] = self.op.src_path
2857 env["INSTANCE_SRC_IMAGE"] = self.src_image
2859 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2860 primary_node=self.op.pnode,
2861 secondary_nodes=self.secondaries,
2862 status=self.instance_status,
2863 os_type=self.op.os_type,
2864 memory=self.op.mem_size,
2865 vcpus=self.op.vcpus,
2866 nics=[(self.inst_ip, self.op.bridge)],
2869 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2874 def CheckPrereq(self):
2875 """Check prerequisites.
2878 if self.op.mode not in (constants.INSTANCE_CREATE,
2879 constants.INSTANCE_IMPORT):
2880 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2883 if self.op.mode == constants.INSTANCE_IMPORT:
2884 src_node = getattr(self.op, "src_node", None)
2885 src_path = getattr(self.op, "src_path", None)
2886 if src_node is None or src_path is None:
2887 raise errors.OpPrereqError("Importing an instance requires source"
2888 " node and path options")
2889 src_node_full = self.cfg.ExpandNodeName(src_node)
2890 if src_node_full is None:
2891 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2892 self.op.src_node = src_node = src_node_full
2894 if not os.path.isabs(src_path):
2895 raise errors.OpPrereqError("The source path must be absolute")
2897 export_info = rpc.call_export_info(src_node, src_path)
2900 raise errors.OpPrereqError("No export found in dir %s" % src_path)
2902 if not export_info.has_section(constants.INISECT_EXP):
2903 raise errors.ProgrammerError("Corrupted export config")
2905 ei_version = export_info.get(constants.INISECT_EXP, 'version')
2906 if (int(ei_version) != constants.EXPORT_VERSION):
2907 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2908 (ei_version, constants.EXPORT_VERSION))
2910 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2911 raise errors.OpPrereqError("Can't import instance with more than"
2914 # FIXME: are the old os-es, disk sizes, etc. useful?
2915 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2916 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2918 self.src_image = diskimage
2919 else: # INSTANCE_CREATE
2920 if getattr(self.op, "os_type", None) is None:
2921 raise errors.OpPrereqError("No guest OS specified")
2923 # check primary node
2924 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2926 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2928 self.op.pnode = pnode.name
2930 self.secondaries = []
2931 # disk template and mirror node verification
2932 if self.op.disk_template not in constants.DISK_TEMPLATES:
2933 raise errors.OpPrereqError("Invalid disk template name")
2935 if self.op.disk_template in constants.DTS_NET_MIRROR:
2936 if getattr(self.op, "snode", None) is None:
2937 raise errors.OpPrereqError("The networked disk templates need"
2940 snode_name = self.cfg.ExpandNodeName(self.op.snode)
2941 if snode_name is None:
2942 raise errors.OpPrereqError("Unknown secondary node '%s'" %
2944 elif snode_name == pnode.name:
2945 raise errors.OpPrereqError("The secondary node cannot be"
2946 " the primary node.")
2947 self.secondaries.append(snode_name)
2949 # Check lv size requirements
2950 nodenames = [pnode.name] + self.secondaries
2951 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2953 # Required free disk space as a function of disk and swap space
2955 constants.DT_DISKLESS: 0,
2956 constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2957 constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2958 # 256 MB are added for drbd metadata, 128MB for each drbd device
2959 constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2960 constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2963 if self.op.disk_template not in req_size_dict:
2964 raise errors.ProgrammerError("Disk template '%s' size requirement"
2965 " is unknown" % self.op.disk_template)
2967 req_size = req_size_dict[self.op.disk_template]
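# every node that will host the instance's disks (primary plus any secondaries)
# must report at least req_size MB of free space in the volume group; e.g. a drbd8
# instance with disk_size=10240 and swap_size=4096 needs 10240+4096+256 = 14592 MB
# free on each of the two nodes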
2969 for node in nodenames:
2970 info = nodeinfo.get(node, None)
2972 raise errors.OpPrereqError("Cannot get current information"
2973 " from node '%s'" % nodeinfo)
2974 if req_size > info['vg_free']:
2975 raise errors.OpPrereqError("Not enough disk space on target node %s."
2976 " %d MB available, %d MB required" %
2977 (node, info['vg_free'], req_size))
2980 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2982 raise errors.OpPrereqError("OS '%s' not in supported os list for"
2983 " primary node" % self.op.os_type)
2985 # instance verification
2986 hostname1 = utils.HostInfo(self.op.instance_name)
2988 self.op.instance_name = instance_name = hostname1.name
2989 instance_list = self.cfg.GetInstanceList()
2990 if instance_name in instance_list:
2991 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2994 ip = getattr(self.op, "ip", None)
2995 if ip is None or ip.lower() == "none":
2997 elif ip.lower() == "auto":
2998 inst_ip = hostname1.ip
3000 if not utils.IsValidIP(ip):
3001 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3002 " like a valid IP" % ip)
3004 self.inst_ip = inst_ip
3006 if self.op.start and not self.op.ip_check:
3007 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3008 " adding an instance in start mode")
3010 if self.op.ip_check:
3011 if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
3012 constants.DEFAULT_NODED_PORT):
3013 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3014 (hostname1.ip, instance_name))
3016 # MAC address verification
3017 if self.op.mac != "auto":
3018 if not utils.IsValidMac(self.op.mac.lower()):
3019 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3022 # bridge verification
3023 bridge = getattr(self.op, "bridge", None)
3025 self.op.bridge = self.cfg.GetDefBridge()
3027 self.op.bridge = bridge
3029 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3030 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3031 " destination node '%s'" %
3032 (self.op.bridge, pnode.name))
3035 self.instance_status = 'up'
3037 self.instance_status = 'down'
3039 def Exec(self, feedback_fn):
3040 """Create and add the instance to the cluster.
3043 instance = self.op.instance_name
3044 pnode_name = self.pnode.name
3046 if self.op.mac == "auto":
3047 mac_address = self.cfg.GenerateMAC()
3049 mac_address = self.op.mac
3051 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3052 if self.inst_ip is not None:
3053 nic.ip = self.inst_ip
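# hypervisor types listed in constants.HTS_REQ_PORT need a network port reserved
# for the instance; allocate one from the cluster's port pool in that case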
3055 ht_kind = self.sstore.GetHypervisorType()
3056 if ht_kind in constants.HTS_REQ_PORT:
3057 network_port = self.cfg.AllocatePort()
3061 disks = _GenerateDiskTemplate(self.cfg,
3062 self.op.disk_template,
3063 instance, pnode_name,
3064 self.secondaries, self.op.disk_size,
3067 iobj = objects.Instance(name=instance, os=self.op.os_type,
3068 primary_node=pnode_name,
3069 memory=self.op.mem_size,
3070 vcpus=self.op.vcpus,
3071 nics=[nic], disks=disks,
3072 disk_template=self.op.disk_template,
3073 status=self.instance_status,
3074 network_port=network_port,
3077 feedback_fn("* creating instance disks...")
3078 if not _CreateDisks(self.cfg, iobj):
3079 _RemoveDisks(iobj, self.cfg)
3080 raise errors.OpExecError("Device creation failed, reverting...")
3082 feedback_fn("adding instance %s to cluster config" % instance)
3084 self.cfg.AddInstance(iobj)
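# if wait_for_sync was requested, block until the mirrors are fully synced;
# otherwise, for network-mirrored templates, at least make sure no disk is
# degraded before going on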
3086 if self.op.wait_for_sync:
3087 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3088 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3089 # make sure the disks are not degraded (still sync-ing is ok)
3091 feedback_fn("* checking mirrors status")
3092 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3097 _RemoveDisks(iobj, self.cfg)
3098 self.cfg.RemoveInstance(iobj.name)
3099 raise errors.OpExecError("There are some degraded disks for"
3102 feedback_fn("creating os for instance %s on node %s" %
3103 (instance, pnode_name))
3105 if iobj.disk_template != constants.DT_DISKLESS:
3106 if self.op.mode == constants.INSTANCE_CREATE:
3107 feedback_fn("* running the instance OS create scripts...")
3108 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3109 raise errors.OpExecError("could not add os for instance %s"
3111 (instance, pnode_name))
3113 elif self.op.mode == constants.INSTANCE_IMPORT:
3114 feedback_fn("* running the instance OS import scripts...")
3115 src_node = self.op.src_node
3116 src_image = self.src_image
3117 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3118 src_node, src_image):
3119 raise errors.OpExecError("Could not import os for instance"
3121 (instance, pnode_name))
3123 # also checked in the prereq part
3124 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3128 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3129 feedback_fn("* starting instance...")
3130 if not rpc.call_instance_start(pnode_name, iobj, None):
3131 raise errors.OpExecError("Could not start instance")
3134 class LUConnectConsole(NoHooksLU):
3135 """Connect to an instance's console.
3137 This is somewhat special in that it returns the command line that
3138 you need to run on the master node in order to connect to the console.
3142 _OP_REQP = ["instance_name"]
3144 def CheckPrereq(self):
3145 """Check prerequisites.
3147 This checks that the instance is in the cluster.
3150 instance = self.cfg.GetInstanceInfo(
3151 self.cfg.ExpandInstanceName(self.op.instance_name))
3152 if instance is None:
3153 raise errors.OpPrereqError("Instance '%s' not known" %
3154 self.op.instance_name)
3155 self.instance = instance
3157 def Exec(self, feedback_fn):
3158 """Connect to the console of an instance
3161 instance = self.instance
3162 node = instance.primary_node
3164 node_insts = rpc.call_instance_list([node])[node]
3165 if node_insts is False:
3166 raise errors.OpExecError("Can't connect to node %s." % node)
3168 if instance.name not in node_insts:
3169 raise errors.OpExecError("Instance %s is not running." % instance.name)
3171 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3173 hyper = hypervisor.GetHypervisor()
3174 console_cmd = hyper.GetShellCommandForConsole(instance)
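# build the ssh command line the caller must run on the master node: it opens an
# ssh session to the instance's primary node and runs the hypervisor-specific
# console command there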
3176 argv = ["ssh", "-q", "-t"]
3177 argv.extend(ssh.KNOWN_HOSTS_OPTS)
3178 argv.extend(ssh.BATCH_MODE_OPTS)
3180 argv.append(console_cmd)
3184 class LUAddMDDRBDComponent(LogicalUnit):
3185 """Adda new mirror member to an instance's disk.
3188 HPATH = "mirror-add"
3189 HTYPE = constants.HTYPE_INSTANCE
3190 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3192 def BuildHooksEnv(self):
3195 This runs on the master, the primary and all the secondaries.
3199 "NEW_SECONDARY": self.op.remote_node,
3200 "DISK_NAME": self.op.disk_name,
3202 env.update(_BuildInstanceHookEnvByObject(self.instance))
3203 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3204 self.op.remote_node,] + list(self.instance.secondary_nodes)
3207 def CheckPrereq(self):
3208 """Check prerequisites.
3210 This checks that the instance is in the cluster.
3213 instance = self.cfg.GetInstanceInfo(
3214 self.cfg.ExpandInstanceName(self.op.instance_name))
3215 if instance is None:
3216 raise errors.OpPrereqError("Instance '%s' not known" %
3217 self.op.instance_name)
3218 self.instance = instance
3220 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3221 if remote_node is None:
3222 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3223 self.remote_node = remote_node
3225 if remote_node == instance.primary_node:
3226 raise errors.OpPrereqError("The specified node is the primary node of"
3229 if instance.disk_template != constants.DT_REMOTE_RAID1:
3230 raise errors.OpPrereqError("Instance's disk layout is not"
3232 for disk in instance.disks:
3233 if disk.iv_name == self.op.disk_name:
3236 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3237 " instance." % self.op.disk_name)
3238 if len(disk.children) > 1:
3239 raise errors.OpPrereqError("The device already has two slave devices."
3240 " This would create a 3-disk raid1 which we"
3244 def Exec(self, feedback_fn):
3245 """Add the mirror component
3249 instance = self.instance
3251 remote_node = self.remote_node
3252 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3253 names = _GenerateUniqueNames(self.cfg, lv_names)
3254 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3255 remote_node, disk.size, names)
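# bring up the new branch in order: create it on the new secondary, then on the
# primary, and finally ask the primary node to attach it as a new child of the md
# mirror; partially created devices are removed again if a later step fails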
3257 logger.Info("adding new mirror component on secondary")
3259 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3261 _GetInstanceInfoText(instance)):
3262 raise errors.OpExecError("Failed to create new component on secondary"
3263 " node %s" % remote_node)
3265 logger.Info("adding new mirror component on primary")
3267 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3269 _GetInstanceInfoText(instance)):
3270 # remove secondary dev
3271 self.cfg.SetDiskID(new_drbd, remote_node)
3272 rpc.call_blockdev_remove(remote_node, new_drbd)
3273 raise errors.OpExecError("Failed to create volume on primary")
3275 # the device exists now
3276 # call the primary node to add the mirror to md
3277 logger.Info("adding new mirror component to md")
3278 if not rpc.call_blockdev_addchildren(instance.primary_node,
3280 logger.Error("Can't add mirror compoment to md!")
3281 self.cfg.SetDiskID(new_drbd, remote_node)
3282 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3283 logger.Error("Can't rollback on secondary")
3284 self.cfg.SetDiskID(new_drbd, instance.primary_node)
3285 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3286 logger.Error("Can't rollback on primary")
3287 raise errors.OpExecError("Can't add mirror component to md array")
3289 disk.children.append(new_drbd)
3291 self.cfg.AddInstance(instance)
3293 _WaitForSync(self.cfg, instance, self.proc)
3298 class LURemoveMDDRBDComponent(LogicalUnit):
3299 """Remove a component from a remote_raid1 disk.
3302 HPATH = "mirror-remove"
3303 HTYPE = constants.HTYPE_INSTANCE
3304 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3306 def BuildHooksEnv(self):
3309 This runs on the master, the primary and all the secondaries.
3313 "DISK_NAME": self.op.disk_name,
3314 "DISK_ID": self.op.disk_id,
3315 "OLD_SECONDARY": self.old_secondary,
3317 env.update(_BuildInstanceHookEnvByObject(self.instance))
3318 nl = [self.sstore.GetMasterNode(),
3319 self.instance.primary_node] + list(self.instance.secondary_nodes)
3322 def CheckPrereq(self):
3323 """Check prerequisites.
3325 This checks that the instance is in the cluster.
3328 instance = self.cfg.GetInstanceInfo(
3329 self.cfg.ExpandInstanceName(self.op.instance_name))
3330 if instance is None:
3331 raise errors.OpPrereqError("Instance '%s' not known" %
3332 self.op.instance_name)
3333 self.instance = instance
3335 if instance.disk_template != constants.DT_REMOTE_RAID1:
3336 raise errors.OpPrereqError("Instance's disk layout is not"
3338 for disk in instance.disks:
3339 if disk.iv_name == self.op.disk_name:
3342 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3343 " instance." % self.op.disk_name)
3344 for child in disk.children:
3345 if (child.dev_type == constants.LD_DRBD7 and
3346 child.logical_id[2] == self.op.disk_id):
3349 raise errors.OpPrereqError("Can't find the device with this port.")
3351 if len(disk.children) < 2:
3352 raise errors.OpPrereqError("Cannot remove the last component from"
3356 if self.child.logical_id[0] == instance.primary_node:
3360 self.old_secondary = self.child.logical_id[oid]
3362 def Exec(self, feedback_fn):
3363 """Remove the mirror component
3366 instance = self.instance
3369 logger.Info("remove mirror component")
3370 self.cfg.SetDiskID(disk, instance.primary_node)
3371 if not rpc.call_blockdev_removechildren(instance.primary_node,
3373 raise errors.OpExecError("Can't remove child from mirror.")
3375 for node in child.logical_id[:2]:
3376 self.cfg.SetDiskID(child, node)
3377 if not rpc.call_blockdev_remove(node, child):
3378 logger.Error("Warning: failed to remove device from node %s,"
3379 " continuing operation." % node)
3381 disk.children.remove(child)
3382 self.cfg.AddInstance(instance)
3385 class LUReplaceDisks(LogicalUnit):
3386 """Replace the disks of an instance.
3389 HPATH = "mirrors-replace"
3390 HTYPE = constants.HTYPE_INSTANCE
3391 _OP_REQP = ["instance_name", "mode", "disks"]
3393 def BuildHooksEnv(self):
3396 This runs on the master, the primary and all the secondaries.
3400 "MODE": self.op.mode,
3401 "NEW_SECONDARY": self.op.remote_node,
3402 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3404 env.update(_BuildInstanceHookEnvByObject(self.instance))
3406 self.sstore.GetMasterNode(),
3407 self.instance.primary_node,
3409 if self.op.remote_node is not None:
3410 nl.append(self.op.remote_node)
3413 def CheckPrereq(self):
3414 """Check prerequisites.
3416 This checks that the instance is in the cluster.
3419 instance = self.cfg.GetInstanceInfo(
3420 self.cfg.ExpandInstanceName(self.op.instance_name))
3421 if instance is None:
3422 raise errors.OpPrereqError("Instance '%s' not known" %
3423 self.op.instance_name)
3424 self.instance = instance
3425 self.op.instance_name = instance.name
3427 if instance.disk_template not in constants.DTS_NET_MIRROR:
3428 raise errors.OpPrereqError("Instance's disk layout is not"
3429 " network mirrored.")
3431 if len(instance.secondary_nodes) != 1:
3432 raise errors.OpPrereqError("The instance has a strange layout,"
3433 " expected one secondary but found %d" %
3434 len(instance.secondary_nodes))
3436 self.sec_node = instance.secondary_nodes[0]
3438 remote_node = getattr(self.op, "remote_node", None)
3439 if remote_node is not None:
3440 remote_node = self.cfg.ExpandNodeName(remote_node)
3441 if remote_node is None:
3442 raise errors.OpPrereqError("Node '%s' not known" %
3443 self.op.remote_node)
3444 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3446 self.remote_node_info = None
3447 if remote_node == instance.primary_node:
3448 raise errors.OpPrereqError("The specified node is the primary node of"
3450 elif remote_node == self.sec_node:
3451 if self.op.mode == constants.REPLACE_DISK_SEC:
3452 # this is for DRBD8, where we can't execute the same mode of
3453 # replacement as for drbd7 (no different port allocated)
3454 raise errors.OpPrereqError("Same secondary given, cannot execute"
3456 # the user gave the current secondary, switch to
3457 # 'no-replace-secondary' mode for drbd7
3459 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3460 self.op.mode != constants.REPLACE_DISK_ALL):
3461 raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3462 " disks replacement, not individual ones")
3463 if instance.disk_template == constants.DT_DRBD8:
3464 if (self.op.mode == constants.REPLACE_DISK_ALL and
3465 remote_node is not None):
3466 # switch to replace secondary mode
3467 self.op.mode = constants.REPLACE_DISK_SEC
3469 if self.op.mode == constants.REPLACE_DISK_ALL:
3470 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3471 " secondary disk replacement, not"
3473 elif self.op.mode == constants.REPLACE_DISK_PRI:
3474 if remote_node is not None:
3475 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3476 " the secondary while doing a primary"
3477 " node disk replacement")
3478 self.tgt_node = instance.primary_node
3479 self.oth_node = instance.secondary_nodes[0]
3480 elif self.op.mode == constants.REPLACE_DISK_SEC:
3481 self.new_node = remote_node # this can be None, in which case
3482 # we don't change the secondary
3483 self.tgt_node = instance.secondary_nodes[0]
3484 self.oth_node = instance.primary_node
3486 raise errors.ProgrammerError("Unhandled disk replace mode")
3488 for name in self.op.disks:
3489 if instance.FindDisk(name) is None:
3490 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3491 (name, instance.name))
3492 self.op.remote_node = remote_node
3494 def _ExecRR1(self, feedback_fn):
3495 """Replace the disks of an instance.
3498 instance = self.instance
3501 if self.op.remote_node is None:
3502 remote_node = self.sec_node
3504 remote_node = self.op.remote_node
3506 for dev in instance.disks:
3508 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3509 names = _GenerateUniqueNames(cfg, lv_names)
3510 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3511 remote_node, size, names)
3512 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3513 logger.Info("adding new mirror component on secondary for %s" %
3516 if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3518 _GetInstanceInfoText(instance)):
3519 raise errors.OpExecError("Failed to create new component on secondary"
3520 " node %s. Full abort, cleanup manually!" %
3523 logger.Info("adding new mirror component on primary")
3525 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3527 _GetInstanceInfoText(instance)):
3528 # remove secondary dev
3529 cfg.SetDiskID(new_drbd, remote_node)
3530 rpc.call_blockdev_remove(remote_node, new_drbd)
3531 raise errors.OpExecError("Failed to create volume on primary!"
3532 " Full abort, cleanup manually!!")
3534 # the device exists now
3535 # call the primary node to add the mirror to md
3536 logger.Info("adding new mirror component to md")
3537 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3539 logger.Error("Can't add mirror compoment to md!")
3540 cfg.SetDiskID(new_drbd, remote_node)
3541 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3542 logger.Error("Can't rollback on secondary")
3543 cfg.SetDiskID(new_drbd, instance.primary_node)
3544 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3545 logger.Error("Can't rollback on primary")
3546 raise errors.OpExecError("Full abort, cleanup manually!!")
3548 dev.children.append(new_drbd)
3549 cfg.AddInstance(instance)
3551 # this can fail as the old devices are degraded and _WaitForSync
3552 # does a combined result over all disks, so we don't check its
3554 _WaitForSync(cfg, instance, self.proc, unlock=True)
3556 # so check manually all the devices
3557 for name in iv_names:
3558 dev, child, new_drbd = iv_names[name]
3559 cfg.SetDiskID(dev, instance.primary_node)
3560 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3562 raise errors.OpExecError("MD device %s is degraded!" % name)
3563 cfg.SetDiskID(new_drbd, instance.primary_node)
3564 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3566 raise errors.OpExecError("New drbd device %s is degraded!" % name)
3568 for name in iv_names:
3569 dev, child, new_drbd = iv_names[name]
3570 logger.Info("remove mirror %s component" % name)
3571 cfg.SetDiskID(dev, instance.primary_node)
3572 if not rpc.call_blockdev_removechildren(instance.primary_node,
3574 logger.Error("Can't remove child from mirror, aborting"
3575 " *this device cleanup*.\nYou need to cleanup manually!!")
3578 for node in child.logical_id[:2]:
3579 logger.Info("remove child device on %s" % node)
3580 cfg.SetDiskID(child, node)
3581 if not rpc.call_blockdev_remove(node, child):
3582 logger.Error("Warning: failed to remove device from node %s,"
3583 " continuing operation." % node)
3585 dev.children.remove(child)
3587 cfg.AddInstance(instance)
3589 def _ExecD8DiskOnly(self, feedback_fn):
3590 """Replace a disk on the primary or secondary for dbrd8.
3592 The algorithm for replace is quite complicated:
3593 - for each disk to be replaced:
3594 - create new LVs on the target node with unique names
3595 - detach old LVs from the drbd device
3596 - rename old LVs to name_replaced.<time_t>
3597 - rename new LVs to old LVs
3598 - attach the new LVs (with the old names now) to the drbd device
3599 - wait for sync across all devices
3600 - for each modified disk:
3601 - remove old LVs (which have the name name_replaced.<time_t>)
3603 Failures are not very well handled.
3607 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3608 instance = self.instance
3610 vgname = self.cfg.GetVGName()
3613 tgt_node = self.tgt_node
3614 oth_node = self.oth_node
3616 # Step: check device activation
3617 self.proc.LogStep(1, steps_total, "check device existence")
3618 info("checking volume groups")
3619 my_vg = cfg.GetVGName()
3620 results = rpc.call_vg_list([oth_node, tgt_node])
3622 raise errors.OpExecError("Can't list volume groups on the nodes")
3623 for node in oth_node, tgt_node:
3624 res = results.get(node, False)
3625 if not res or my_vg not in res:
3626 raise errors.OpExecError("Volume group '%s' not found on %s" %
3628 for dev in instance.disks:
3629 if not dev.iv_name in self.op.disks:
3631 for node in tgt_node, oth_node:
3632 info("checking %s on %s" % (dev.iv_name, node))
3633 cfg.SetDiskID(dev, node)
3634 if not rpc.call_blockdev_find(node, dev):
3635 raise errors.OpExecError("Can't find device %s on node %s" %
3636 (dev.iv_name, node))
3638 # Step: check other node consistency
3639 self.proc.LogStep(2, steps_total, "check peer consistency")
3640 for dev in instance.disks:
3641 if not dev.iv_name in self.op.disks:
3643 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3644 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3645 oth_node==instance.primary_node):
3646 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3647 " to replace disks on this node (%s)" %
3648 (oth_node, tgt_node))
3650 # Step: create new storage
3651 self.proc.LogStep(3, steps_total, "allocate new storage")
3652 for dev in instance.disks:
3653 if not dev.iv_name in self.op.disks:
3656 cfg.SetDiskID(dev, tgt_node)
3657 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3658 names = _GenerateUniqueNames(cfg, lv_names)
3659 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3660 logical_id=(vgname, names[0]))
3661 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3662 logical_id=(vgname, names[1]))
3663 new_lvs = [lv_data, lv_meta]
3664 old_lvs = dev.children
3665 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3666 info("creating new local storage on %s for %s" %
3667 (tgt_node, dev.iv_name))
3668 # since we *always* want to create this LV, we use the
3669 # _Create...OnPrimary (which forces the creation), even if we
3670 # are talking about the secondary node
3671 for new_lv in new_lvs:
3672 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3673 _GetInstanceInfoText(instance)):
3674 raise errors.OpExecError("Failed to create new LV named '%s' on"
3676 (new_lv.logical_id[1], tgt_node))
3678 # Step: for each lv, detach+rename*2+attach
3679 self.proc.LogStep(4, steps_total, "change drbd configuration")
3680 for dev, old_lvs, new_lvs in iv_names.itervalues():
3681 info("detaching %s drbd from local storage" % dev.iv_name)
3682 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3683 raise errors.OpExecError("Can't detach drbd from local storage on node"
3684 " %s for device %s" % (tgt_node, dev.iv_name))
3686 #cfg.Update(instance)
3688 # ok, we created the new LVs, so now we know we have the needed
3689 # storage; as such, we proceed on the target node to rename
3690 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3691 # using the assumption that logical_id == physical_id (which in
3692 # turn is the unique_id on that node)
3694 # FIXME(iustin): use a better name for the replaced LVs
3695 temp_suffix = int(time.time())
3696 ren_fn = lambda d, suff: (d.physical_id[0],
3697 d.physical_id[1] + "_replaced-%s" % suff)
3698 # build the rename list based on what LVs exist on the node
3700 for to_ren in old_lvs:
3701 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3702 if find_res is not None: # device exists
3703 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3705 info("renaming the old LVs on the target node")
3706 if not rpc.call_blockdev_rename(tgt_node, rlist):
3707 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3708 # now we rename the new LVs to the old LVs
3709 info("renaming the new LVs on the target node")
3710 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3711 if not rpc.call_blockdev_rename(tgt_node, rlist):
3712 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3714 for old, new in zip(old_lvs, new_lvs):
3715 new.logical_id = old.logical_id
3716 cfg.SetDiskID(new, tgt_node)
3718 for disk in old_lvs:
3719 disk.logical_id = ren_fn(disk, temp_suffix)
3720 cfg.SetDiskID(disk, tgt_node)
3722 # now that the new lvs have the old name, we can add them to the device
3723 info("adding new mirror component on %s" % tgt_node)
3724 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3725 for new_lv in new_lvs:
3726 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3727 warning("Can't rollback device %s", hint="manually cleanup unused"
3729 raise errors.OpExecError("Can't add local storage to drbd")
3731 dev.children = new_lvs
3732 cfg.Update(instance)
3734 # Step: wait for sync
3736 # this can fail as the old devices are degraded and _WaitForSync
3737 # does a combined result over all disks, so we don't check its
3739 self.proc.LogStep(5, steps_total, "sync devices")
3740 _WaitForSync(cfg, instance, self.proc, unlock=True)
3742 # so check manually all the devices
3743 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3744 cfg.SetDiskID(dev, instance.primary_node)
3745 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3747 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3749 # Step: remove old storage
3750 self.proc.LogStep(6, steps_total, "removing old storage")
3751 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3752 info("remove logical volumes for %s" % name)
3754 cfg.SetDiskID(lv, tgt_node)
3755 if not rpc.call_blockdev_remove(tgt_node, lv):
3756 warning("Can't remove old LV", hint="manually remove unused LVs")
3759 def _ExecD8Secondary(self, feedback_fn):
3760 """Replace the secondary node for drbd8.
3762 The algorithm for replace is quite complicated:
3763 - for all disks of the instance:
3764 - create new LVs on the new node with same names
3765 - shutdown the drbd device on the old secondary
3766 - disconnect the drbd network on the primary
3767 - create the drbd device on the new secondary
3768 - network attach the drbd on the primary, using an artifice:
3769 the drbd code for Attach() will connect to the network if it
3770 finds a device which is connected to the good local disks but
3772 - wait for sync across all devices
3773 - remove all disks from the old secondary
3775 Failures are not very well handled.
3779 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3780 instance = self.instance
3782 vgname = self.cfg.GetVGName()
3785 old_node = self.tgt_node
3786 new_node = self.new_node
3787 pri_node = instance.primary_node
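# in this mode tgt_node is the old secondary being retired and new_node is the
# replacement secondary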
3789 # Step: check device activation
3790 self.proc.LogStep(1, steps_total, "check device existence")
3791 info("checking volume groups")
3792 my_vg = cfg.GetVGName()
3793 results = rpc.call_vg_list([pri_node, new_node])
3795 raise errors.OpExecError("Can't list volume groups on the nodes")
3796 for node in pri_node, new_node:
3797 res = results.get(node, False)
3798 if not res or my_vg not in res:
3799 raise errors.OpExecError("Volume group '%s' not found on %s" %
3801 for dev in instance.disks:
3802 if not dev.iv_name in self.op.disks:
3804 info("checking %s on %s" % (dev.iv_name, pri_node))
3805 cfg.SetDiskID(dev, pri_node)
3806 if not rpc.call_blockdev_find(pri_node, dev):
3807 raise errors.OpExecError("Can't find device %s on node %s" %
3808 (dev.iv_name, pri_node))
3810 # Step: check other node consistency
3811 self.proc.LogStep(2, steps_total, "check peer consistency")
3812 for dev in instance.disks:
3813 if not dev.iv_name in self.op.disks:
3815 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3816 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3817 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3818 " unsafe to replace the secondary" %
3821 # Step: create new storage
3822 self.proc.LogStep(3, steps_total, "allocate new storage")
3823 for dev in instance.disks:
3825 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3826 # since we *always* want to create this LV, we use the
3827 # _Create...OnPrimary (which forces the creation), even if we
3828 # are talking about the secondary node
3829 for new_lv in dev.children:
3830 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3831 _GetInstanceInfoText(instance)):
3832 raise errors.OpExecError("Failed to create new LV named '%s' on"
3834 (new_lv.logical_id[1], new_node))
3836 iv_names[dev.iv_name] = (dev, dev.children)
3838 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3839 for dev in instance.disks:
3841 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3842 # create new devices on new_node
3843 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3844 logical_id=(pri_node, new_node,
3846 children=dev.children)
3847 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3849 _GetInstanceInfoText(instance)):
3850 raise errors.OpExecError("Failed to create new DRBD on"
3851 " node '%s'" % new_node)
3853 for dev in instance.disks:
3854 # we have new devices, shutdown the drbd on the old secondary
3855 info("shutting down drbd for %s on old node" % dev.iv_name)
3856 cfg.SetDiskID(dev, old_node)
3857 if not rpc.call_blockdev_shutdown(old_node, dev):
3858 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3859 hint="Please cleanup this device manually as soon as possible")
3861 info("detaching primary drbds from the network (=> standalone)")
3863 for dev in instance.disks:
3864 cfg.SetDiskID(dev, pri_node)
3865 # set the physical (unique in bdev terms) id to None, meaning
3866 # detach from network
3867 dev.physical_id = (None,) * len(dev.physical_id)
3868 # and 'find' the device, which will 'fix' it to match the
3870 if rpc.call_blockdev_find(pri_node, dev):
3873 warning("Failed to detach drbd %s from network, unusual case" %
3877 # no detaches succeeded (very unlikely)
3878 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3880 # if we managed to detach at least one, we update all the disks of
3881 # the instance to point to the new secondary
3882 info("updating instance configuration")
3883 for dev in instance.disks:
3884 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3885 cfg.SetDiskID(dev, pri_node)
3886 cfg.Update(instance)
3888 # and now perform the drbd attach
3889 info("attaching primary drbds to new secondary (standalone => connected)")
3891 for dev in instance.disks:
3892 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3893 # since the attach is smart, it's enough to 'find' the device,
3894 # it will automatically activate the network, if the physical_id
3896 cfg.SetDiskID(dev, pri_node)
3897 if not rpc.call_blockdev_find(pri_node, dev):
3898 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3899 "please do a gnt-instance info to see the status of disks")
3901 # this can fail as the old devices are degraded and _WaitForSync
3902 # does a combined result over all disks, so we don't check its
3904 self.proc.LogStep(5, steps_total, "sync devices")
3905 _WaitForSync(cfg, instance, self.proc, unlock=True)
3907 # so check manually all the devices
3908 for name, (dev, old_lvs) in iv_names.iteritems():
3909 cfg.SetDiskID(dev, pri_node)
3910 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3912 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3914 self.proc.LogStep(6, steps_total, "removing old storage")
3915 for name, (dev, old_lvs) in iv_names.iteritems():
3916 info("remove logical volumes for %s" % name)
3918 cfg.SetDiskID(lv, old_node)
3919 if not rpc.call_blockdev_remove(old_node, lv):
3920 warning("Can't remove LV on old secondary",
3921 hint="Cleanup stale volumes by hand")
3923 def Exec(self, feedback_fn):
3924 """Execute disk replacement.
3926 This dispatches the disk replacement to the appropriate handler.
3929 instance = self.instance
3930 if instance.disk_template == constants.DT_REMOTE_RAID1:
3932 elif instance.disk_template == constants.DT_DRBD8:
3933 if self.op.remote_node is None:
3934 fn = self._ExecD8DiskOnly
3936 fn = self._ExecD8Secondary
3938 raise errors.ProgrammerError("Unhandled disk replacement case")
3939 return fn(feedback_fn)
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

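  # Illustrative sketch (invented values, not from the original source): for
  # a plain logical volume the mapping assembled above would look roughly like
  #
  #   {"iv_name": "sda",
  #    "dev_type": constants.LD_LV,
  #    "logical_id": ("xenvg", "lv-name"),
  #    "physical_id": ("xenvg", "lv-name"),
  #    "pstatus": ...,      # rpc.call_blockdev_find result on the primary
  #    "sstatus": None,     # plain LVs have no secondary node
  #    "children": []}
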
  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "network_port": instance.network_port,
        "vcpus": instance.vcpus,
        }

      result[instance.name] = idict

    return result

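  # Illustrative sketch (invented data): the dictionary returned above is
  # keyed by instance name, e.g.
  #
  #   {"instance1.example.com": {"name": "instance1.example.com",
  #                              "config_state": "up",
  #                              "run_state": "down",
  #                              "pnode": "node1.example.com",
  #                              ...}}
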
class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      args['nics'] = [(ip, bridge)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge, self.mac].count(None) == 5:
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))

    self.cfg.AddInstance(instance)

    return result

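  # Illustrative sketch (assumption, not from the original module): the opcode
  # feeding this LU mirrors the attributes read in CheckPrereq; the opcode
  # class and field names below are assumed.
  #
  #   op = opcodes.OpSetInstanceParms(instance_name="instance1.example.com",
  #                                   mem=512, vcpus=2)
  #   # Exec() then returns the list of applied changes, e.g.
  #   # [("mem", 512), ("vcpus", 2)]
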
class LUQueryExports(NoHooksLU):
  """Query the exports list.

  """
  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)

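  # Illustrative sketch (invented data): the result maps each queried node to
  # the list of exports found on it, e.g.
  #
  #   {"node1.example.com": ["instance1.example.com"],
  #    "node2.example.com": []}
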
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)
    finally:
      # restart the instance even if the snapshot step failed, unless no
      # shutdown was requested in the first place
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.proc.ChainOpCode(op)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))

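  # Illustrative sketch (assumption): the export above is normally requested
  # with an opcode carrying the fields listed in _OP_REQP; the opcode class
  # name is assumed to mirror this LU.
  #
  #   op = opcodes.OpExportInstance(instance_name="instance1.example.com",
  #                                 target_node="node2.example.com",
  #                                 shutdown=True)
  #
  # Note that only the disk named "sda" is snapshotted by the loop above, so
  # the resulting export covers the first disk only.
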
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results

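  # Illustrative sketch (invented data): a search for the pattern "web" could
  # return (path, tag) pairs such as
  #
  #   [("/cluster", "webfarm"),
  #    ("/instances/instance1.example.com", "web-frontend")]
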
class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")

class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")

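# Illustrative sketch (assumption, not from the original code): tag
# manipulation is driven by opcodes carrying the fields listed in _OP_REQP
# above; the opcode class names are assumed to mirror the LU names.
#
#   op = opcodes.OpAddTags(kind=constants.TAG_INSTANCE,
#                          name="instance1.example.com",
#                          tags=["webfarm", "critical"])
#   # ...and symmetrically OpDelTags with the same fields to remove them.
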