4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
import os
import os.path
import re
import sha
import tempfile
import time

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf
46 class LogicalUnit(object):
47 """Logical Unit base class.
49 Subclasses must follow these rules:
50 - implement CheckPrereq which also fills in the opcode instance
51 with all the fields (even if as None)
53 - implement BuildHooksEnv
54 - redefine HPATH and HTYPE
55 - optionally redefine their run requirements (REQ_CLUSTER,
56 REQ_MASTER); note that all commands require root permissions
65 def __init__(self, processor, op, cfg, sstore):
66 """Constructor for LogicalUnit.
68 This needs to be overriden in derived classes in order to check op
76 for attr_name in self._OP_REQP:
77 attr_val = getattr(op, attr_name, None)
79 raise errors.OpPrereqError("Required parameter '%s' missing" %
82 if not cfg.IsCluster():
83 raise errors.OpPrereqError("Cluster not initialized yet,"
84 " use 'gnt-cluster init' first.")
86 master = sstore.GetMasterNode()
87 if master != utils.HostInfo().name:
88 raise errors.OpPrereqError("Commands must be run on the master"
91 def CheckPrereq(self):
92 """Check prerequisites for this LU.
94 This method should check that the prerequisites for the execution
95 of this LU are fulfilled. It can do internode communication, but
96 it should be idempotent - no cluster or system changes are
99 The method should raise errors.OpPrereqError in case something is
100 not fulfilled. Its return value is ignored.
102 This method should also update all the parameters of the opcode to
103 their canonical form; e.g. a short node name must be fully
104 expanded after this method has successfully completed (so that
105 hooks, logging, etc. work correctly).
108 raise NotImplementedError
110 def Exec(self, feedback_fn):
113 This method should implement the actual work. It should raise
114 errors.OpExecError for failures that are somewhat dealt with in
118 raise NotImplementedError
120 def BuildHooksEnv(self):
121 """Build hooks environment for this LU.
123 This method should return a three-node tuple consisting of: a dict
124 containing the environment that will be used for running the
125 specific hook for this LU, a list of node names on which the hook
126 should run before the execution, and a list of node names on which
127 the hook should run after the execution.
129 The keys of the dict must not have 'GANETI_' prefixed as this will
130 be handled in the hooks runner. Also note additional keys will be
131 added by the hooks runner. If the LU doesn't define any
132 environment, an empty dict (and not None) should be returned.
134 As for the node lists, the master should not be included in the
135 them, as it will be added by the hooks runner in case this LU
136 requires a cluster to run on (otherwise we don't have a node
137 list). No nodes should be returned as an empty list (and not
140 Note that if the HPATH for a LU class is None, this function will
144 raise NotImplementedError
147 class NoHooksLU(LogicalUnit):
148 """Simple LU which runs no hooks.
150 This LU is intended as a parent for other LogicalUnits which will
151 run no hooks, in order to reduce duplicate code.
157 def BuildHooksEnv(self):
160 This is a no-op, since we don't run hooks.
166 def _AddHostToEtcHosts(hostname):
167 """Wrapper around utils.SetEtcHostsEntry.
170 hi = utils.HostInfo(name=hostname)
171 utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
174 def _RemoveHostFromEtcHosts(hostname):
175 """Wrapper around utils.RemoveEtcHostsEntry.
178 hi = utils.HostInfo(name=hostname)
179 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
183 def _GetWantedNodes(lu, nodes):
184 """Returns list of checked and expanded node names.
187 nodes: List of nodes (strings) or None for all
190 if not isinstance(nodes, list):
191 raise errors.OpPrereqError("Invalid argument type 'nodes'")
197 node = lu.cfg.ExpandNodeName(name)
199 raise errors.OpPrereqError("No such node name '%s'" % name)
203 wanted = lu.cfg.GetNodeList()
204 return utils.NiceSort(wanted)
207 def _GetWantedInstances(lu, instances):
208 """Returns list of checked and expanded instance names.
211 instances: List of instances (strings) or None for all
214 if not isinstance(instances, list):
215 raise errors.OpPrereqError("Invalid argument type 'instances'")
220 for name in instances:
221 instance = lu.cfg.ExpandInstanceName(name)
223 raise errors.OpPrereqError("No such instance name '%s'" % name)
224 wanted.append(instance)
227 wanted = lu.cfg.GetInstanceList()
228 return utils.NiceSort(wanted)
231 def _CheckOutputFields(static, dynamic, selected):
232 """Checks whether all selected fields are valid.
235 static: Static fields
236 dynamic: Dynamic fields
239 static_fields = frozenset(static)
240 dynamic_fields = frozenset(dynamic)
242 all_fields = static_fields | dynamic_fields
244 if not all_fields.issuperset(selected):
245 raise errors.OpPrereqError("Unknown output fields selected: %s"
246 % ",".join(frozenset(selected).
247 difference(all_fields)))
250 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251 memory, vcpus, nics):
252 """Builds instance related env variables for hooks from single variables.
255 secondary_nodes: List of secondary nodes as strings
259 "INSTANCE_NAME": name,
260 "INSTANCE_PRIMARY": primary_node,
261 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262 "INSTANCE_OS_TYPE": os_type,
263 "INSTANCE_STATUS": status,
264 "INSTANCE_MEMORY": memory,
265 "INSTANCE_VCPUS": vcpus,
269 nic_count = len(nics)
270 for idx, (ip, bridge) in enumerate(nics):
273 env["INSTANCE_NIC%d_IP" % idx] = ip
274 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
278 env["INSTANCE_NIC_COUNT"] = nic_count
283 def _BuildInstanceHookEnvByObject(instance, override=None):
284 """Builds instance related env variables for hooks from an object.
287 instance: objects.Instance object of instance
288 override: dict of values to override
291 'name': instance.name,
292 'primary_node': instance.primary_node,
293 'secondary_nodes': instance.secondary_nodes,
294 'os_type': instance.os,
295 'status': instance.os,
296 'memory': instance.memory,
297 'vcpus': instance.vcpus,
298 'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
301 args.update(override)
302 return _BuildInstanceHookEnv(**args)
305 def _UpdateKnownHosts(fullnode, ip, pubkey):
306 """Ensure a node has a correct known_hosts entry.
309 fullnode - Fully qualified domain name of host. (str)
310 ip - IPv4 address of host (str)
311 pubkey - the public key of the cluster
314 if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
315 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
317 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
326 logger.Debug('read %s' % (repr(rawline),))
328 parts = rawline.rstrip('\r\n').split()
330 # Ignore unwanted lines
331 if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
332 fields = parts[0].split(',')
337 for spec in [ ip, fullnode ]:
338 if spec not in fields:
343 logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
344 if haveall and key == pubkey:
346 save_lines.append(rawline)
347 logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
350 if havesome and (not haveall or key != pubkey):
352 logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
355 save_lines.append(rawline)
358 add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
359 logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
362 save_lines = save_lines + add_lines
364 # Write a new file and replace old.
365 fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
367 newfile = os.fdopen(fd, 'w')
369 newfile.write(''.join(save_lines))
372 logger.Debug("Wrote new known_hosts.")
373 os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
376 # Simply appending a new line will do the trick.
378 for add in add_lines:
384 def _HasValidVG(vglist, vgname):
385 """Checks if the volume group list is valid.
387 A non-None return value means there's an error, and the return value
388 is the error message.
391 vgsize = vglist.get(vgname, None)
393 return "volume group '%s' missing" % vgname
395 return ("volume group '%s' too small (20480MiB required, %dMib found)" %
400 def _InitSSHSetup(node):
401 """Setup the SSH configuration for the cluster.
404 This generates a dsa keypair for root, adds the pub key to the
405 permitted hosts and adds the hostkey to its own known hosts.
408 node: the name of this host as a fqdn
411 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
413 for name in priv_key, pub_key:
414 if os.path.exists(name):
415 utils.CreateBackup(name)
416 utils.RemoveFile(name)
418 result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
422 raise errors.OpExecError("Could not generate ssh keypair, error %s" %
425 f = open(pub_key, 'r')
427 utils.AddAuthorizedKey(auth_keys, f.read(8192))
432 def _InitGanetiServerSetup(ss):
433 """Setup the necessary configuration for the initial node daemon.
435 This creates the nodepass file containing the shared password for
436 the cluster and also generates the SSL certificate.
439 # Create pseudo random password
440 randpass = sha.new(os.urandom(64)).hexdigest()
441 # and write it into sstore
442 ss.SetKey(ss.SS_NODED_PASS, randpass)
444 result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
445 "-days", str(365*5), "-nodes", "-x509",
446 "-keyout", constants.SSL_CERT_FILE,
447 "-out", constants.SSL_CERT_FILE, "-batch"])
449 raise errors.OpExecError("could not generate server ssl cert, command"
450 " %s had exitcode %s and error message %s" %
451 (result.cmd, result.exit_code, result.output))
453 os.chmod(constants.SSL_CERT_FILE, 0400)
455 result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
458 raise errors.OpExecError("Could not start the node daemon, command %s"
459 " had exitcode %s and error %s" %
460 (result.cmd, result.exit_code, result.output))
463 def _CheckInstanceBridgesExist(instance):
464 """Check that the brigdes needed by an instance exist.
467 # check bridges existance
468 brlist = [nic.bridge for nic in instance.nics]
469 if not rpc.call_bridges_exist(instance.primary_node, brlist):
470 raise errors.OpPrereqError("one or more target bridges %s does not"
471 " exist on destination node '%s'" %
472 (brlist, instance.primary_node))
475 class LUInitCluster(LogicalUnit):
476 """Initialise the cluster.
479 HPATH = "cluster-init"
480 HTYPE = constants.HTYPE_CLUSTER
481 _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
482 "def_bridge", "master_netdev"]
485 def BuildHooksEnv(self):
488 Notes: Since we don't require a cluster, we must manually add
489 ourselves in the post-run node list.
492 env = {"OP_TARGET": self.op.cluster_name}
493 return env, [], [self.hostname.name]
495 def CheckPrereq(self):
496 """Verify that the passed name is a valid one.
499 if config.ConfigWriter.IsCluster():
500 raise errors.OpPrereqError("Cluster is already initialised")
502 if self.op.hypervisor_type == constants.HT_XEN_HVM31:
503 if not os.path.exists(constants.VNC_PASSWORD_FILE):
504 raise errors.OpPrereqError("Please prepare the cluster VNC"
506 constants.VNC_PASSWORD_FILE)
508 self.hostname = hostname = utils.HostInfo()
510 if hostname.ip.startswith("127."):
511 raise errors.OpPrereqError("This host's IP resolves to the private"
512 " range (%s). Please fix DNS or /etc/hosts." %
515 self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
517 if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
518 constants.DEFAULT_NODED_PORT):
519 raise errors.OpPrereqError("Inconsistency: this host's name resolves"
520 " to %s,\nbut this ip address does not"
521 " belong to this host."
522 " Aborting." % hostname.ip)
524 secondary_ip = getattr(self.op, "secondary_ip", None)
525 if secondary_ip and not utils.IsValidIP(secondary_ip):
526 raise errors.OpPrereqError("Invalid secondary ip given")
528 secondary_ip != hostname.ip and
529 (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
530 constants.DEFAULT_NODED_PORT))):
531 raise errors.OpPrereqError("You gave %s as secondary IP,"
532 " but it does not belong to this host." %
534 self.secondary_ip = secondary_ip
536 # checks presence of the volume group given
537 vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
540 raise errors.OpPrereqError("Error: %s" % vgstatus)
542 if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
544 raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
547 if self.op.hypervisor_type not in constants.HYPER_TYPES:
548 raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
549 self.op.hypervisor_type)
551 result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
553 raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
554 (self.op.master_netdev,
555 result.output.strip()))
557 if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
558 os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
559 raise errors.OpPrereqError("Init.d script '%s' missing or not"
560 " executable." % constants.NODE_INITD_SCRIPT)
562 def Exec(self, feedback_fn):
563 """Initialize the cluster.
566 clustername = self.clustername
567 hostname = self.hostname
569 # set up the simple store
570 self.sstore = ss = ssconf.SimpleStore()
571 ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
572 ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
573 ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
574 ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
575 ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
577 # set up the inter-node password and certificate
578 _InitGanetiServerSetup(ss)
580 # start the master ip
581 rpc.call_node_start_master(hostname.name)
583 # set up ssh config and /etc/hosts
584 f = open(constants.SSH_HOST_RSA_PUB, 'r')
589 sshkey = sshline.split(" ")[1]
591 _AddHostToEtcHosts(hostname.name)
593 _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
595 _InitSSHSetup(hostname.name)
597 # init of cluster config file
598 self.cfg = cfgw = config.ConfigWriter()
599 cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
600 sshkey, self.op.mac_prefix,
601 self.op.vg_name, self.op.def_bridge)
604 class LUDestroyCluster(NoHooksLU):
605 """Logical unit for destroying the cluster.
610 def CheckPrereq(self):
611 """Check prerequisites.
613 This checks whether the cluster is empty.
615 Any errors are signalled by raising errors.OpPrereqError.
618 master = self.sstore.GetMasterNode()
620 nodelist = self.cfg.GetNodeList()
621 if len(nodelist) != 1 or nodelist[0] != master:
622 raise errors.OpPrereqError("There are still %d node(s) in"
623 " this cluster." % (len(nodelist) - 1))
624 instancelist = self.cfg.GetInstanceList()
626 raise errors.OpPrereqError("There are still %d instance(s) in"
627 " this cluster." % len(instancelist))
629 def Exec(self, feedback_fn):
630 """Destroys the cluster.
633 master = self.sstore.GetMasterNode()
634 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
635 utils.CreateBackup(priv_key)
636 utils.CreateBackup(pub_key)
637 rpc.call_node_leave_cluster(master)
640 class LUVerifyCluster(NoHooksLU):
641 """Verifies the cluster status.
646 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
647 remote_version, feedback_fn):
648 """Run multiple tests against a node.
651 - compares ganeti version
652 - checks vg existance and size > 20G
653 - checks config file checksum
654 - checks ssh to other nodes
657 node: name of the node to check
658 file_list: required list of files
659 local_cksum: dictionary of local files and their checksums
662 # compares ganeti version
663 local_version = constants.PROTOCOL_VERSION
664 if not remote_version:
665 feedback_fn(" - ERROR: connection to %s failed" % (node))
668 if local_version != remote_version:
669 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
670 (local_version, node, remote_version))
673 # checks vg existance and size > 20G
677 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
681 vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
683 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
686 # checks config file checksum
689 if 'filelist' not in node_result:
691 feedback_fn(" - ERROR: node hasn't returned file checksum data")
693 remote_cksum = node_result['filelist']
694 for file_name in file_list:
695 if file_name not in remote_cksum:
697 feedback_fn(" - ERROR: file '%s' missing" % file_name)
698 elif remote_cksum[file_name] != local_cksum[file_name]:
700 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
702 if 'nodelist' not in node_result:
704 feedback_fn(" - ERROR: node hasn't returned node connectivity data")
706 if node_result['nodelist']:
708 for node in node_result['nodelist']:
709 feedback_fn(" - ERROR: communication with node '%s': %s" %
710 (node, node_result['nodelist'][node]))
711 hyp_result = node_result.get('hypervisor', None)
712 if hyp_result is not None:
713 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
716 def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
717 """Verify an instance.
719 This function checks to see if the required block devices are
720 available on the instance's node.
725 instancelist = self.cfg.GetInstanceList()
726 if not instance in instancelist:
727 feedback_fn(" - ERROR: instance %s not in instance list %s" %
728 (instance, instancelist))
731 instanceconfig = self.cfg.GetInstanceInfo(instance)
732 node_current = instanceconfig.primary_node
735 instanceconfig.MapLVsByNode(node_vol_should)
737 for node in node_vol_should:
738 for volume in node_vol_should[node]:
739 if node not in node_vol_is or volume not in node_vol_is[node]:
740 feedback_fn(" - ERROR: volume %s missing on node %s" %
744 if not instanceconfig.status == 'down':
745 if not instance in node_instance[node_current]:
746 feedback_fn(" - ERROR: instance %s not running on node %s" %
747 (instance, node_current))
750 for node in node_instance:
751 if (not node == node_current):
752 if instance in node_instance[node]:
753 feedback_fn(" - ERROR: instance %s should not run on node %s" %
759 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
760 """Verify if there are any unknown volumes in the cluster.
762 The .os, .swap and backup volumes are ignored. All other volumes are
768 for node in node_vol_is:
769 for volume in node_vol_is[node]:
770 if node not in node_vol_should or volume not in node_vol_should[node]:
771 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
776 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
777 """Verify the list of running instances.
779 This checks what instances are running but unknown to the cluster.
783 for node in node_instance:
784 for runninginstance in node_instance[node]:
785 if runninginstance not in instancelist:
786 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
787 (runninginstance, node))
791 def CheckPrereq(self):
792 """Check prerequisites.
794 This has no prerequisites.
799 def Exec(self, feedback_fn):
800 """Verify integrity of cluster, performing various test on nodes.
804 feedback_fn("* Verifying global settings")
805 for msg in self.cfg.VerifyConfig():
806 feedback_fn(" - ERROR: %s" % msg)
808 vg_name = self.cfg.GetVGName()
809 nodelist = utils.NiceSort(self.cfg.GetNodeList())
810 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
814 # FIXME: verify OS list
816 file_names = list(self.sstore.GetFileList())
817 file_names.append(constants.SSL_CERT_FILE)
818 file_names.append(constants.CLUSTER_CONF_FILE)
819 local_checksums = utils.FingerprintFiles(file_names)
821 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
822 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
823 all_instanceinfo = rpc.call_instance_list(nodelist)
824 all_vglist = rpc.call_vg_list(nodelist)
825 node_verify_param = {
826 'filelist': file_names,
827 'nodelist': nodelist,
830 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
831 all_rversion = rpc.call_version(nodelist)
833 for node in nodelist:
834 feedback_fn("* Verifying node %s" % node)
835 result = self._VerifyNode(node, file_names, local_checksums,
836 all_vglist[node], all_nvinfo[node],
837 all_rversion[node], feedback_fn)
841 volumeinfo = all_volumeinfo[node]
843 if isinstance(volumeinfo, basestring):
844 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
845 (node, volumeinfo[-400:].encode('string_escape')))
847 node_volume[node] = {}
848 elif not isinstance(volumeinfo, dict):
849 feedback_fn(" - ERROR: connection to %s failed" % (node,))
853 node_volume[node] = volumeinfo
856 nodeinstance = all_instanceinfo[node]
857 if type(nodeinstance) != list:
858 feedback_fn(" - ERROR: connection to %s failed" % (node,))
862 node_instance[node] = nodeinstance
866 for instance in instancelist:
867 feedback_fn("* Verifying instance %s" % instance)
868 result = self._VerifyInstance(instance, node_volume, node_instance,
872 inst_config = self.cfg.GetInstanceInfo(instance)
874 inst_config.MapLVsByNode(node_vol_should)
876 feedback_fn("* Verifying orphan volumes")
877 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
881 feedback_fn("* Verifying remaining instances")
882 result = self._VerifyOrphanInstances(instancelist, node_instance,
889 class LUVerifyDisks(NoHooksLU):
890 """Verifies the cluster disks status.
895 def CheckPrereq(self):
896 """Check prerequisites.
898 This has no prerequisites.
903 def Exec(self, feedback_fn):
904 """Verify integrity of cluster disks.
907 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
909 vg_name = self.cfg.GetVGName()
910 nodes = utils.NiceSort(self.cfg.GetNodeList())
911 instances = [self.cfg.GetInstanceInfo(name)
912 for name in self.cfg.GetInstanceList()]
915 for inst in instances:
917 if (inst.status != "up" or
918 inst.disk_template not in constants.DTS_NET_MIRROR):
920 inst.MapLVsByNode(inst_lvs)
921 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
922 for node, vol_list in inst_lvs.iteritems():
924 nv_dict[(node, vol)] = inst
929 node_lvs = rpc.call_volume_list(nodes, vg_name)
936 if isinstance(lvs, basestring):
937 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
939 elif not isinstance(lvs, dict):
940 logger.Info("connection to node %s failed or invalid data returned" %
942 res_nodes.append(node)
945 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
946 inst = nv_dict.pop((node, lv_name), None)
947 if (not lv_online and inst is not None
948 and inst.name not in res_instances):
949 res_instances.append(inst.name)
951 # any leftover items in nv_dict are missing LVs, let's arrange the
953 for key, inst in nv_dict.iteritems():
954 if inst.name not in res_missing:
955 res_missing[inst.name] = []
956 res_missing[inst.name].append(key)
961 class LURenameCluster(LogicalUnit):
962 """Rename the cluster.
965 HPATH = "cluster-rename"
966 HTYPE = constants.HTYPE_CLUSTER
969 def BuildHooksEnv(self):
974 "OP_TARGET": self.op.sstore.GetClusterName(),
975 "NEW_NAME": self.op.name,
977 mn = self.sstore.GetMasterNode()
978 return env, [mn], [mn]
980 def CheckPrereq(self):
981 """Verify that the passed name is a valid one.
984 hostname = utils.HostInfo(self.op.name)
986 new_name = hostname.name
987 self.ip = new_ip = hostname.ip
988 old_name = self.sstore.GetClusterName()
989 old_ip = self.sstore.GetMasterIP()
990 if new_name == old_name and new_ip == old_ip:
991 raise errors.OpPrereqError("Neither the name nor the IP address of the"
992 " cluster has changed")
994 result = utils.RunCmd(["fping", "-q", new_ip])
995 if not result.failed:
996 raise errors.OpPrereqError("The given cluster IP address (%s) is"
997 " reachable on the network. Aborting." %
1000 self.op.name = new_name
1002 def Exec(self, feedback_fn):
1003 """Rename the cluster.
1006 clustername = self.op.name
1010 # shutdown the master IP
1011 master = ss.GetMasterNode()
1012 if not rpc.call_node_stop_master(master):
1013 raise errors.OpExecError("Could not disable the master role")
1017 ss.SetKey(ss.SS_MASTER_IP, ip)
1018 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1020 # Distribute updated ss config to all nodes
1021 myself = self.cfg.GetNodeInfo(master)
1022 dist_nodes = self.cfg.GetNodeList()
1023 if myself.name in dist_nodes:
1024 dist_nodes.remove(myself.name)
1026 logger.Debug("Copying updated ssconf data to all nodes")
1027 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1028 fname = ss.KeyToFilename(keyname)
1029 result = rpc.call_upload_file(dist_nodes, fname)
1030 for to_node in dist_nodes:
1031 if not result[to_node]:
1032 logger.Error("copy of file %s to node %s failed" %
1035 if not rpc.call_node_start_master(master):
1036 logger.Error("Could not re-enable the master role on the master,"
1037 " please restart manually.")
1040 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1041 """Sleep and poll for an instance's disk to sync.
1044 if not instance.disks:
1048 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1050 node = instance.primary_node
1052 for dev in instance.disks:
1053 cfgw.SetDiskID(dev, node)
1059 cumul_degraded = False
1060 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1062 proc.LogWarning("Can't get any data from node %s" % node)
1065 raise errors.RemoteError("Can't contact node %s for mirror data,"
1066 " aborting." % node)
1070 for i in range(len(rstats)):
1073 proc.LogWarning("Can't compute data for node %s/%s" %
1074 (node, instance.disks[i].iv_name))
1076 # we ignore the ldisk parameter
1077 perc_done, est_time, is_degraded, _ = mstat
1078 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1079 if perc_done is not None:
1081 if est_time is not None:
1082 rem_time = "%d estimated seconds remaining" % est_time
1085 rem_time = "no time estimate"
1086 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1087 (instance.disks[i].iv_name, perc_done, rem_time))
1094 time.sleep(min(60, max_time))
1100 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1101 return not cumul_degraded
1104 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1105 """Check that mirrors are not degraded.
1107 The ldisk parameter, if True, will change the test from the
1108 is_degraded attribute (which represents overall non-ok status for
1109 the device(s)) to the ldisk (representing the local storage status).
1112 cfgw.SetDiskID(dev, node)
1119 if on_primary or dev.AssembleOnSecondary():
1120 rstats = rpc.call_blockdev_find(node, dev)
1122 logger.ToStderr("Can't get any data from node %s" % node)
1125 result = result and (not rstats[idx])
1127 for child in dev.children:
1128 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1133 class LUDiagnoseOS(NoHooksLU):
1134 """Logical unit for OS diagnose/query.
1139 def CheckPrereq(self):
1140 """Check prerequisites.
1142 This always succeeds, since this is a pure query LU.
1147 def Exec(self, feedback_fn):
1148 """Compute the list of OSes.
1151 node_list = self.cfg.GetNodeList()
1152 node_data = rpc.call_os_diagnose(node_list)
1153 if node_data == False:
1154 raise errors.OpExecError("Can't gather the list of OSes")
1158 class LURemoveNode(LogicalUnit):
1159 """Logical unit for removing a node.
1162 HPATH = "node-remove"
1163 HTYPE = constants.HTYPE_NODE
1164 _OP_REQP = ["node_name"]
1166 def BuildHooksEnv(self):
1169 This doesn't run on the target node in the pre phase as a failed
1170 node would not allows itself to run.
1174 "OP_TARGET": self.op.node_name,
1175 "NODE_NAME": self.op.node_name,
1177 all_nodes = self.cfg.GetNodeList()
1178 all_nodes.remove(self.op.node_name)
1179 return env, all_nodes, all_nodes
1181 def CheckPrereq(self):
1182 """Check prerequisites.
1185 - the node exists in the configuration
1186 - it does not have primary or secondary instances
1187 - it's not the master
1189 Any errors are signalled by raising errors.OpPrereqError.
1192 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1194 raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1196 instance_list = self.cfg.GetInstanceList()
1198 masternode = self.sstore.GetMasterNode()
1199 if node.name == masternode:
1200 raise errors.OpPrereqError("Node is the master node,"
1201 " you need to failover first.")
1203 for instance_name in instance_list:
1204 instance = self.cfg.GetInstanceInfo(instance_name)
1205 if node.name == instance.primary_node:
1206 raise errors.OpPrereqError("Instance %s still running on the node,"
1207 " please remove first." % instance_name)
1208 if node.name in instance.secondary_nodes:
1209 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1210 " please remove first." % instance_name)
1211 self.op.node_name = node.name
1214 def Exec(self, feedback_fn):
1215 """Removes the node from the cluster.
1219 logger.Info("stopping the node daemon and removing configs from node %s" %
1222 rpc.call_node_leave_cluster(node.name)
1224 ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1226 logger.Info("Removing node %s from config" % node.name)
1228 self.cfg.RemoveNode(node.name)
1230 _RemoveHostFromEtcHosts(node.name)
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    # fields that need a live query against the node daemons
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      # live data was requested: query every node daemon
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          # unreachable node: no live values available
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      # instance fields requested: build node -> instances maps
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    # map instance -> {node: [lv names]} to resolve volume ownership
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        # node unreachable or has no volumes: skip it
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            # find the instance owning this LV on this node, '-' if none
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      # single-homed node: secondary equals primary
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"
                                 % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(utils.HostInfo().name,
                         primary_ip,
                         constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(myself.secondary_ip,
                           secondary_ip,
                           constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      # HVM clusters need the shared VNC password file on every node
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search(r'^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      # make sure the new node really owns the secondary ip it announced
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This commands must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      # fixed message: it previously read "could disable", the opposite
      # of what actually happened
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result
class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
    Opts used:
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        # no point in copying the file to ourselves
        continue
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()
class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    Returns a list of (node, output, exit_code) tuples, one per node.

    """
    data = []
    for node in self.nodes:
      result = ssh.SSHCall(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
    succeeded with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  for inst_disk in instance.disks:
    master_result = None
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      is_primary = node == instance.primary_node
      result = rpc.call_blockdev_assemble(node, node_disk,
                                          instance.name, is_primary)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=%s)" %
                     (inst_disk.iv_name, node, is_primary))
        # secondary failures are fatal only unless explicitly ignored
        if is_primary or not ignore_secondaries:
          disks_ok = False
      if is_primary:
        # remember the primary's device so we can report the mapping
        master_result = result
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        master_result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  Raises errors.OpExecError if the disks cannot be assembled; disks
  already brought up are shut down again before raising.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    # refuse to touch the disks while the instance is running
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  A shutdown failure makes the function return False, except that
  failures on the primary node are tolerated when ignore_primary is
  true.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    # make sure the target node has enough free memory for the instance
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
    if not nodeinfo:
      raise errors.OpExecError("Could not contact node %s for infos" %
                               (node_current))

    freememory = nodeinfo[node_current]['memory_free']
    memory = instance.memory
    if memory > freememory:
      raise errors.OpExecError("Not enough memory to start instance"
                               " %s on node %s"
                               " needed %s MiB, available %s MiB" %
                               (instance.name, node_current, memory,
                                freememory))

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      # don't leave the disks assembled if the start failed
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")

    self.cfg.MarkInstanceUp(instance.name)
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      # soft/hard reboot is delegated to the hypervisor on the node
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      # full reboot: stop the instance and its disks, then start it anew
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    if not rpc.call_instance_shutdown(node_current, instance):
      # best-effort: mark it down and tear the disks down anyway
      logger.Error("could not shutdown instance")

    self.cfg.MarkInstanceDown(instance.name)
    _ShutdownInstanceDisks(instance, self.cfg)
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # double-check with the node: the config may be stale
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        # fixed: the message previously formatted self.op.pnode, which
        # does not exist on this opcode
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      # always tear the disks down again, even if the OS install failed
      _ShutdownInstanceDisks(inst, self.cfg)
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # double-check with the node: the config may be stale
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    if not getattr(self.op, "ignore_ip", False):
      # make sure the new name's IP is not already live on the network
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        # fixed message: it previously read "Could run", the opposite of
        # what actually happened
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      # always tear the disks down again
      _ShutdownInstanceDisks(inst, self.cfg)
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    # fields that need a live query against the node daemons
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      # live data requested: query the primary nodes
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            # node unreachable: state unknown
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
# NOTE(review): non-contiguous embedded line numbers — some source lines
# are elided in this excerpt. Code kept verbatim; comments only added.
2470 class LUFailoverInstance(LogicalUnit):
2471 """Failover an instance.
# Hook path/type and required opcode parameters for this LU.
2474 HPATH = "instance-failover"
2475 HTYPE = constants.HTYPE_INSTANCE
2476 _OP_REQP = ["instance_name", "ignore_consistency"]
2478 def BuildHooksEnv(self):
2481 This runs on master, primary and secondary nodes of the instance.
2485 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2487 env.update(_BuildInstanceHookEnvByObject(self.instance))
# Node list for the hooks: master plus the instance's secondaries.
2488 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2491 def CheckPrereq(self):
2492 """Check prerequisites.
2494 This checks that the instance is in the cluster.
# Expand a possibly-short instance name and fetch its config object.
2497 instance = self.cfg.GetInstanceInfo(
2498 self.cfg.ExpandInstanceName(self.op.instance_name))
2499 if instance is None:
2500 raise errors.OpPrereqError("Instance '%s' not known" %
2501 self.op.instance_name)
# Failover only makes sense for network-mirrored disk templates.
2503 if instance.disk_template not in constants.DTS_NET_MIRROR:
2504 raise errors.OpPrereqError("Instance's disk layout is not"
2505 " network mirrored, cannot failover.")
2507 secondary_nodes = instance.secondary_nodes
2508 if not secondary_nodes:
2509 raise errors.ProgrammerError("no secondary node but using "
2510 "DT_REMOTE_RAID1 template")
2512 # check memory requirements on the secondary node
2513 target_node = secondary_nodes[0]
2514 nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2515 info = nodeinfo.get(target_node, None)
2517 raise errors.OpPrereqError("Cannot get current information"
2518 " from node '%s'" % nodeinfo)
2519 if instance.memory > info['memory_free']:
2520 raise errors.OpPrereqError("Not enough memory on target node %s."
2521 " %d MB available, %d MB required" %
2522 (target_node, info['memory_free'],
2525 # check bridge existance
2526 brlist = [nic.bridge for nic in instance.nics]
2527 if not rpc.call_bridges_exist(target_node, brlist):
2528 raise errors.OpPrereqError("One or more target bridges %s does not"
2529 " exist on destination node '%s'" %
2530 (brlist, target_node))
2532 self.instance = instance
2534 def Exec(self, feedback_fn):
2535 """Failover an instance.
2537 The failover is done by shutting it down on its present node and
2538 starting it on the secondary.
2541 instance = self.instance
2543 source_node = instance.primary_node
2544 target_node = instance.secondary_nodes[0]
# Step 1: refuse to fail over onto degraded disks unless the user
# explicitly asked to ignore consistency.
2546 feedback_fn("* checking disk consistency between source and target")
2547 for dev in instance.disks:
2548 # for remote_raid1, these are md over drbd
2549 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2550 if not self.op.ignore_consistency:
2551 raise errors.OpExecError("Disk %s is degraded on target node,"
2552 " aborting failover." % dev.iv_name)
# Step 2: re-check memory on the target right before the move.
2554 feedback_fn("* checking target node resource availability")
2555 nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2558 raise errors.OpExecError("Could not contact target node %s." %
2561 free_memory = int(nodeinfo[target_node]['memory_free'])
2562 memory = instance.memory
2563 if memory > free_memory:
2564 raise errors.OpExecError("Not enough memory to create instance %s on"
2565 " node %s. needed %s MiB, available %s MiB" %
2566 (instance.name, target_node, memory,
# Step 3: shut the instance down on the old primary. With
# ignore_consistency a failed shutdown is only logged (the node may
# already be dead); otherwise it aborts the failover.
2569 feedback_fn("* shutting down instance on source node")
2570 logger.Info("Shutting down instance %s on node %s" %
2571 (instance.name, source_node))
2573 if not rpc.call_instance_shutdown(source_node, instance):
2574 if self.op.ignore_consistency:
2575 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2576 " anyway. Please make sure node %s is down" %
2577 (instance.name, source_node, source_node))
2579 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2580 (instance.name, source_node))
2582 feedback_fn("* deactivating the instance's disks on source node")
2583 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2584 raise errors.OpExecError("Can't shut down the instance's disks.")
# Step 4: flip the primary node in the config and persist it
# (AddInstance also re-distributes the config — see its callers).
2586 instance.primary_node = target_node
2587 # distribute new instance config to the other nodes
2588 self.cfg.AddInstance(instance)
# Step 5: bring up disks and start the instance on the new primary;
# on failure the already-assembled disks are shut down again.
2590 feedback_fn("* activating the instance's disks on target node")
2591 logger.Info("Starting instance %s on node %s" %
2592 (instance.name, target_node))
2594 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2595 ignore_secondaries=True)
2597 _ShutdownInstanceDisks(instance, self.cfg)
2598 raise errors.OpExecError("Can't activate the instance's disks")
2600 feedback_fn("* starting the instance on the target node")
2601 if not rpc.call_instance_start(target_node, instance, None):
2602 _ShutdownInstanceDisks(instance, self.cfg)
2603 raise errors.OpExecError("Could not start instance %s on node %s." %
2604 (instance.name, target_node))
# NOTE(review): lines elided in this excerpt (failure returns etc.);
# code kept verbatim, comments only added.
2607 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2608 """Create a tree of block devices on the primary node.
2610 This always creates all devices.
# Recurse depth-first: children must exist before the parent device
# can be created on top of them.
2614 for child in device.children:
2615 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
# Register the device/node pair in the config, then create it via RPC;
# 'True' presumably selects on-primary semantics — matches the
# companion _CreateBlockDevOnSecondary which passes False.
2618 cfg.SetDiskID(device, node)
2619 new_id = rpc.call_blockdev_create(node, device, device.size,
2620 instance.name, True, info)
# Record the backend-assigned id the first time the device is created.
2623 if device.physical_id is None:
2624 device.physical_id = new_id
# NOTE(review): lines elided in this excerpt; code kept verbatim,
# comments only added.
2628 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2629 """Create a tree of block devices on a secondary node.
2631 If this device type has to be created on secondaries, create it and
2634 If not, just recurse to children keeping the same 'force' value.
# Device types that must exist on secondaries (e.g. DRBD halves)
# force creation of the whole subtree; others only propagate 'force'.
2637 if device.CreateOnSecondary():
2640 for child in device.children:
2641 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2642 child, force, info):
# Actual creation: register the device for this node, then create it
# remotely with on_primary=False (secondary-side semantics).
2647 cfg.SetDiskID(device, node)
2648 new_id = rpc.call_blockdev_create(node, device, device.size,
2649 instance.name, False, info)
2652 if device.physical_id is None:
2653 device.physical_id = new_id
# NOTE(review): the loop header over 'exts' and the return statement are
# elided in this excerpt; code kept verbatim, comments only added.
2657 def _GenerateUniqueNames(cfg, exts):
2658 """Generate a suitable LV name.
2660 This will generate a logical volume name for the given instance.
# For each suffix in 'exts' a fresh cluster-unique id is generated and
# prepended, yielding names like "<uuid>.sda_data".
2665 new_id = cfg.GenerateUniqueID()
2666 results.append("%s%s" % (new_id, val))
# NOTE(review): the return statement is elided in this excerpt; code
# kept verbatim, comments only added.
2670 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2671 """Generate a drbd device complete with its children.
# Allocate a cluster-wide DRBD port and build the two backing LVs:
# names[0] = data volume (full size), names[1] = 128 MB metadata volume.
2674 port = cfg.AllocatePort()
2675 vgname = cfg.GetVGName()
2676 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2677 logical_id=(vgname, names[0]))
2678 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2679 logical_id=(vgname, names[1]))
# DRBD7 device stacked on the data+meta LVs, identified by the
# (primary, secondary, port) triple.
2680 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2681 logical_id = (primary, secondary, port),
2682 children = [dev_data, dev_meta])
# NOTE(review): the iv_name keyword line and the return statement are
# elided in this excerpt; code kept verbatim, comments only added.
2686 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2687 """Generate a drbd8 device complete with its children.
# Same layout as _GenerateMDDRBDBranch but for DRBD8: one data LV of
# the requested size and one 128 MB metadata LV.
2690 port = cfg.AllocatePort()
2691 vgname = cfg.GetVGName()
2692 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2693 logical_id=(vgname, names[0]))
2694 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2695 logical_id=(vgname, names[1]))
2696 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2697 logical_id = (primary, secondary, port),
2698 children = [dev_data, dev_meta],
# NOTE(review): non-contiguous embedded line numbers — e.g. the
# diskless branch body, iv_name keywords and the final return are
# elided in this excerpt. Code kept verbatim; comments only added.
2702 def _GenerateDiskTemplate(cfg, template_name,
2703 instance_name, primary_node,
2704 secondary_nodes, disk_sz, swap_sz):
2705 """Generate the entire disk layout for a given template type.
2708 #TODO: compute space requirements
2710 vgname = cfg.GetVGName()
# Each branch validates the secondary-node count for its template and
# builds the 'disks' list (sda = data of disk_sz, sdb = swap of swap_sz).
2711 if template_name == "diskless":
2713 elif template_name == "plain":
# Plain LVs live only on the primary node — no secondaries allowed.
2714 if len(secondary_nodes) != 0:
2715 raise errors.ProgrammerError("Wrong template configuration")
2717 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2718 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2719 logical_id=(vgname, names[0]),
2721 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2722 logical_id=(vgname, names[1]),
2724 disks = [sda_dev, sdb_dev]
2725 elif template_name == "local_raid1":
# Local RAID1: two LV mirrors per disk, assembled by an MD raid1
# device, all on the primary node.
2726 if len(secondary_nodes) != 0:
2727 raise errors.ProgrammerError("Wrong template configuration")
2730 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2731 ".sdb_m1", ".sdb_m2"])
2732 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2733 logical_id=(vgname, names[0]))
2734 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2735 logical_id=(vgname, names[1]))
2736 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2738 children = [sda_dev_m1, sda_dev_m2])
2739 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2740 logical_id=(vgname, names[2]))
2741 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2742 logical_id=(vgname, names[3]))
2743 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2745 children = [sdb_dev_m1, sdb_dev_m2])
2746 disks = [md_sda_dev, md_sdb_dev]
2747 elif template_name == constants.DT_REMOTE_RAID1:
# Remote RAID1: MD raid1 over a DRBD7 branch mirrored to exactly one
# secondary node.
2748 if len(secondary_nodes) != 1:
2749 raise errors.ProgrammerError("Wrong template configuration")
2750 remote_node = secondary_nodes[0]
2751 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2752 ".sdb_data", ".sdb_meta"])
2753 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2754 disk_sz, names[0:2])
2755 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2756 children = [drbd_sda_dev], size=disk_sz)
2757 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2758 swap_sz, names[2:4])
2759 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2760 children = [drbd_sdb_dev], size=swap_sz)
2761 disks = [md_sda_dev, md_sdb_dev]
2762 elif template_name == constants.DT_DRBD8:
# DRBD8: the drbd device itself is the top-level disk (no MD layer).
2763 if len(secondary_nodes) != 1:
2764 raise errors.ProgrammerError("Wrong template configuration")
2765 remote_node = secondary_nodes[0]
2766 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2767 ".sdb_data", ".sdb_meta"])
2768 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2769 disk_sz, names[0:2], "sda")
2770 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2771 swap_sz, names[2:4], "sdb")
2772 disks = [drbd_sda_dev, drbd_sdb_dev]
2774 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
# Tag text embedded into each block device's metadata so an LV/DRBD
# device can be traced back to the instance that owns it.
2778 def _GetInstanceInfoText(instance):
2779 """Compute that text that should be added to the disk's metadata.
2782 return "originstname+%s" % instance.name
# NOTE(review): lines elided in this excerpt (the 'return False' after
# each error and the final 'return True'); code kept verbatim.
2785 def _CreateDisks(cfg, instance):
2786 """Create all disks for an instance.
2788 This abstracts away some work from AddInstance.
2791 instance: the instance object
2794 True or False showing the success of the creation process
2797 info = _GetInstanceInfoText(instance)
# For every disk: create the subtree on all secondaries first, then on
# the primary — matching the bottom-up order DRBD/MD devices need.
2799 for device in instance.disks:
2800 logger.Info("creating volume %s for instance %s" %
2801 (device.iv_name, instance.name))
2803 for secondary_node in instance.secondary_nodes:
2804 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2805 device, False, info):
2806 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2807 (device.iv_name, device, secondary_node))
2810 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2811 instance, device, info):
2812 logger.Error("failed to create volume %s on primary!" %
# NOTE(review): lines elided in this excerpt (the 'result' flag handling
# and final return); code kept verbatim, comments only added.
2818 def _RemoveDisks(instance, cfg):
2819 """Remove all disks for an instance.
2821 This abstracts away some work from `AddInstance()` and
2822 `RemoveInstance()`. Note that in case some of the devices couldn't
2823 be removed, the removal will continue with the other ones (compare
2824 with `_CreateDisks()`).
2827 instance: the instance object
2830 True or False showing the success of the removal proces
2833 logger.Info("removing block devices for instance %s" % instance.name)
# ComputeNodeTree yields every (node, sub-device) pair of the disk
# tree, so removal covers primary and secondary halves alike.
2836 for device in instance.disks:
2837 for node, disk in device.ComputeNodeTree(instance.primary_node):
2838 cfg.SetDiskID(disk, node)
2839 if not rpc.call_blockdev_remove(node, disk):
# Best-effort: a failed removal is logged and the loop continues.
2840 logger.Error("could not remove block device %s on node %s,"
2841 " continuing anyway" %
2842 (device.iv_name, node))
# NOTE(review): non-contiguous embedded line numbers — many source lines
# are elided in this excerpt (dict/tuple openers, 'if not ...:' guards,
# else branches). Code kept verbatim; comments only added.
2847 class LUCreateInstance(LogicalUnit):
2848 """Create an instance.
2851 HPATH = "instance-add"
2852 HTYPE = constants.HTYPE_INSTANCE
# Opcode parameters that must be present (checked by LogicalUnit).
2853 _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2854 "disk_template", "swap_size", "mode", "start", "vcpus",
2855 "wait_for_sync", "ip_check", "mac"]
2857 def BuildHooksEnv(self):
2860 This runs on master, primary and secondary nodes of the instance.
2864 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2865 "INSTANCE_DISK_SIZE": self.op.disk_size,
2866 "INSTANCE_SWAP_SIZE": self.op.swap_size,
2867 "INSTANCE_ADD_MODE": self.op.mode,
# Import mode exposes the export source to the hooks as well.
2869 if self.op.mode == constants.INSTANCE_IMPORT:
2870 env["INSTANCE_SRC_NODE"] = self.op.src_node
2871 env["INSTANCE_SRC_PATH"] = self.op.src_path
2872 env["INSTANCE_SRC_IMAGE"] = self.src_image
2874 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2875 primary_node=self.op.pnode,
2876 secondary_nodes=self.secondaries,
2877 status=self.instance_status,
2878 os_type=self.op.os_type,
2879 memory=self.op.mem_size,
2880 vcpus=self.op.vcpus,
2881 nics=[(self.inst_ip, self.op.bridge)],
2884 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2889 def CheckPrereq(self):
2890 """Check prerequisites.
# --- creation mode validation ---
2893 if self.op.mode not in (constants.INSTANCE_CREATE,
2894 constants.INSTANCE_IMPORT):
2895 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
# --- import mode: validate source node/path and the export therein ---
2898 if self.op.mode == constants.INSTANCE_IMPORT:
2899 src_node = getattr(self.op, "src_node", None)
2900 src_path = getattr(self.op, "src_path", None)
2901 if src_node is None or src_path is None:
2902 raise errors.OpPrereqError("Importing an instance requires source"
2903 " node and path options")
2904 src_node_full = self.cfg.ExpandNodeName(src_node)
2905 if src_node_full is None:
2906 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2907 self.op.src_node = src_node = src_node_full
2909 if not os.path.isabs(src_path):
2910 raise errors.OpPrereqError("The source path must be absolute")
2912 export_info = rpc.call_export_info(src_node, src_path)
2915 raise errors.OpPrereqError("No export found in dir %s" % src_path)
2917 if not export_info.has_section(constants.INISECT_EXP):
2918 raise errors.ProgrammerError("Corrupted export config")
# The export format is versioned; only the current version is accepted.
2920 ei_version = export_info.get(constants.INISECT_EXP, 'version')
2921 if (int(ei_version) != constants.EXPORT_VERSION):
2922 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2923 (ei_version, constants.EXPORT_VERSION))
2925 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2926 raise errors.OpPrereqError("Can't import instance with more than"
2929 # FIXME: are the old os-es, disk sizes, etc. useful?
2930 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2931 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2933 self.src_image = diskimage
2934 else: # INSTANCE_CREATE
2935 if getattr(self.op, "os_type", None) is None:
2936 raise errors.OpPrereqError("No guest OS specified")
2938 # check primary node
2939 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2941 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2943 self.op.pnode = pnode.name
2945 self.secondaries = []
2946 # disk template and mirror node verification
2947 if self.op.disk_template not in constants.DISK_TEMPLATES:
2948 raise errors.OpPrereqError("Invalid disk template name")
# Network-mirrored templates need exactly one distinct secondary node.
2950 if self.op.disk_template in constants.DTS_NET_MIRROR:
2951 if getattr(self.op, "snode", None) is None:
2952 raise errors.OpPrereqError("The networked disk templates need"
2955 snode_name = self.cfg.ExpandNodeName(self.op.snode)
2956 if snode_name is None:
2957 raise errors.OpPrereqError("Unknown secondary node '%s'" %
2959 elif snode_name == pnode.name:
2960 raise errors.OpPrereqError("The secondary node cannot be"
2961 " the primary node.")
2962 self.secondaries.append(snode_name)
2964 # Check lv size requirements
2965 nodenames = [pnode.name] + self.secondaries
2966 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2968 # Required free disk space as a function of disk and swap space
2970 constants.DT_DISKLESS: 0,
2971 constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2972 constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2973 # 256 MB are added for drbd metadata, 128MB for each drbd device
2974 constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2975 constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2978 if self.op.disk_template not in req_size_dict:
2979 raise errors.ProgrammerError("Disk template '%s' size requirement"
2980 " is unknown" % self.op.disk_template)
2982 req_size = req_size_dict[self.op.disk_template]
# Every involved node must have enough free space in the volume group.
2984 for node in nodenames:
2985 info = nodeinfo.get(node, None)
2987 raise errors.OpPrereqError("Cannot get current information"
2988 " from node '%s'" % nodeinfo)
2989 if req_size > info['vg_free']:
2990 raise errors.OpPrereqError("Not enough disk space on target node %s."
2991 " %d MB available, %d MB required" %
2992 (node, info['vg_free'], req_size))
# OS, kernel, hostname/IP, MAC and bridge verification follows.
2995 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2997 raise errors.OpPrereqError("OS '%s' not in supported os list for"
2998 " primary node" % self.op.os_type)
3000 if self.op.kernel_path == constants.VALUE_NONE:
3001 raise errors.OpPrereqError("Can't set instance kernel to none")
3003 # instance verification
3004 hostname1 = utils.HostInfo(self.op.instance_name)
3006 self.op.instance_name = instance_name = hostname1.name
3007 instance_list = self.cfg.GetInstanceList()
3008 if instance_name in instance_list:
3009 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
# IP selection: "none" -> no IP, "auto" -> resolved host IP,
# anything else must be a literal valid IP address.
3012 ip = getattr(self.op, "ip", None)
3013 if ip is None or ip.lower() == "none":
3015 elif ip.lower() == "auto":
3016 inst_ip = hostname1.ip
3018 if not utils.IsValidIP(ip):
3019 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3020 " like a valid IP" % ip)
3022 self.inst_ip = inst_ip
3024 if self.op.start and not self.op.ip_check:
3025 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3026 " adding an instance in start mode")
# A reachable IP means the address is already taken by something.
3028 if self.op.ip_check:
3029 if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
3030 constants.DEFAULT_NODED_PORT):
3031 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3032 (hostname1.ip, instance_name))
3034 # MAC address verification
3035 if self.op.mac != "auto":
3036 if not utils.IsValidMac(self.op.mac.lower()):
3037 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3040 # bridge verification
3041 bridge = getattr(self.op, "bridge", None)
3043 self.op.bridge = self.cfg.GetDefBridge()
3045 self.op.bridge = bridge
3047 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3048 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3049 " destination node '%s'" %
3050 (self.op.bridge, pnode.name))
# Initial admin state mirrors op.start (elided condition above/between).
3053 self.instance_status = 'up'
3055 self.instance_status = 'down'
3057 def Exec(self, feedback_fn):
3058 """Create and add the instance to the cluster.
3061 instance = self.op.instance_name
3062 pnode_name = self.pnode.name
# MAC: generate a fresh one or use the user-supplied value.
3064 if self.op.mac == "auto":
3065 mac_address=self.cfg.GenerateMAC()
3067 mac_address=self.op.mac
3069 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3070 if self.inst_ip is not None:
3071 nic.ip = self.inst_ip
# Some hypervisors need a dedicated network (console) port.
3073 ht_kind = self.sstore.GetHypervisorType()
3074 if ht_kind in constants.HTS_REQ_PORT:
3075 network_port = self.cfg.AllocatePort()
3079 disks = _GenerateDiskTemplate(self.cfg,
3080 self.op.disk_template,
3081 instance, pnode_name,
3082 self.secondaries, self.op.disk_size,
3085 iobj = objects.Instance(name=instance, os=self.op.os_type,
3086 primary_node=pnode_name,
3087 memory=self.op.mem_size,
3088 vcpus=self.op.vcpus,
3089 nics=[nic], disks=disks,
3090 disk_template=self.op.disk_template,
3091 status=self.instance_status,
3092 network_port=network_port,
3093 kernel_path=self.op.kernel_path,
3094 initrd_path=self.op.initrd_path,
# Disk creation failures are rolled back before aborting.
3097 feedback_fn("* creating instance disks...")
3098 if not _CreateDisks(self.cfg, iobj):
3099 _RemoveDisks(iobj, self.cfg)
3100 raise errors.OpExecError("Device creation failed, reverting...")
3102 feedback_fn("adding instance %s to cluster config" % instance)
3104 self.cfg.AddInstance(iobj)
# Either wait for full sync or, for mirrored templates, just verify
# the disks are not degraded (still syncing is acceptable).
3106 if self.op.wait_for_sync:
3107 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3108 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3109 # make sure the disks are not degraded (still sync-ing is ok)
3111 feedback_fn("* checking mirrors status")
3112 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
# On degraded disks the whole creation is undone: disks removed and
# the instance deleted from the config again.
3117 _RemoveDisks(iobj, self.cfg)
3118 self.cfg.RemoveInstance(iobj.name)
3119 raise errors.OpExecError("There are some degraded disks for"
3122 feedback_fn("creating os for instance %s on node %s" %
3123 (instance, pnode_name))
# OS installation: fresh create scripts or import from an export,
# depending on the opcode mode (both validated in CheckPrereq).
3125 if iobj.disk_template != constants.DT_DISKLESS:
3126 if self.op.mode == constants.INSTANCE_CREATE:
3127 feedback_fn("* running the instance OS create scripts...")
3128 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3129 raise errors.OpExecError("could not add os for instance %s"
3131 (instance, pnode_name))
3133 elif self.op.mode == constants.INSTANCE_IMPORT:
3134 feedback_fn("* running the instance OS import scripts...")
3135 src_node = self.op.src_node
3136 src_image = self.src_image
3137 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3138 src_node, src_image):
3139 raise errors.OpExecError("Could not import os for instance"
3141 (instance, pnode_name))
3143 # also checked in the prereq part
3144 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
# Finally start the instance if requested (guard condition elided here).
3148 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3149 feedback_fn("* starting instance...")
3150 if not rpc.call_instance_start(pnode_name, iobj, None):
3151 raise errors.OpExecError("Could not start instance")
# NOTE(review): some source lines are elided in this excerpt (e.g. the
# final return of the ssh argv); code kept verbatim, comments only added.
3154 class LUConnectConsole(NoHooksLU):
3155 """Connect to an instance's console.
3157 This is somewhat special in that it returns the command line that
3158 you need to run on the master node in order to connect to the
3162 _OP_REQP = ["instance_name"]
3164 def CheckPrereq(self):
3165 """Check prerequisites.
3167 This checks that the instance is in the cluster.
3170 instance = self.cfg.GetInstanceInfo(
3171 self.cfg.ExpandInstanceName(self.op.instance_name))
3172 if instance is None:
3173 raise errors.OpPrereqError("Instance '%s' not known" %
3174 self.op.instance_name)
3175 self.instance = instance
3177 def Exec(self, feedback_fn):
3178 """Connect to the console of an instance
# Verify the instance is actually running on its primary node before
# handing out a console command.
3181 instance = self.instance
3182 node = instance.primary_node
3184 node_insts = rpc.call_instance_list([node])[node]
3185 if node_insts is False:
3186 raise errors.OpExecError("Can't connect to node %s." % node)
3188 if instance.name not in node_insts:
3189 raise errors.OpExecError("Instance %s is not running." % instance.name)
3191 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
# Ask the hypervisor layer for the node-local console command and wrap
# it in an ssh invocation (with a tty) for the caller to execute.
3193 hyper = hypervisor.GetHypervisor()
3194 console_cmd = hyper.GetShellCommandForConsole(instance)
3196 argv = ["ssh", "-q", "-t"]
3197 argv.extend(ssh.KNOWN_HOSTS_OPTS)
3198 argv.extend(ssh.BATCH_MODE_OPTS)
3200 argv.append(console_cmd)
# NOTE(review): non-contiguous embedded line numbers — some source lines
# are elided in this excerpt. Code kept verbatim; comments only added.
3204 class LUAddMDDRBDComponent(LogicalUnit):
3205 """Adda new mirror member to an instance's disk.
3208 HPATH = "mirror-add"
3209 HTYPE = constants.HTYPE_INSTANCE
3210 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3212 def BuildHooksEnv(self):
3215 This runs on the master, the primary and all the secondaries.
3219 "NEW_SECONDARY": self.op.remote_node,
3220 "DISK_NAME": self.op.disk_name,
3222 env.update(_BuildInstanceHookEnvByObject(self.instance))
3223 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3224 self.op.remote_node,] + list(self.instance.secondary_nodes)
3227 def CheckPrereq(self):
3228 """Check prerequisites.
3230 This checks that the instance is in the cluster.
3233 instance = self.cfg.GetInstanceInfo(
3234 self.cfg.ExpandInstanceName(self.op.instance_name))
3235 if instance is None:
3236 raise errors.OpPrereqError("Instance '%s' not known" %
3237 self.op.instance_name)
3238 self.instance = instance
3240 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3241 if remote_node is None:
3242 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3243 self.remote_node = remote_node
# The new mirror member must live on a node other than the primary.
3245 if remote_node == instance.primary_node:
3246 raise errors.OpPrereqError("The specified node is the primary node of"
# Only remote_raid1 (MD over DRBD) supports adding mirror components.
3249 if instance.disk_template != constants.DT_REMOTE_RAID1:
3250 raise errors.OpPrereqError("Instance's disk layout is not"
# Locate the named disk; the loop's success branch (break) is elided.
3252 for disk in instance.disks:
3253 if disk.iv_name == self.op.disk_name:
3256 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3257 " instance." % self.op.disk_name)
3258 if len(disk.children) > 1:
3259 raise errors.OpPrereqError("The device already has two slave devices."
3260 " This would create a 3-disk raid1 which we"
3264 def Exec(self, feedback_fn):
3265 """Add the mirror component
3269 instance = self.instance
3271 remote_node = self.remote_node
# Generate LV names and a fresh DRBD branch between the primary and
# the new remote node, sized like the existing disk.
3272 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3273 names = _GenerateUniqueNames(self.cfg, lv_names)
3274 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3275 remote_node, disk.size, names)
# Create the new component on the secondary first, then the primary —
# same bottom-up ordering as _CreateDisks.
3277 logger.Info("adding new mirror component on secondary")
3279 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3281 _GetInstanceInfoText(instance)):
3282 raise errors.OpExecError("Failed to create new component on secondary"
3283 " node %s" % remote_node)
3285 logger.Info("adding new mirror component on primary")
3287 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3289 _GetInstanceInfoText(instance)):
3290 # remove secondary dev
3291 self.cfg.SetDiskID(new_drbd, remote_node)
3292 rpc.call_blockdev_remove(remote_node, new_drbd)
3293 raise errors.OpExecError("Failed to create volume on primary")
3295 # the device exists now
3296 # call the primary node to add the mirror to md
3297 logger.Info("adding new mirror component to md")
3298 if not rpc.call_blockdev_addchildren(instance.primary_node,
# Failure here rolls back both newly-created halves (best effort).
3300 logger.Error("Can't add mirror compoment to md!")
3301 self.cfg.SetDiskID(new_drbd, remote_node)
3302 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3303 logger.Error("Can't rollback on secondary")
3304 self.cfg.SetDiskID(new_drbd, instance.primary_node)
3305 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3306 logger.Error("Can't rollback on primary")
3307 raise errors.OpExecError("Can't add mirror component to md array")
# Persist the new disk tree and wait for the mirror to resync.
3309 disk.children.append(new_drbd)
3311 self.cfg.AddInstance(instance)
3313 _WaitForSync(self.cfg, instance, self.proc)
# NOTE(review): non-contiguous embedded line numbers — some source lines
# are elided in this excerpt. Code kept verbatim; comments only added.
3318 class LURemoveMDDRBDComponent(LogicalUnit):
3319 """Remove a component from a remote_raid1 disk.
3322 HPATH = "mirror-remove"
3323 HTYPE = constants.HTYPE_INSTANCE
3324 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3326 def BuildHooksEnv(self):
3329 This runs on the master, the primary and all the secondaries.
3333 "DISK_NAME": self.op.disk_name,
3334 "DISK_ID": self.op.disk_id,
3335 "OLD_SECONDARY": self.old_secondary,
3337 env.update(_BuildInstanceHookEnvByObject(self.instance))
3338 nl = [self.sstore.GetMasterNode(),
3339 self.instance.primary_node] + list(self.instance.secondary_nodes)
3342 def CheckPrereq(self):
3343 """Check prerequisites.
3345 This checks that the instance is in the cluster.
3348 instance = self.cfg.GetInstanceInfo(
3349 self.cfg.ExpandInstanceName(self.op.instance_name))
3350 if instance is None:
3351 raise errors.OpPrereqError("Instance '%s' not known" %
3352 self.op.instance_name)
3353 self.instance = instance
3355 if instance.disk_template != constants.DT_REMOTE_RAID1:
3356 raise errors.OpPrereqError("Instance's disk layout is not"
# Find the named disk, then the DRBD child whose allocated port
# matches the user-supplied disk_id (the success/break lines are elided).
3358 for disk in instance.disks:
3359 if disk.iv_name == self.op.disk_name:
3362 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3363 " instance." % self.op.disk_name)
3364 for child in disk.children:
# logical_id is (primary, secondary, port) for DRBD7 devices.
3365 if (child.dev_type == constants.LD_DRBD7 and
3366 child.logical_id[2] == self.op.disk_id):
3369 raise errors.OpPrereqError("Can't find the device with this port.")
3371 if len(disk.children) < 2:
3372 raise errors.OpPrereqError("Cannot remove the last component from"
# Remember which node loses its copy — the peer of the primary in the
# child's logical_id (index selection lines are elided here).
3376 if self.child.logical_id[0] == instance.primary_node:
3380 self.old_secondary = self.child.logical_id[oid]
3382 def Exec(self, feedback_fn):
3383 """Remove the mirror component
3386 instance = self.instance
3389 logger.Info("remove mirror component")
# Detach the DRBD child from the MD array on the primary first.
3390 self.cfg.SetDiskID(disk, instance.primary_node)
3391 if not rpc.call_blockdev_removechildren(instance.primary_node,
3393 raise errors.OpExecError("Can't remove child from mirror.")
# Then best-effort remove the component on both ends of the DRBD link.
3395 for node in child.logical_id[:2]:
3396 self.cfg.SetDiskID(child, node)
3397 if not rpc.call_blockdev_remove(node, child):
3398 logger.Error("Warning: failed to remove device from node %s,"
3399 " continuing operation." % node)
# Persist the updated disk tree in the cluster config.
3401 disk.children.remove(child)
3402 self.cfg.AddInstance(instance)
3405 class LUReplaceDisks(LogicalUnit):
3406 """Replace the disks of an instance.
3409 HPATH = "mirrors-replace"
3410 HTYPE = constants.HTYPE_INSTANCE
3411 _OP_REQP = ["instance_name", "mode", "disks"]
3413 def BuildHooksEnv(self):
3416 This runs on the master, the primary and all the secondaries.
3420 "MODE": self.op.mode,
3421 "NEW_SECONDARY": self.op.remote_node,
3422 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3424 env.update(_BuildInstanceHookEnvByObject(self.instance))
3426 self.sstore.GetMasterNode(),
3427 self.instance.primary_node,
3429 if self.op.remote_node is not None:
3430 nl.append(self.op.remote_node)
3433 def CheckPrereq(self):
3434 """Check prerequisites.
3436 This checks that the instance is in the cluster.
3439 instance = self.cfg.GetInstanceInfo(
3440 self.cfg.ExpandInstanceName(self.op.instance_name))
3441 if instance is None:
3442 raise errors.OpPrereqError("Instance '%s' not known" %
3443 self.op.instance_name)
3444 self.instance = instance
3445 self.op.instance_name = instance.name
3447 if instance.disk_template not in constants.DTS_NET_MIRROR:
3448 raise errors.OpPrereqError("Instance's disk layout is not"
3449 " network mirrored.")
3451 if len(instance.secondary_nodes) != 1:
3452 raise errors.OpPrereqError("The instance has a strange layout,"
3453 " expected one secondary but found %d" %
3454 len(instance.secondary_nodes))
3456 self.sec_node = instance.secondary_nodes[0]
3458 remote_node = getattr(self.op, "remote_node", None)
3459 if remote_node is not None:
3460 remote_node = self.cfg.ExpandNodeName(remote_node)
3461 if remote_node is None:
3462 raise errors.OpPrereqError("Node '%s' not known" %
3463 self.op.remote_node)
3464 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3466 self.remote_node_info = None
3467 if remote_node == instance.primary_node:
3468 raise errors.OpPrereqError("The specified node is the primary node of"
3470 elif remote_node == self.sec_node:
3471 if self.op.mode == constants.REPLACE_DISK_SEC:
3472 # this is for DRBD8, where we can't execute the same mode of
3473 # replacement as for drbd7 (no different port allocated)
3474 raise errors.OpPrereqError("Same secondary given, cannot execute"
3476 # the user gave the current secondary, switch to
3477 # 'no-replace-secondary' mode for drbd7
3479 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3480 self.op.mode != constants.REPLACE_DISK_ALL):
3481 raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3482 " disks replacement, not individual ones")
3483 if instance.disk_template == constants.DT_DRBD8:
3484 if (self.op.mode == constants.REPLACE_DISK_ALL and
3485 remote_node is not None):
3486 # switch to replace secondary mode
3487 self.op.mode = constants.REPLACE_DISK_SEC
3489 if self.op.mode == constants.REPLACE_DISK_ALL:
3490 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3491 " secondary disk replacement, not"
3493 elif self.op.mode == constants.REPLACE_DISK_PRI:
3494 if remote_node is not None:
3495 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3496 " the secondary while doing a primary"
3497 " node disk replacement")
3498 self.tgt_node = instance.primary_node
3499 self.oth_node = instance.secondary_nodes[0]
3500 elif self.op.mode == constants.REPLACE_DISK_SEC:
3501 self.new_node = remote_node # this can be None, in which case
3502 # we don't change the secondary
3503 self.tgt_node = instance.secondary_nodes[0]
3504 self.oth_node = instance.primary_node
3506 raise errors.ProgrammerError("Unhandled disk replace mode")
3508 for name in self.op.disks:
3509 if instance.FindDisk(name) is None:
3510 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3511 (name, instance.name))
3512 self.op.remote_node = remote_node
3514 def _ExecRR1(self, feedback_fn):
3515 """Replace the disks of an instance.
3518 instance = self.instance
3521 if self.op.remote_node is None:
3522 remote_node = self.sec_node
3524 remote_node = self.op.remote_node
3526 for dev in instance.disks:
3528 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3529 names = _GenerateUniqueNames(cfg, lv_names)
3530 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3531 remote_node, size, names)
3532 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3533 logger.Info("adding new mirror component on secondary for %s" %
3536 if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3538 _GetInstanceInfoText(instance)):
3539 raise errors.OpExecError("Failed to create new component on secondary"
3540 " node %s. Full abort, cleanup manually!" %
3543 logger.Info("adding new mirror component on primary")
3545 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3547 _GetInstanceInfoText(instance)):
3548 # remove secondary dev
3549 cfg.SetDiskID(new_drbd, remote_node)
3550 rpc.call_blockdev_remove(remote_node, new_drbd)
3551 raise errors.OpExecError("Failed to create volume on primary!"
3552 " Full abort, cleanup manually!!")
3554 # the device exists now
3555 # call the primary node to add the mirror to md
3556 logger.Info("adding new mirror component to md")
3557 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3559 logger.Error("Can't add mirror compoment to md!")
3560 cfg.SetDiskID(new_drbd, remote_node)
3561 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3562 logger.Error("Can't rollback on secondary")
3563 cfg.SetDiskID(new_drbd, instance.primary_node)
3564 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3565 logger.Error("Can't rollback on primary")
3566 raise errors.OpExecError("Full abort, cleanup manually!!")
3568 dev.children.append(new_drbd)
3569 cfg.AddInstance(instance)
3571 # this can fail as the old devices are degraded and _WaitForSync
3572 # does a combined result over all disks, so we don't check its
3574 _WaitForSync(cfg, instance, self.proc, unlock=True)
3576 # so check manually all the devices
3577 for name in iv_names:
3578 dev, child, new_drbd = iv_names[name]
3579 cfg.SetDiskID(dev, instance.primary_node)
3580 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3582 raise errors.OpExecError("MD device %s is degraded!" % name)
3583 cfg.SetDiskID(new_drbd, instance.primary_node)
3584 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3586 raise errors.OpExecError("New drbd device %s is degraded!" % name)
3588 for name in iv_names:
3589 dev, child, new_drbd = iv_names[name]
3590 logger.Info("remove mirror %s component" % name)
3591 cfg.SetDiskID(dev, instance.primary_node)
3592 if not rpc.call_blockdev_removechildren(instance.primary_node,
3594 logger.Error("Can't remove child from mirror, aborting"
3595 " *this device cleanup*.\nYou need to cleanup manually!!")
3598 for node in child.logical_id[:2]:
3599 logger.Info("remove child device on %s" % node)
3600 cfg.SetDiskID(child, node)
3601 if not rpc.call_blockdev_remove(node, child):
3602 logger.Error("Warning: failed to remove device from node %s,"
3603 " continuing operation." % node)
3605 dev.children.remove(child)
3607 cfg.AddInstance(instance)
3609 def _ExecD8DiskOnly(self, feedback_fn):
3610 """Replace a disk on the primary or secondary for dbrd8.
3612 The algorithm for replace is quite complicated:
3613 - for each disk to be replaced:
3614 - create new LVs on the target node with unique names
3615 - detach old LVs from the drbd device
3616 - rename old LVs to name_replaced.<time_t>
3617 - rename new LVs to old LVs
3618 - attach the new LVs (with the old names now) to the drbd device
3619 - wait for sync across all devices
3620 - for each modified disk:
3621 - remove old LVs (which have the name name_replaces.<time_t>)
3623 Failures are not very well handled.
3627 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3628 instance = self.instance
3630 vgname = self.cfg.GetVGName()
3633 tgt_node = self.tgt_node
3634 oth_node = self.oth_node
3636 # Step: check device activation
3637 self.proc.LogStep(1, steps_total, "check device existence")
3638 info("checking volume groups")
3639 my_vg = cfg.GetVGName()
3640 results = rpc.call_vg_list([oth_node, tgt_node])
3642 raise errors.OpExecError("Can't list volume groups on the nodes")
3643 for node in oth_node, tgt_node:
3644 res = results.get(node, False)
3645 if not res or my_vg not in res:
3646 raise errors.OpExecError("Volume group '%s' not found on %s" %
3648 for dev in instance.disks:
3649 if not dev.iv_name in self.op.disks:
3651 for node in tgt_node, oth_node:
3652 info("checking %s on %s" % (dev.iv_name, node))
3653 cfg.SetDiskID(dev, node)
3654 if not rpc.call_blockdev_find(node, dev):
3655 raise errors.OpExecError("Can't find device %s on node %s" %
3656 (dev.iv_name, node))
3658 # Step: check other node consistency
3659 self.proc.LogStep(2, steps_total, "check peer consistency")
3660 for dev in instance.disks:
3661 if not dev.iv_name in self.op.disks:
3663 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3664 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3665 oth_node==instance.primary_node):
3666 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3667 " to replace disks on this node (%s)" %
3668 (oth_node, tgt_node))
3670 # Step: create new storage
3671 self.proc.LogStep(3, steps_total, "allocate new storage")
3672 for dev in instance.disks:
3673 if not dev.iv_name in self.op.disks:
3676 cfg.SetDiskID(dev, tgt_node)
3677 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3678 names = _GenerateUniqueNames(cfg, lv_names)
3679 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3680 logical_id=(vgname, names[0]))
3681 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3682 logical_id=(vgname, names[1]))
3683 new_lvs = [lv_data, lv_meta]
3684 old_lvs = dev.children
3685 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3686 info("creating new local storage on %s for %s" %
3687 (tgt_node, dev.iv_name))
3688 # since we *always* want to create this LV, we use the
3689 # _Create...OnPrimary (which forces the creation), even if we
3690 # are talking about the secondary node
3691 for new_lv in new_lvs:
3692 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3693 _GetInstanceInfoText(instance)):
3694 raise errors.OpExecError("Failed to create new LV named '%s' on"
3696 (new_lv.logical_id[1], tgt_node))
3698 # Step: for each lv, detach+rename*2+attach
3699 self.proc.LogStep(4, steps_total, "change drbd configuration")
3700 for dev, old_lvs, new_lvs in iv_names.itervalues():
3701 info("detaching %s drbd from local storage" % dev.iv_name)
3702 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3703 raise errors.OpExecError("Can't detach drbd from local storage on node"
3704 " %s for device %s" % (tgt_node, dev.iv_name))
3706 #cfg.Update(instance)
3708 # ok, we created the new LVs, so now we know we have the needed
3709 # storage; as such, we proceed on the target node to rename
3710 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3711 # using the assumption than logical_id == physical_id (which in
3712 # turn is the unique_id on that node)
3714 # FIXME(iustin): use a better name for the replaced LVs
3715 temp_suffix = int(time.time())
3716 ren_fn = lambda d, suff: (d.physical_id[0],
3717 d.physical_id[1] + "_replaced-%s" % suff)
3718 # build the rename list based on what LVs exist on the node
3720 for to_ren in old_lvs:
3721 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3722 if find_res is not None: # device exists
3723 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3725 info("renaming the old LVs on the target node")
3726 if not rpc.call_blockdev_rename(tgt_node, rlist):
3727 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3728 # now we rename the new LVs to the old LVs
3729 info("renaming the new LVs on the target node")
3730 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3731 if not rpc.call_blockdev_rename(tgt_node, rlist):
3732 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3734 for old, new in zip(old_lvs, new_lvs):
3735 new.logical_id = old.logical_id
3736 cfg.SetDiskID(new, tgt_node)
3738 for disk in old_lvs:
3739 disk.logical_id = ren_fn(disk, temp_suffix)
3740 cfg.SetDiskID(disk, tgt_node)
3742 # now that the new lvs have the old name, we can add them to the device
3743 info("adding new mirror component on %s" % tgt_node)
3744 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3745 for new_lv in new_lvs:
3746 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3747 warning("Can't rollback device %s", hint="manually cleanup unused"
3749 raise errors.OpExecError("Can't add local storage to drbd")
3751 dev.children = new_lvs
3752 cfg.Update(instance)
3754 # Step: wait for sync
3756 # this can fail as the old devices are degraded and _WaitForSync
3757 # does a combined result over all disks, so we don't check its
3759 self.proc.LogStep(5, steps_total, "sync devices")
3760 _WaitForSync(cfg, instance, self.proc, unlock=True)
3762 # so check manually all the devices
3763 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3764 cfg.SetDiskID(dev, instance.primary_node)
3765 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3767 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3769 # Step: remove old storage
3770 self.proc.LogStep(6, steps_total, "removing old storage")
3771 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3772 info("remove logical volumes for %s" % name)
3774 cfg.SetDiskID(lv, tgt_node)
3775 if not rpc.call_blockdev_remove(tgt_node, lv):
3776 warning("Can't remove old LV", hint="manually remove unused LVs")
3779 def _ExecD8Secondary(self, feedback_fn):
3780 """Replace the secondary node for drbd8.
3782 The algorithm for replace is quite complicated:
3783 - for all disks of the instance:
3784 - create new LVs on the new node with same names
3785 - shutdown the drbd device on the old secondary
3786 - disconnect the drbd network on the primary
3787 - create the drbd device on the new secondary
3788 - network attach the drbd on the primary, using an artifice:
3789 the drbd code for Attach() will connect to the network if it
3790 finds a device which is connected to the good local disks but
3792 - wait for sync across all devices
3793 - remove all disks from the old secondary
3795 Failures are not very well handled.
3799 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3800 instance = self.instance
3802 vgname = self.cfg.GetVGName()
3805 old_node = self.tgt_node
3806 new_node = self.new_node
3807 pri_node = instance.primary_node
3809 # Step: check device activation
3810 self.proc.LogStep(1, steps_total, "check device existence")
3811 info("checking volume groups")
3812 my_vg = cfg.GetVGName()
3813 results = rpc.call_vg_list([pri_node, new_node])
3815 raise errors.OpExecError("Can't list volume groups on the nodes")
3816 for node in pri_node, new_node:
3817 res = results.get(node, False)
3818 if not res or my_vg not in res:
3819 raise errors.OpExecError("Volume group '%s' not found on %s" %
3821 for dev in instance.disks:
3822 if not dev.iv_name in self.op.disks:
3824 info("checking %s on %s" % (dev.iv_name, pri_node))
3825 cfg.SetDiskID(dev, pri_node)
3826 if not rpc.call_blockdev_find(pri_node, dev):
3827 raise errors.OpExecError("Can't find device %s on node %s" %
3828 (dev.iv_name, pri_node))
3830 # Step: check other node consistency
3831 self.proc.LogStep(2, steps_total, "check peer consistency")
3832 for dev in instance.disks:
3833 if not dev.iv_name in self.op.disks:
3835 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3836 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3837 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3838 " unsafe to replace the secondary" %
3841 # Step: create new storage
3842 self.proc.LogStep(3, steps_total, "allocate new storage")
3843 for dev in instance.disks:
3845 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3846 # since we *always* want to create this LV, we use the
3847 # _Create...OnPrimary (which forces the creation), even if we
3848 # are talking about the secondary node
3849 for new_lv in dev.children:
3850 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3851 _GetInstanceInfoText(instance)):
3852 raise errors.OpExecError("Failed to create new LV named '%s' on"
3854 (new_lv.logical_id[1], new_node))
3856 iv_names[dev.iv_name] = (dev, dev.children)
3858 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3859 for dev in instance.disks:
3861 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3862 # create new devices on new_node
3863 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3864 logical_id=(pri_node, new_node,
3866 children=dev.children)
3867 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3869 _GetInstanceInfoText(instance)):
3870 raise errors.OpExecError("Failed to create new DRBD on"
3871 " node '%s'" % new_node)
3873 for dev in instance.disks:
3874 # we have new devices, shutdown the drbd on the old secondary
3875 info("shutting down drbd for %s on old node" % dev.iv_name)
3876 cfg.SetDiskID(dev, old_node)
3877 if not rpc.call_blockdev_shutdown(old_node, dev):
3878 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3879 hint="Please cleanup this device manually as soon as possible")
3881 info("detaching primary drbds from the network (=> standalone)")
3883 for dev in instance.disks:
3884 cfg.SetDiskID(dev, pri_node)
3885 # set the physical (unique in bdev terms) id to None, meaning
3886 # detach from network
3887 dev.physical_id = (None,) * len(dev.physical_id)
3888 # and 'find' the device, which will 'fix' it to match the
3890 if rpc.call_blockdev_find(pri_node, dev):
3893 warning("Failed to detach drbd %s from network, unusual case" %
3897 # no detaches succeeded (very unlikely)
3898 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3900 # if we managed to detach at least one, we update all the disks of
3901 # the instance to point to the new secondary
3902 info("updating instance configuration")
3903 for dev in instance.disks:
3904 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3905 cfg.SetDiskID(dev, pri_node)
3906 cfg.Update(instance)
3908 # and now perform the drbd attach
3909 info("attaching primary drbds to new secondary (standalone => connected)")
3911 for dev in instance.disks:
3912 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3913 # since the attach is smart, it's enough to 'find' the device,
3914 # it will automatically activate the network, if the physical_id
3916 cfg.SetDiskID(dev, pri_node)
3917 if not rpc.call_blockdev_find(pri_node, dev):
3918 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3919 "please do a gnt-instance info to see the status of disks")
3921 # this can fail as the old devices are degraded and _WaitForSync
3922 # does a combined result over all disks, so we don't check its
3924 self.proc.LogStep(5, steps_total, "sync devices")
3925 _WaitForSync(cfg, instance, self.proc, unlock=True)
3927 # so check manually all the devices
3928 for name, (dev, old_lvs) in iv_names.iteritems():
3929 cfg.SetDiskID(dev, pri_node)
3930 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3932 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3934 self.proc.LogStep(6, steps_total, "removing old storage")
3935 for name, (dev, old_lvs) in iv_names.iteritems():
3936 info("remove logical volumes for %s" % name)
3938 cfg.SetDiskID(lv, old_node)
3939 if not rpc.call_blockdev_remove(old_node, lv):
3940 warning("Can't remove LV on old secondary",
3941 hint="Cleanup stale volumes by hand")
3943 def Exec(self, feedback_fn):
3944 """Execute disk replacement.
3946 This dispatches the disk replacement to the appropriate handler.
3949 instance = self.instance
3950 if instance.disk_template == constants.DT_REMOTE_RAID1:
3952 elif instance.disk_template == constants.DT_DRBD8:
3953 if self.op.remote_node is None:
3954 fn = self._ExecD8DiskOnly
3956 fn = self._ExecD8Secondary
3958 raise errors.ProgrammerError("Unhandled disk replacement case")
3959 return fn(feedback_fn)
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      # no explicit list: query all instances
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    Recursively builds a dict describing the device and its children;
    pstatus/sstatus are the raw results of blockdev_find on the primary
    and secondary nodes respectively.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "network_port": instance.network_port,
        "vcpus": instance.vcpus,
        "kernel_path": instance.kernel_path,
        "initrd_path": instance.initrd_path,
        }

      result[instance.name] = idict

    return result
4066 class LUSetInstanceParms(LogicalUnit):
4067 """Modifies an instances's parameters.
4070 HPATH = "instance-modify"
4071 HTYPE = constants.HTYPE_INSTANCE
4072 _OP_REQP = ["instance_name"]
4074 def BuildHooksEnv(self):
4077 This runs on the master, primary and secondaries.
4082 args['memory'] = self.mem
4084 args['vcpus'] = self.vcpus
4085 if self.do_ip or self.do_bridge:
4089 ip = self.instance.nics[0].ip
4091 bridge = self.bridge
4093 bridge = self.instance.nics[0].bridge
4094 args['nics'] = [(ip, bridge)]
4095 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4096 nl = [self.sstore.GetMasterNode(),
4097 self.instance.primary_node] + list(self.instance.secondary_nodes)
4100 def CheckPrereq(self):
4101 """Check prerequisites.
4103 This only checks the instance list against the existing names.
4106 self.mem = getattr(self.op, "mem", None)
4107 self.vcpus = getattr(self.op, "vcpus", None)
4108 self.ip = getattr(self.op, "ip", None)
4109 self.mac = getattr(self.op, "mac", None)
4110 self.bridge = getattr(self.op, "bridge", None)
4111 self.kernel_path = getattr(self.op, "kernel_path", None)
4112 self.initrd_path = getattr(self.op, "initrd_path", None)
4113 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4114 self.kernel_path, self.initrd_path]
4115 if all_parms.count(None) == len(all_parms):
4116 raise errors.OpPrereqError("No changes submitted")
4117 if self.mem is not None:
4119 self.mem = int(self.mem)
4120 except ValueError, err:
4121 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4122 if self.vcpus is not None:
4124 self.vcpus = int(self.vcpus)
4125 except ValueError, err:
4126 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4127 if self.ip is not None:
4129 if self.ip.lower() == "none":
4132 if not utils.IsValidIP(self.ip):
4133 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4136 self.do_bridge = (self.bridge is not None)
4137 if self.mac is not None:
4138 if self.cfg.IsMacInUse(self.mac):
4139 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4141 if not utils.IsValidMac(self.mac):
4142 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4144 if self.kernel_path is not None:
4145 self.do_kernel_path = True
4146 if self.kernel_path == constants.VALUE_NONE:
4147 raise errors.OpPrereqError("Can't set instance to no kernel")
4149 if self.kernel_path != constants.VALUE_DEFAULT:
4150 if not os.path.isabs(self.kernel_path):
4151 raise errors.OpPrereError("The kernel path must be an absolute"
4154 self.do_kernel_path = False
4156 if self.initrd_path is not None:
4157 self.do_initrd_path = True
4158 if self.initrd_path not in (constants.VALUE_NONE,
4159 constants.VALUE_DEFAULT):
4160 if not os.path.isabs(self.kernel_path):
4161 raise errors.OpPrereError("The initrd path must be an absolute"
4164 self.do_initrd_path = False
4166 instance = self.cfg.GetInstanceInfo(
4167 self.cfg.ExpandInstanceName(self.op.instance_name))
4168 if instance is None:
4169 raise errors.OpPrereqError("No such instance name '%s'" %
4170 self.op.instance_name)
4171 self.op.instance_name = instance.name
4172 self.instance = instance
4175 def Exec(self, feedback_fn):
4176 """Modifies an instance.
4178 All parameters take effect only at the next restart of the instance.
4181 instance = self.instance
4183 instance.memory = self.mem
4184 result.append(("mem", self.mem))
4186 instance.vcpus = self.vcpus
4187 result.append(("vcpus", self.vcpus))
4189 instance.nics[0].ip = self.ip
4190 result.append(("ip", self.ip))
4192 instance.nics[0].bridge = self.bridge
4193 result.append(("bridge", self.bridge))
4195 instance.nics[0].mac = self.mac
4196 result.append(("mac", self.mac))
4197 if self.do_kernel_path:
4198 instance.kernel_path = self.kernel_path
4199 result.append(("kernel_path", self.kernel_path))
4200 if self.do_initrd_path:
4201 instance.initrd_path = self.initrd_path
4202 result.append(("initrd_path", self.initrd_path))
4204 self.cfg.AddInstance(instance)
class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ["nodes"]

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      # restart the instance even if snapshotting failed
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.proc.ChainOpCode(op)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    Resolves self.op.kind/self.op.name into self.target, the object
    whose tags will be read or modified.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))
class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    # self.target is resolved by TagsLU.CheckPrereq from kind/name
    return self.target.GetTags()
4391 class LUSearchTags(NoHooksLU):
4392 """Searches the tags for a given pattern.
4395 _OP_REQP = ["pattern"]
4397 def CheckPrereq(self):
4398 """Check prerequisites.
4400 This checks the pattern passed for validity by compiling it.
4404 self.re = re.compile(self.op.pattern)
4405 except re.error, err:
4406 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4407 (self.op.pattern, err))
4409 def Exec(self, feedback_fn):
4410 """Returns the tag list.
4414 tgts = [("/cluster", cfg.GetClusterInfo())]
4415 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4416 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4417 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4418 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4420 for path, target in tgts:
4421 for tag in target.GetTags():
4422 if self.re.search(tag):
4423 results.append((path, tag))
4427 class LUAddTags(TagsLU):
4428 """Sets a tag on a given object.
4431 _OP_REQP = ["kind", "name", "tags"]
4433 def CheckPrereq(self):
4434 """Check prerequisites.
4436 This checks the type and length of the tag name and value.
4439 TagsLU.CheckPrereq(self)
4440 for tag in self.op.tags:
4441 objects.TaggableObject.ValidateTag(tag)
4443 def Exec(self, feedback_fn):
4448 for tag in self.op.tags:
4449 self.target.AddTag(tag)
4450 except errors.TagError, err:
4451 raise errors.OpExecError("Error while setting tag: %s" % str(err))
4453 self.cfg.Update(self.target)
4454 except errors.ConfigurationError:
4455 raise errors.OpRetryError("There has been a modification to the"
4456 " config file and the operation has been"
4457 " aborted. Please retry.")
class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")