4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
46 class LogicalUnit(object):
47 """Logical Unit base class.
49 Subclasses must follow these rules:
50 - implement CheckPrereq which also fills in the opcode instance
51 with all the fields (even if as None)
53 - implement BuildHooksEnv
54 - redefine HPATH and HTYPE
55 - optionally redefine their run requirements (REQ_CLUSTER,
56 REQ_MASTER); note that all commands require root permissions
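  As an illustration only (a hypothetical sketch, not part of the real LU
  hierarchy), a minimal subclass following these rules could look like:

    class LUHypotheticalNoop(LogicalUnit):
      HPATH = "noop"
      HTYPE = constants.HTYPE_CLUSTER
      _OP_REQP = []

      def BuildHooksEnv(self):
        return {"OP_TARGET": "noop"}, [], []

      def CheckPrereq(self):
        pass

      def Exec(self, feedback_fn):
        feedback_fn("nothing to do")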
65 def __init__(self, processor, op, cfg, sstore):
66 """Constructor for LogicalUnit.
68 This needs to be overridden in derived classes in order to check op
76 for attr_name in self._OP_REQP:
77 attr_val = getattr(op, attr_name, None)
79 raise errors.OpPrereqError("Required parameter '%s' missing" %
82 if not cfg.IsCluster():
83 raise errors.OpPrereqError("Cluster not initialized yet,"
84 " use 'gnt-cluster init' first.")
86 master = sstore.GetMasterNode()
87 if master != utils.HostInfo().name:
88 raise errors.OpPrereqError("Commands must be run on the master"
91 def CheckPrereq(self):
92 """Check prerequisites for this LU.
94 This method should check that the prerequisites for the execution
95 of this LU are fulfilled. It can do internode communication, but
96 it should be idempotent - no cluster or system changes are
99 The method should raise errors.OpPrereqError in case something is
100 not fulfilled. Its return value is ignored.
102 This method should also update all the parameters of the opcode to
103 their canonical form; e.g. a short node name must be fully
104 expanded after this method has successfully completed (so that
105 hooks, logging, etc. work correctly).
108 raise NotImplementedError
110 def Exec(self, feedback_fn):
113 This method should implement the actual work. It should raise
114 errors.OpExecError for failures that are somewhat dealt with in
118 raise NotImplementedError
120 def BuildHooksEnv(self):
121 """Build hooks environment for this LU.
123 This method should return a three-element tuple consisting of: a dict
124 containing the environment that will be used for running the
125 specific hook for this LU, a list of node names on which the hook
126 should run before the execution, and a list of node names on which
127 the hook should run after the execution.
129 The keys of the dict must not have 'GANETI_' prefixed as this will
130 be handled in the hooks runner. Also note additional keys will be
131 added by the hooks runner. If the LU doesn't define any
132 environment, an empty dict (and not None) should be returned.
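  For example (illustrative values only, not taken from a real LU), a hook
  environment for an instance operation could be returned as:

    return {"INSTANCE_NAME": "inst1.example.com"}, [], ["node1.example.com"]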
134 As for the node lists, the master should not be included in
135 them, as it will be added by the hooks runner in case this LU
136 requires a cluster to run on (otherwise we don't have a node
137 list). If there are no nodes to return, use an empty list (and not
140 Note that if the HPATH for a LU class is None, this function will
144 raise NotImplementedError
147 class NoHooksLU(LogicalUnit):
148 """Simple LU which runs no hooks.
150 This LU is intended as a parent for other LogicalUnits which will
151 run no hooks, in order to reduce duplicate code.
157 def BuildHooksEnv(self):
160 This is a no-op, since we don't run hooks.
166 def _AddHostToEtcHosts(hostname):
167 """Wrapper around utils.SetEtcHostsEntry.
170 hi = utils.HostInfo(name=hostname)
171 utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
174 def _RemoveHostFromEtcHosts(hostname):
175 """Wrapper around utils.RemoveEtcHostsEntry.
178 hi = utils.HostInfo(name=hostname)
179 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
183 def _GetWantedNodes(lu, nodes):
184 """Returns list of checked and expanded node names.
187 nodes: List of nodes (strings) or None for all
190 if not isinstance(nodes, list):
191 raise errors.OpPrereqError("Invalid argument type 'nodes'")
197 node = lu.cfg.ExpandNodeName(name)
199 raise errors.OpPrereqError("No such node name '%s'" % name)
203 wanted = lu.cfg.GetNodeList()
204 return utils.NiceSort(wanted)
207 def _GetWantedInstances(lu, instances):
208 """Returns list of checked and expanded instance names.
211 instances: List of instances (strings) or None for all
214 if not isinstance(instances, list):
215 raise errors.OpPrereqError("Invalid argument type 'instances'")
220 for name in instances:
221 instance = lu.cfg.ExpandInstanceName(name)
223 raise errors.OpPrereqError("No such instance name '%s'" % name)
224 wanted.append(instance)
227 wanted = lu.cfg.GetInstanceList()
228 return utils.NiceSort(wanted)
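# Illustrative note (a sketch, not original code): both helpers above are
# meant to be called from an LU's CheckPrereq, e.g.
#   self.nodes = _GetWantedNodes(self, self.op.nodes)
# They expand short names through the config and return a NiceSort-ed list,
# raising OpPrereqError for unknown names or wrong argument types.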
231 def _CheckOutputFields(static, dynamic, selected):
232 """Checks whether all selected fields are valid.
235 static: Static fields
236 dynamic: Dynamic fields
239 static_fields = frozenset(static)
240 dynamic_fields = frozenset(dynamic)
242 all_fields = static_fields | dynamic_fields
244 if not all_fields.issuperset(selected):
245 raise errors.OpPrereqError("Unknown output fields selected: %s"
246 % ",".join(frozenset(selected).
247 difference(all_fields)))
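# Example call, assuming the same shape as the query LUs further below:
#   _CheckOutputFields(static=["node"],
#                      dynamic=["phys", "vg", "name", "size", "instance"],
#                      selected=self.op.output_fields)
# Any selected field that is neither static nor dynamic raises OpPrereqError.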
250 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251 memory, vcpus, nics):
252 """Builds instance related env variables for hooks from single variables.
255 secondary_nodes: List of secondary nodes as strings
259 "INSTANCE_NAME": name,
260 "INSTANCE_PRIMARY": primary_node,
261 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262 "INSTANCE_OS_TYPE": os_type,
263 "INSTANCE_STATUS": status,
264 "INSTANCE_MEMORY": memory,
265 "INSTANCE_VCPUS": vcpus,
269 nic_count = len(nics)
270 for idx, (ip, bridge) in enumerate(nics):
273 env["INSTANCE_NIC%d_IP" % idx] = ip
274 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
278 env["INSTANCE_NIC_COUNT"] = nic_count
283 def _BuildInstanceHookEnvByObject(instance, override=None):
284 """Builds instance related env variables for hooks from an object.
287 instance: objects.Instance object of instance
288 override: dict of values to override
291 'name': instance.name,
292 'primary_node': instance.primary_node,
293 'secondary_nodes': instance.secondary_nodes,
294 'os_type': instance.os,
295 'status': instance.status,
296 'memory': instance.memory,
297 'vcpus': instance.vcpus,
298 'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
301 args.update(override)
302 return _BuildInstanceHookEnv(**args)
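# Hedged summary of the variables produced above for a hypothetical
# single-NIC instance: INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_SECONDARIES,
# INSTANCE_OS_TYPE, INSTANCE_STATUS, INSTANCE_MEMORY, INSTANCE_VCPUS,
# INSTANCE_NIC_COUNT plus INSTANCE_NIC0_IP and INSTANCE_NIC0_BRIDGE; the
# hooks runner prepends GANETI_ when exporting them to hook scripts.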
305 def _UpdateKnownHosts(fullnode, ip, pubkey):
306 """Ensure a node has a correct known_hosts entry.
309 fullnode - Fully qualified domain name of host. (str)
310 ip - IPv4 address of host (str)
311 pubkey - the public key of the cluster
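  The entries added further below have the standard known_hosts form, e.g.
  (hypothetical host and key):

    node1.example.com,192.0.2.10 ssh-rsa AAAAB3NzaC1yc2E...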
314 if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
315 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
317 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
326 logger.Debug('read %s' % (repr(rawline),))
328 parts = rawline.rstrip('\r\n').split()
330 # Ignore unwanted lines
331 if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
332 fields = parts[0].split(',')
337 for spec in [ ip, fullnode ]:
338 if spec not in fields:
343 logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
344 if haveall and key == pubkey:
346 save_lines.append(rawline)
347 logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
350 if havesome and (not haveall or key != pubkey):
352 logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
355 save_lines.append(rawline)
358 add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
359 logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
362 save_lines = save_lines + add_lines
364 # Write a new file and replace old.
365 fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
367 newfile = os.fdopen(fd, 'w')
369 newfile.write(''.join(save_lines))
372 logger.Debug("Wrote new known_hosts.")
373 os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
376 # Simply appending a new line will do the trick.
378 for add in add_lines:
384 def _HasValidVG(vglist, vgname):
385 """Checks if the volume group list is valid.
387 A non-None return value means there's an error, and the return value
388 is the error message.
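  Illustrative behaviour (assumed values): a call such as
  _HasValidVG({"xenvg": 40960}, "xenvg") succeeds and returns None, while a
  missing volume group yields a string like "volume group 'xenvg' missing".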
391 vgsize = vglist.get(vgname, None)
393 return "volume group '%s' missing" % vgname
395 return ("volume group '%s' too small (20480MiB required, %dMib found)" %
400 def _InitSSHSetup(node):
401 """Setup the SSH configuration for the cluster.
404 This generates a dsa keypair for root, adds the pub key to the
405 permitted hosts and adds the hostkey to its own known hosts.
408 node: the name of this host as a fqdn
411 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
413 for name in priv_key, pub_key:
414 if os.path.exists(name):
415 utils.CreateBackup(name)
416 utils.RemoveFile(name)
418 result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
422 raise errors.OpExecError("Could not generate ssh keypair, error %s" %
425 f = open(pub_key, 'r')
427 utils.AddAuthorizedKey(auth_keys, f.read(8192))
432 def _InitGanetiServerSetup(ss):
433 """Setup the necessary configuration for the initial node daemon.
435 This creates the nodepass file containing the shared password for
436 the cluster and also generates the SSL certificate.
439 # Create pseudo random password
440 randpass = sha.new(os.urandom(64)).hexdigest()
441 # and write it into sstore
442 ss.SetKey(ss.SS_NODED_PASS, randpass)
444 result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
445 "-days", str(365*5), "-nodes", "-x509",
446 "-keyout", constants.SSL_CERT_FILE,
447 "-out", constants.SSL_CERT_FILE, "-batch"])
449 raise errors.OpExecError("could not generate server ssl cert, command"
450 " %s had exitcode %s and error message %s" %
451 (result.cmd, result.exit_code, result.output))
453 os.chmod(constants.SSL_CERT_FILE, 0400)
455 result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
458 raise errors.OpExecError("Could not start the node daemon, command %s"
459 " had exitcode %s and error %s" %
460 (result.cmd, result.exit_code, result.output))
463 def _CheckInstanceBridgesExist(instance):
464 """Check that the brigdes needed by an instance exist.
467 # check bridges existence
468 brlist = [nic.bridge for nic in instance.nics]
469 if not rpc.call_bridges_exist(instance.primary_node, brlist):
470 raise errors.OpPrereqError("one or more target bridges %s do not
471 " exist on destination node '%s'" %
472 (brlist, instance.primary_node))
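# Sketch of intended usage (mirroring the start/reboot LUs below): called
# from CheckPrereq as
#   _CheckInstanceBridgesExist(instance)
# so a missing bridge aborts the operation before any state is changed.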
475 class LUInitCluster(LogicalUnit):
476 """Initialise the cluster.
479 HPATH = "cluster-init"
480 HTYPE = constants.HTYPE_CLUSTER
481 _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
482 "def_bridge", "master_netdev"]
485 def BuildHooksEnv(self):
488 Notes: Since we don't require a cluster, we must manually add
489 ourselves to the post-run node list.
492 env = {"OP_TARGET": self.op.cluster_name}
493 return env, [], [self.hostname.name]
495 def CheckPrereq(self):
496 """Verify that the passed name is a valid one.
499 if config.ConfigWriter.IsCluster():
500 raise errors.OpPrereqError("Cluster is already initialised")
502 if self.op.hypervisor_type == constants.HT_XEN_HVM31:
503 if not os.path.exists(constants.VNC_PASSWORD_FILE):
504 raise errors.OpPrereqError("Please prepare the cluster VNC"
506 constants.VNC_PASSWORD_FILE)
508 self.hostname = hostname = utils.HostInfo()
510 if hostname.ip.startswith("127."):
511 raise errors.OpPrereqError("This host's IP resolves to the loopback"
512 " range (%s). Please fix DNS or /etc/hosts." %
515 self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
517 if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
518 constants.DEFAULT_NODED_PORT):
519 raise errors.OpPrereqError("Inconsistency: this host's name resolves"
520 " to %s,\nbut this ip address does not"
521 " belong to this host."
522 " Aborting." % hostname.ip)
524 secondary_ip = getattr(self.op, "secondary_ip", None)
525 if secondary_ip and not utils.IsValidIP(secondary_ip):
526 raise errors.OpPrereqError("Invalid secondary ip given")
528 secondary_ip != hostname.ip and
529 (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
530 constants.DEFAULT_NODED_PORT))):
531 raise errors.OpPrereqError("You gave %s as secondary IP,"
532 " but it does not belong to this host." %
534 self.secondary_ip = secondary_ip
536 # checks presence of the volume group given
537 vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
540 raise errors.OpPrereqError("Error: %s" % vgstatus)
542 if not re.match("^[0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2}$",
544 raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
547 if self.op.hypervisor_type not in constants.HYPER_TYPES:
548 raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
549 self.op.hypervisor_type)
551 result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
553 raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
554 (self.op.master_netdev,
555 result.output.strip()))
557 if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
558 os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
559 raise errors.OpPrereqError("Init.d script '%s' missing or not"
560 " executable." % constants.NODE_INITD_SCRIPT)
562 def Exec(self, feedback_fn):
563 """Initialize the cluster.
566 clustername = self.clustername
567 hostname = self.hostname
569 # set up the simple store
570 self.sstore = ss = ssconf.SimpleStore()
571 ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
572 ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
573 ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
574 ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
575 ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
577 # set up the inter-node password and certificate
578 _InitGanetiServerSetup(ss)
580 # start the master ip
581 rpc.call_node_start_master(hostname.name)
583 # set up ssh config and /etc/hosts
584 f = open(constants.SSH_HOST_RSA_PUB, 'r')
589 sshkey = sshline.split(" ")[1]
591 _AddHostToEtcHosts(hostname.name)
593 _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
595 _InitSSHSetup(hostname.name)
597 # init of cluster config file
598 self.cfg = cfgw = config.ConfigWriter()
599 cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
600 sshkey, self.op.mac_prefix,
601 self.op.vg_name, self.op.def_bridge)
604 class LUDestroyCluster(NoHooksLU):
605 """Logical unit for destroying the cluster.
610 def CheckPrereq(self):
611 """Check prerequisites.
613 This checks whether the cluster is empty.
615 Any errors are signalled by raising errors.OpPrereqError.
618 master = self.sstore.GetMasterNode()
620 nodelist = self.cfg.GetNodeList()
621 if len(nodelist) != 1 or nodelist[0] != master:
622 raise errors.OpPrereqError("There are still %d node(s) in"
623 " this cluster." % (len(nodelist) - 1))
624 instancelist = self.cfg.GetInstanceList()
626 raise errors.OpPrereqError("There are still %d instance(s) in"
627 " this cluster." % len(instancelist))
629 def Exec(self, feedback_fn):
630 """Destroys the cluster.
633 master = self.sstore.GetMasterNode()
634 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
635 utils.CreateBackup(priv_key)
636 utils.CreateBackup(pub_key)
637 rpc.call_node_leave_cluster(master)
640 class LUVerifyCluster(NoHooksLU):
641 """Verifies the cluster status.
646 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
647 remote_version, feedback_fn):
648 """Run multiple tests against a node.
651 - compares ganeti version
652 - checks vg existence and size > 20G
653 - checks config file checksum
654 - checks ssh to other nodes
657 node: name of the node to check
658 file_list: required list of files
659 local_cksum: dictionary of local files and their checksums
662 # compares ganeti version
663 local_version = constants.PROTOCOL_VERSION
664 if not remote_version:
665 feedback_fn(" - ERROR: connection to %s failed" % (node))
668 if local_version != remote_version:
669 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
670 (local_version, node, remote_version))
673 # checks vg existence and size > 20G
677 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
681 vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
683 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
686 # checks config file checksum
689 if 'filelist' not in node_result:
691 feedback_fn(" - ERROR: node hasn't returned file checksum data")
693 remote_cksum = node_result['filelist']
694 for file_name in file_list:
695 if file_name not in remote_cksum:
697 feedback_fn(" - ERROR: file '%s' missing" % file_name)
698 elif remote_cksum[file_name] != local_cksum[file_name]:
700 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
702 if 'nodelist' not in node_result:
704 feedback_fn(" - ERROR: node hasn't returned node connectivity data")
706 if node_result['nodelist']:
708 for node in node_result['nodelist']:
709 feedback_fn(" - ERROR: communication with node '%s': %s" %
710 (node, node_result['nodelist'][node]))
711 hyp_result = node_result.get('hypervisor', None)
712 if hyp_result is not None:
713 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
716 def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
717 """Verify an instance.
719 This function checks to see if the required block devices are
720 available on the instance's node.
725 instancelist = self.cfg.GetInstanceList()
726 if not instance in instancelist:
727 feedback_fn(" - ERROR: instance %s not in instance list %s" %
728 (instance, instancelist))
731 instanceconfig = self.cfg.GetInstanceInfo(instance)
732 node_current = instanceconfig.primary_node
735 instanceconfig.MapLVsByNode(node_vol_should)
737 for node in node_vol_should:
738 for volume in node_vol_should[node]:
739 if node not in node_vol_is or volume not in node_vol_is[node]:
740 feedback_fn(" - ERROR: volume %s missing on node %s" %
744 if instanceconfig.status != 'down':
745 if not instance in node_instance[node_current]:
746 feedback_fn(" - ERROR: instance %s not running on node %s" %
747 (instance, node_current))
750 for node in node_instance:
751 if node != node_current:
752 if instance in node_instance[node]:
753 feedback_fn(" - ERROR: instance %s should not run on node %s" %
759 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
760 """Verify if there are any unknown volumes in the cluster.
762 The .os, .swap and backup volumes are ignored. All other volumes are
768 for node in node_vol_is:
769 for volume in node_vol_is[node]:
770 if node not in node_vol_should or volume not in node_vol_should[node]:
771 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
776 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
777 """Verify the list of running instances.
779 This checks what instances are running but unknown to the cluster.
783 for node in node_instance:
784 for runninginstance in node_instance[node]:
785 if runninginstance not in instancelist:
786 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
787 (runninginstance, node))
791 def CheckPrereq(self):
792 """Check prerequisites.
794 This has no prerequisites.
799 def Exec(self, feedback_fn):
800 """Verify integrity of cluster, performing various test on nodes.
804 feedback_fn("* Verifying global settings")
805 for msg in self.cfg.VerifyConfig():
806 feedback_fn(" - ERROR: %s" % msg)
808 vg_name = self.cfg.GetVGName()
809 nodelist = utils.NiceSort(self.cfg.GetNodeList())
810 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
814 # FIXME: verify OS list
816 file_names = list(self.sstore.GetFileList())
817 file_names.append(constants.SSL_CERT_FILE)
818 file_names.append(constants.CLUSTER_CONF_FILE)
819 local_checksums = utils.FingerprintFiles(file_names)
821 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
822 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
823 all_instanceinfo = rpc.call_instance_list(nodelist)
824 all_vglist = rpc.call_vg_list(nodelist)
825 node_verify_param = {
826 'filelist': file_names,
827 'nodelist': nodelist,
830 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
831 all_rversion = rpc.call_version(nodelist)
833 for node in nodelist:
834 feedback_fn("* Verifying node %s" % node)
835 result = self._VerifyNode(node, file_names, local_checksums,
836 all_vglist[node], all_nvinfo[node],
837 all_rversion[node], feedback_fn)
841 volumeinfo = all_volumeinfo[node]
843 if isinstance(volumeinfo, basestring):
844 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
845 (node, volumeinfo[-400:].encode('string_escape')))
847 node_volume[node] = {}
848 elif not isinstance(volumeinfo, dict):
849 feedback_fn(" - ERROR: connection to %s failed" % (node,))
853 node_volume[node] = volumeinfo
856 nodeinstance = all_instanceinfo[node]
857 if type(nodeinstance) != list:
858 feedback_fn(" - ERROR: connection to %s failed" % (node,))
862 node_instance[node] = nodeinstance
866 for instance in instancelist:
867 feedback_fn("* Verifying instance %s" % instance)
868 result = self._VerifyInstance(instance, node_volume, node_instance,
872 inst_config = self.cfg.GetInstanceInfo(instance)
874 inst_config.MapLVsByNode(node_vol_should)
876 feedback_fn("* Verifying orphan volumes")
877 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
881 feedback_fn("* Verifying remaining instances")
882 result = self._VerifyOrphanInstances(instancelist, node_instance,
889 class LUVerifyDisks(NoHooksLU):
890 """Verifies the cluster disks status.
895 def CheckPrereq(self):
896 """Check prerequisites.
898 This has no prerequisites.
903 def Exec(self, feedback_fn):
904 """Verify integrity of cluster disks.
907 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
909 vg_name = self.cfg.GetVGName()
910 nodes = utils.NiceSort(self.cfg.GetNodeList())
911 instances = [self.cfg.GetInstanceInfo(name)
912 for name in self.cfg.GetInstanceList()]
915 for inst in instances:
917 if (inst.status != "up" or
918 inst.disk_template not in constants.DTS_NET_MIRROR):
920 inst.MapLVsByNode(inst_lvs)
921 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
922 for node, vol_list in inst_lvs.iteritems():
924 nv_dict[(node, vol)] = inst
929 node_lvs = rpc.call_volume_list(nodes, vg_name)
936 if isinstance(lvs, basestring):
937 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
939 elif not isinstance(lvs, dict):
940 logger.Info("connection to node %s failed or invalid data returned" %
942 res_nodes.append(node)
945 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
946 inst = nv_dict.pop((node, lv_name), None)
947 if (not lv_online and inst is not None
948 and inst.name not in res_instances):
949 res_instances.append(inst.name)
951 # any leftover items in nv_dict are missing LVs, let's arrange the
953 for key, inst in nv_dict.iteritems():
954 if inst.name not in res_missing:
955 res_missing[inst.name] = []
956 res_missing[inst.name].append(key)
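# Hedged summary of the result tuple assembled above: res_nodes collects
# nodes that failed or returned invalid data, res_nlvm presumably holds
# per-node LVM errors, res_instances lists instances with inactive LVs, and
# res_missing maps instance names to their missing (node, LV) pairs.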
961 class LURenameCluster(LogicalUnit):
962 """Rename the cluster.
965 HPATH = "cluster-rename"
966 HTYPE = constants.HTYPE_CLUSTER
969 def BuildHooksEnv(self):
974 "OP_TARGET": self.op.sstore.GetClusterName(),
975 "NEW_NAME": self.op.name,
977 mn = self.sstore.GetMasterNode()
978 return env, [mn], [mn]
980 def CheckPrereq(self):
981 """Verify that the passed name is a valid one.
984 hostname = utils.HostInfo(self.op.name)
986 new_name = hostname.name
987 self.ip = new_ip = hostname.ip
988 old_name = self.sstore.GetClusterName()
989 old_ip = self.sstore.GetMasterIP()
990 if new_name == old_name and new_ip == old_ip:
991 raise errors.OpPrereqError("Neither the name nor the IP address of the"
992 " cluster has changed")
994 result = utils.RunCmd(["fping", "-q", new_ip])
995 if not result.failed:
996 raise errors.OpPrereqError("The given cluster IP address (%s) is"
997 " reachable on the network. Aborting." %
1000 self.op.name = new_name
1002 def Exec(self, feedback_fn):
1003 """Rename the cluster.
1006 clustername = self.op.name
1010 # shutdown the master IP
1011 master = ss.GetMasterNode()
1012 if not rpc.call_node_stop_master(master):
1013 raise errors.OpExecError("Could not disable the master role")
1017 ss.SetKey(ss.SS_MASTER_IP, ip)
1018 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1020 # Distribute updated ss config to all nodes
1021 myself = self.cfg.GetNodeInfo(master)
1022 dist_nodes = self.cfg.GetNodeList()
1023 if myself.name in dist_nodes:
1024 dist_nodes.remove(myself.name)
1026 logger.Debug("Copying updated ssconf data to all nodes")
1027 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1028 fname = ss.KeyToFilename(keyname)
1029 result = rpc.call_upload_file(dist_nodes, fname)
1030 for to_node in dist_nodes:
1031 if not result[to_node]:
1032 logger.Error("copy of file %s to node %s failed" %
1035 if not rpc.call_node_start_master(master):
1036 logger.Error("Could not re-enable the master role on the master,"
1037 " please restart manually.")
1040 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1041 """Sleep and poll for an instance's disk to sync.
1044 if not instance.disks:
1048 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1050 node = instance.primary_node
1052 for dev in instance.disks:
1053 cfgw.SetDiskID(dev, node)
1059 cumul_degraded = False
1060 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1062 proc.LogWarning("Can't get any data from node %s" % node)
1065 raise errors.RemoteError("Can't contact node %s for mirror data,"
1066 " aborting." % node)
1070 for i in range(len(rstats)):
1073 proc.LogWarning("Can't compute data for node %s/%s" %
1074 (node, instance.disks[i].iv_name))
1076 # we ignore the ldisk parameter
1077 perc_done, est_time, is_degraded, _ = mstat
1078 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1079 if perc_done is not None:
1081 if est_time is not None:
1082 rem_time = "%d estimated seconds remaining" % est_time
1085 rem_time = "no time estimate"
1086 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1087 (instance.disks[i].iv_name, perc_done, rem_time))
1094 time.sleep(min(60, max_time))
1100 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1101 return not cumul_degraded
1104 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1105 """Check that mirrors are not degraded.
1107 The ldisk parameter, if True, will change the test from the
1108 is_degraded attribute (which represents overall non-ok status for
1109 the device(s)) to the ldisk (representing the local storage status).
1112 cfgw.SetDiskID(dev, node)
1119 if on_primary or dev.AssembleOnSecondary():
1120 rstats = rpc.call_blockdev_find(node, dev)
1122 logger.ToStderr("Can't get any data from node %s" % node)
1125 result = result and (not rstats[idx])
1127 for child in dev.children:
1128 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1133 class LUDiagnoseOS(NoHooksLU):
1134 """Logical unit for OS diagnose/query.
1139 def CheckPrereq(self):
1140 """Check prerequisites.
1142 This always succeeds, since this is a pure query LU.
1147 def Exec(self, feedback_fn):
1148 """Compute the list of OSes.
1151 node_list = self.cfg.GetNodeList()
1152 node_data = rpc.call_os_diagnose(node_list)
1153 if node_data == False:
1154 raise errors.OpExecError("Can't gather the list of OSes")
1158 class LURemoveNode(LogicalUnit):
1159 """Logical unit for removing a node.
1162 HPATH = "node-remove"
1163 HTYPE = constants.HTYPE_NODE
1164 _OP_REQP = ["node_name"]
1166 def BuildHooksEnv(self):
1169 This doesn't run on the target node in the pre phase as a failed
1170 node would not allow itself to run.
1174 "OP_TARGET": self.op.node_name,
1175 "NODE_NAME": self.op.node_name,
1177 all_nodes = self.cfg.GetNodeList()
1178 all_nodes.remove(self.op.node_name)
1179 return env, all_nodes, all_nodes
1181 def CheckPrereq(self):
1182 """Check prerequisites.
1185 - the node exists in the configuration
1186 - it does not have primary or secondary instances
1187 - it's not the master
1189 Any errors are signalled by raising errors.OpPrereqError.
1192 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1194 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1196 instance_list = self.cfg.GetInstanceList()
1198 masternode = self.sstore.GetMasterNode()
1199 if node.name == masternode:
1200 raise errors.OpPrereqError("Node is the master node,"
1201 " you need to failover first.")
1203 for instance_name in instance_list:
1204 instance = self.cfg.GetInstanceInfo(instance_name)
1205 if node.name == instance.primary_node:
1206 raise errors.OpPrereqError("Instance %s still running on the node,"
1207 " please remove first." % instance_name)
1208 if node.name in instance.secondary_nodes:
1209 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1210 " please remove first." % instance_name)
1211 self.op.node_name = node.name
1214 def Exec(self, feedback_fn):
1215 """Removes the node from the cluster.
1219 logger.Info("stopping the node daemon and removing configs from node %s" %
1222 rpc.call_node_leave_cluster(node.name)
1224 ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1226 logger.Info("Removing node %s from config" % node.name)
1228 self.cfg.RemoveNode(node.name)
1230 _RemoveHostFromEtcHosts(node.name)
1233 class LUQueryNodes(NoHooksLU):
1234 """Logical unit for querying nodes.
1237 _OP_REQP = ["output_fields", "names"]
1239 def CheckPrereq(self):
1240 """Check prerequisites.
1242 This checks that the fields required are valid output fields.
1245 self.dynamic_fields = frozenset(["dtotal", "dfree",
1246 "mtotal", "mnode", "mfree",
1249 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1250 "pinst_list", "sinst_list",
1252 dynamic=self.dynamic_fields,
1253 selected=self.op.output_fields)
1255 self.wanted = _GetWantedNodes(self, self.op.names)
1257 def Exec(self, feedback_fn):
1258 """Computes the list of nodes and their attributes.
1261 nodenames = self.wanted
1262 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1264 # begin data gathering
1266 if self.dynamic_fields.intersection(self.op.output_fields):
1268 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1269 for name in nodenames:
1270 nodeinfo = node_data.get(name, None)
1273 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1274 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1275 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1276 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1277 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1278 "bootid": nodeinfo['bootid'],
1281 live_data[name] = {}
1283 live_data = dict.fromkeys(nodenames, {})
1285 node_to_primary = dict([(name, set()) for name in nodenames])
1286 node_to_secondary = dict([(name, set()) for name in nodenames])
1288 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1289 "sinst_cnt", "sinst_list"))
1290 if inst_fields & frozenset(self.op.output_fields):
1291 instancelist = self.cfg.GetInstanceList()
1293 for instance_name in instancelist:
1294 inst = self.cfg.GetInstanceInfo(instance_name)
1295 if inst.primary_node in node_to_primary:
1296 node_to_primary[inst.primary_node].add(inst.name)
1297 for secnode in inst.secondary_nodes:
1298 if secnode in node_to_secondary:
1299 node_to_secondary[secnode].add(inst.name)
1301 # end data gathering
1304 for node in nodelist:
1306 for field in self.op.output_fields:
1309 elif field == "pinst_list":
1310 val = list(node_to_primary[node.name])
1311 elif field == "sinst_list":
1312 val = list(node_to_secondary[node.name])
1313 elif field == "pinst_cnt":
1314 val = len(node_to_primary[node.name])
1315 elif field == "sinst_cnt":
1316 val = len(node_to_secondary[node.name])
1317 elif field == "pip":
1318 val = node.primary_ip
1319 elif field == "sip":
1320 val = node.secondary_ip
1321 elif field in self.dynamic_fields:
1322 val = live_data[node.name].get(field, None)
1324 raise errors.ParameterError(field)
1325 node_output.append(val)
1326 output.append(node_output)
1331 class LUQueryNodeVolumes(NoHooksLU):
1332 """Logical unit for getting volumes on node(s).
1335 _OP_REQP = ["nodes", "output_fields"]
1337 def CheckPrereq(self):
1338 """Check prerequisites.
1340 This checks that the fields required are valid output fields.
1343 self.nodes = _GetWantedNodes(self, self.op.nodes)
1345 _CheckOutputFields(static=["node"],
1346 dynamic=["phys", "vg", "name", "size", "instance"],
1347 selected=self.op.output_fields)
1350 def Exec(self, feedback_fn):
1351 """Computes the list of nodes and their attributes.
1354 nodenames = self.nodes
1355 volumes = rpc.call_node_volumes(nodenames)
1357 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1358 in self.cfg.GetInstanceList()]
1360 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1363 for node in nodenames:
1364 if node not in volumes or not volumes[node]:
1367 node_vols = volumes[node][:]
1368 node_vols.sort(key=lambda vol: vol['dev'])
1370 for vol in node_vols:
1372 for field in self.op.output_fields:
1375 elif field == "phys":
1379 elif field == "name":
1381 elif field == "size":
1382 val = int(float(vol['size']))
1383 elif field == "instance":
1385 if node not in lv_by_node[inst]:
1387 if vol['name'] in lv_by_node[inst][node]:
1393 raise errors.ParameterError(field)
1394 node_output.append(str(val))
1396 output.append(node_output)
1401 class LUAddNode(LogicalUnit):
1402 """Logical unit for adding node to the cluster.
1406 HTYPE = constants.HTYPE_NODE
1407 _OP_REQP = ["node_name"]
1409 def BuildHooksEnv(self):
1412 This will run on all nodes before, and on all nodes + the new node after.
1416 "OP_TARGET": self.op.node_name,
1417 "NODE_NAME": self.op.node_name,
1418 "NODE_PIP": self.op.primary_ip,
1419 "NODE_SIP": self.op.secondary_ip,
1421 nodes_0 = self.cfg.GetNodeList()
1422 nodes_1 = nodes_0 + [self.op.node_name, ]
1423 return env, nodes_0, nodes_1
1425 def CheckPrereq(self):
1426 """Check prerequisites.
1429 - the new node is not already in the config
1431 - its parameters (single/dual homed) matches the cluster
1433 Any errors are signalled by raising errors.OpPrereqError.
1436 node_name = self.op.node_name
1439 dns_data = utils.HostInfo(node_name)
1441 node = dns_data.name
1442 primary_ip = self.op.primary_ip = dns_data.ip
1443 secondary_ip = getattr(self.op, "secondary_ip", None)
1444 if secondary_ip is None:
1445 secondary_ip = primary_ip
1446 if not utils.IsValidIP(secondary_ip):
1447 raise errors.OpPrereqError("Invalid secondary IP given")
1448 self.op.secondary_ip = secondary_ip
1449 node_list = cfg.GetNodeList()
1450 if node in node_list:
1451 raise errors.OpPrereqError("Node %s is already in the configuration"
1454 for existing_node_name in node_list:
1455 existing_node = cfg.GetNodeInfo(existing_node_name)
1456 if (existing_node.primary_ip == primary_ip or
1457 existing_node.secondary_ip == primary_ip or
1458 existing_node.primary_ip == secondary_ip or
1459 existing_node.secondary_ip == secondary_ip):
1460 raise errors.OpPrereqError("New node ip address(es) conflict with"
1461 " existing node %s" % existing_node.name)
1463 # check that the type of the node (single versus dual homed) is the
1464 # same as for the master
1465 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1466 master_singlehomed = myself.secondary_ip == myself.primary_ip
1467 newbie_singlehomed = secondary_ip == primary_ip
1468 if master_singlehomed != newbie_singlehomed:
1469 if master_singlehomed:
1470 raise errors.OpPrereqError("The master has no private ip but the"
1471 " new node has one")
1473 raise errors.OpPrereqError("The master has a private ip but the"
1474 " new node doesn't have one")
1476 # checks reachability
1477 if not utils.TcpPing(utils.HostInfo().name,
1479 constants.DEFAULT_NODED_PORT):
1480 raise errors.OpPrereqError("Node not reachable by ping")
1482 if not newbie_singlehomed:
1483 # check reachability from my secondary ip to newbie's secondary ip
1484 if not utils.TcpPing(myself.secondary_ip,
1486 constants.DEFAULT_NODED_PORT):
1487 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1488 " based ping to noded port")
1490 self.new_node = objects.Node(name=node,
1491 primary_ip=primary_ip,
1492 secondary_ip=secondary_ip)
1494 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1495 if not os.path.exists(constants.VNC_PASSWORD_FILE):
1496 raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1497 constants.VNC_PASSWORD_FILE)
1499 def Exec(self, feedback_fn):
1500 """Adds the new node to the cluster.
1503 new_node = self.new_node
1504 node = new_node.name
1506 # set up inter-node password and certificate and restarts the node daemon
1507 gntpass = self.sstore.GetNodeDaemonPassword()
1508 if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1509 raise errors.OpExecError("ganeti password corruption detected")
1510 f = open(constants.SSL_CERT_FILE)
1512 gntpem = f.read(8192)
1515 # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1516 # so we use this to detect an invalid certificate; as long as the
1517 # cert doesn't contain this, the here-document will be correctly
1518 # parsed by the shell sequence below
1519 if re.search('^!EOF\.', gntpem, re.MULTILINE):
1520 raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1521 if not gntpem.endswith("\n"):
1522 raise errors.OpExecError("PEM must end with newline")
1523 logger.Info("copying cluster password to %s and starting the node daemon" % node)
1525 # and then connect with ssh to set password and start ganeti-noded
1526 # note that all the below variables are sanitized at this point,
1527 # either by being constants or by the checks above
1529 mycommand = ("umask 077 && "
1530 "echo '%s' > '%s' && "
1531 "cat > '%s' << '!EOF.' && \n"
1532 "%s!EOF.\n%s restart" %
1533 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1534 constants.SSL_CERT_FILE, gntpem,
1535 constants.NODE_INITD_SCRIPT))
1537 result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1539 raise errors.OpExecError("Remote command on node %s, error: %s,"
1541 (node, result.fail_reason, result.output))
1543 # check connectivity
1546 result = rpc.call_version([node])[node]
1548 if constants.PROTOCOL_VERSION == result:
1549 logger.Info("communication to node %s fine, sw version %s match" %
1552 raise errors.OpExecError("Version mismatch master version %s,"
1553 " node version %s" %
1554 (constants.PROTOCOL_VERSION, result))
1556 raise errors.OpExecError("Cannot get version from the new node")
1559 logger.Info("copy ssh key to node %s" % node)
1560 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1562 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1563 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1569 keyarray.append(f.read())
1573 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1574 keyarray[3], keyarray[4], keyarray[5])
1577 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1579 # Add node to our /etc/hosts, and add key to known_hosts
1580 _AddHostToEtcHosts(new_node.name)
1582 _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1583 self.cfg.GetHostKey())
1585 if new_node.secondary_ip != new_node.primary_ip:
1586 if not rpc.call_node_tcp_ping(new_node.name,
1587 constants.LOCALHOST_IP_ADDRESS,
1588 new_node.secondary_ip,
1589 constants.DEFAULT_NODED_PORT,
1591 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1592 " you gave (%s). Please fix and re-run this"
1593 " command." % new_node.secondary_ip)
1595 success, msg = ssh.VerifyNodeHostname(node)
1597 raise errors.OpExecError("Node '%s' claims it has a different hostname"
1598 " than the one the resolver gives: %s."
1599 " Please fix and re-run this command." %
1602 # Distribute updated /etc/hosts and known_hosts to all nodes,
1603 # including the node just added
1604 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1605 dist_nodes = self.cfg.GetNodeList() + [node]
1606 if myself.name in dist_nodes:
1607 dist_nodes.remove(myself.name)
1609 logger.Debug("Copying hosts and known_hosts to all nodes")
1610 for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1611 result = rpc.call_upload_file(dist_nodes, fname)
1612 for to_node in dist_nodes:
1613 if not result[to_node]:
1614 logger.Error("copy of file %s to node %s failed" %
1617 to_copy = ss.GetFileList()
1618 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1619 to_copy.append(constants.VNC_PASSWORD_FILE)
1620 for fname in to_copy:
1621 if not ssh.CopyFileToNode(node, fname):
1622 logger.Error("could not copy file %s to node %s" % (fname, node))
1624 logger.Info("adding node %s to cluster.conf" % node)
1625 self.cfg.AddNode(new_node)
1628 class LUMasterFailover(LogicalUnit):
1629 """Failover the master node to the current node.
1631 This is a special LU in that it must run on a non-master node.
1634 HPATH = "master-failover"
1635 HTYPE = constants.HTYPE_CLUSTER
1639 def BuildHooksEnv(self):
1642 This will run on the new master only in the pre phase, and on all
1643 the nodes in the post phase.
1647 "OP_TARGET": self.new_master,
1648 "NEW_MASTER": self.new_master,
1649 "OLD_MASTER": self.old_master,
1651 return env, [self.new_master], self.cfg.GetNodeList()
1653 def CheckPrereq(self):
1654 """Check prerequisites.
1656 This checks that we are not already the master.
1659 self.new_master = utils.HostInfo().name
1660 self.old_master = self.sstore.GetMasterNode()
1662 if self.old_master == self.new_master:
1663 raise errors.OpPrereqError("This command must be run on the node"
1664 " where you want the new master to be."
1665 " %s is already the master" %
1668 def Exec(self, feedback_fn):
1669 """Failover the master node.
1671 This command, when run on a non-master node, will cause the current
1672 master to cease being master, and the non-master to become new
1676 #TODO: do not rely on gethostname returning the FQDN
1677 logger.Info("setting master to %s, old master: %s" %
1678 (self.new_master, self.old_master))
1680 if not rpc.call_node_stop_master(self.old_master):
1681 logger.Error("could not disable the master role on the old master"
1682 " %s, please disable manually" % self.old_master)
1685 ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1686 if not rpc.call_upload_file(self.cfg.GetNodeList(),
1687 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1688 logger.Error("could not distribute the new simple store master file"
1689 " to the other nodes, please check.")
1691 if not rpc.call_node_start_master(self.new_master):
1692 logger.Error("could not start the master role on the new master"
1693 " %s, please check" % self.new_master)
1694 feedback_fn("Error in activating the master IP on the new master,"
1695 " please fix manually.")
1699 class LUQueryClusterInfo(NoHooksLU):
1700 """Query cluster configuration.
1706 def CheckPrereq(self):
1707 """No prerequsites needed for this LU.
1712 def Exec(self, feedback_fn):
1713 """Return cluster config.
1717 "name": self.sstore.GetClusterName(),
1718 "software_version": constants.RELEASE_VERSION,
1719 "protocol_version": constants.PROTOCOL_VERSION,
1720 "config_version": constants.CONFIG_VERSION,
1721 "os_api_version": constants.OS_API_VERSION,
1722 "export_version": constants.EXPORT_VERSION,
1723 "master": self.sstore.GetMasterNode(),
1724 "architecture": (platform.architecture()[0], platform.machine()),
1730 class LUClusterCopyFile(NoHooksLU):
1731 """Copy file to cluster.
1734 _OP_REQP = ["nodes", "filename"]
1736 def CheckPrereq(self):
1737 """Check prerequisites.
1739 It should check that the named file exists and that the given list
1743 if not os.path.exists(self.op.filename):
1744 raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1746 self.nodes = _GetWantedNodes(self, self.op.nodes)
1748 def Exec(self, feedback_fn):
1749 """Copy a file from master to some nodes.
1752 opts - class with options as members
1753 args - list containing a single element, the file name
1755 nodes - list containing the name of target nodes; if empty, all nodes
1758 filename = self.op.filename
1760 myname = utils.HostInfo().name
1762 for node in self.nodes:
1765 if not ssh.CopyFileToNode(node, filename):
1766 logger.Error("Copy of file %s to node %s failed" % (filename, node))
1769 class LUDumpClusterConfig(NoHooksLU):
1770 """Return a text-representation of the cluster-config.
1775 def CheckPrereq(self):
1776 """No prerequisites.
1781 def Exec(self, feedback_fn):
1782 """Dump a representation of the cluster config to the standard output.
1785 return self.cfg.DumpConfig()
1788 class LURunClusterCommand(NoHooksLU):
1789 """Run a command on some nodes.
1792 _OP_REQP = ["command", "nodes"]
1794 def CheckPrereq(self):
1795 """Check prerequisites.
1797 It checks that the given list of nodes is valid.
1800 self.nodes = _GetWantedNodes(self, self.op.nodes)
1802 def Exec(self, feedback_fn):
1803 """Run a command on some nodes.
1807 for node in self.nodes:
1808 result = ssh.SSHCall(node, "root", self.op.command)
1809 data.append((node, result.output, result.exit_code))
1814 class LUActivateInstanceDisks(NoHooksLU):
1815 """Bring up an instance's disks.
1818 _OP_REQP = ["instance_name"]
1820 def CheckPrereq(self):
1821 """Check prerequisites.
1823 This checks that the instance is in the cluster.
1826 instance = self.cfg.GetInstanceInfo(
1827 self.cfg.ExpandInstanceName(self.op.instance_name))
1828 if instance is None:
1829 raise errors.OpPrereqError("Instance '%s' not known" %
1830 self.op.instance_name)
1831 self.instance = instance
1834 def Exec(self, feedback_fn):
1835 """Activate the disks.
1838 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1840 raise errors.OpExecError("Cannot activate block devices")
1845 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1846 """Prepare the block devices for an instance.
1848 This sets up the block devices on all nodes.
1851 instance: a ganeti.objects.Instance object
1852 ignore_secondaries: if true, errors on secondary nodes won't result
1853 in an error return from the function
1856 false if the operation failed
1857 list of (host, instance_visible_name, node_visible_name) if the operation
1858 succeeded with the mapping from node devices to instance devices
1862 for inst_disk in instance.disks:
1863 master_result = None
1864 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1865 cfg.SetDiskID(node_disk, node)
1866 is_primary = node == instance.primary_node
1867 result = rpc.call_blockdev_assemble(node, node_disk,
1868 instance.name, is_primary)
1870 logger.Error("could not prepare block device %s on node %s"
1871 " (is_primary=%s)" %
1872 (inst_disk.iv_name, node, is_primary))
1873 if is_primary or not ignore_secondaries:
1876 master_result = result
1877 device_info.append((instance.primary_node, inst_disk.iv_name,
1880 # leave the disks configured for the primary node
1881 # this is a workaround that would be fixed better by
1882 # improving the logical/physical id handling
1883 for disk in instance.disks:
1884 cfg.SetDiskID(disk, instance.primary_node)
1886 return disks_ok, device_info
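# Hedged usage sketch (mirrors LUActivateInstanceDisks above): callers treat
# a false first element as failure, e.g.
#   disks_ok, disks_info = _AssembleInstanceDisks(instance, cfg)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
# with disks_info holding the (host, instance-visible name, node-visible
# name) tuples described in the docstring above.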
1889 def _StartInstanceDisks(cfg, instance, force):
1890 """Start the disks of an instance.
1893 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1894 ignore_secondaries=force)
1896 _ShutdownInstanceDisks(instance, cfg)
1897 if force is not None and not force:
1898 logger.Error("If the message above refers to a secondary node,"
1899 " you can retry the operation using '--force'.")
1900 raise errors.OpExecError("Disk consistency error")
1903 class LUDeactivateInstanceDisks(NoHooksLU):
1904 """Shutdown an instance's disks.
1907 _OP_REQP = ["instance_name"]
1909 def CheckPrereq(self):
1910 """Check prerequisites.
1912 This checks that the instance is in the cluster.
1915 instance = self.cfg.GetInstanceInfo(
1916 self.cfg.ExpandInstanceName(self.op.instance_name))
1917 if instance is None:
1918 raise errors.OpPrereqError("Instance '%s' not known" %
1919 self.op.instance_name)
1920 self.instance = instance
1922 def Exec(self, feedback_fn):
1923 """Deactivate the disks
1926 instance = self.instance
1927 ins_l = rpc.call_instance_list([instance.primary_node])
1928 ins_l = ins_l[instance.primary_node]
1929 if not type(ins_l) is list:
1930 raise errors.OpExecError("Can't contact node '%s'" %
1931 instance.primary_node)
1933 if self.instance.name in ins_l:
1934 raise errors.OpExecError("Instance is running, can't shutdown"
1937 _ShutdownInstanceDisks(instance, self.cfg)
1940 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1941 """Shutdown block devices of an instance.
1943 This does the shutdown on all nodes of the instance.
1945 If ignore_primary is false, errors on the primary node are
1950 for disk in instance.disks:
1951 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1952 cfg.SetDiskID(top_disk, node)
1953 if not rpc.call_blockdev_shutdown(node, top_disk):
1954 logger.Error("could not shutdown block device %s on node %s" %
1955 (disk.iv_name, node))
1956 if not ignore_primary or node != instance.primary_node:
1961 class LUStartupInstance(LogicalUnit):
1962 """Starts an instance.
1965 HPATH = "instance-start"
1966 HTYPE = constants.HTYPE_INSTANCE
1967 _OP_REQP = ["instance_name", "force"]
1969 def BuildHooksEnv(self):
1972 This runs on master, primary and secondary nodes of the instance.
1976 "FORCE": self.op.force,
1978 env.update(_BuildInstanceHookEnvByObject(self.instance))
1979 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1980 list(self.instance.secondary_nodes))
1983 def CheckPrereq(self):
1984 """Check prerequisites.
1986 This checks that the instance is in the cluster.
1989 instance = self.cfg.GetInstanceInfo(
1990 self.cfg.ExpandInstanceName(self.op.instance_name))
1991 if instance is None:
1992 raise errors.OpPrereqError("Instance '%s' not known" %
1993 self.op.instance_name)
1995 # check bridges existence
1996 _CheckInstanceBridgesExist(instance)
1998 self.instance = instance
1999 self.op.instance_name = instance.name
2001 def Exec(self, feedback_fn):
2002 """Start the instance.
2005 instance = self.instance
2006 force = self.op.force
2007 extra_args = getattr(self.op, "extra_args", "")
2009 node_current = instance.primary_node
2011 nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
2013 raise errors.OpExecError("Could not contact node %s for info" %
2016 freememory = nodeinfo[node_current]['memory_free']
2017 memory = instance.memory
2018 if memory > freememory:
2019 raise errors.OpExecError("Not enough memory to start instance"
2021 " needed %s MiB, available %s MiB" %
2022 (instance.name, node_current, memory,
2025 _StartInstanceDisks(self.cfg, instance, force)
2027 if not rpc.call_instance_start(node_current, instance, extra_args):
2028 _ShutdownInstanceDisks(instance, self.cfg)
2029 raise errors.OpExecError("Could not start instance")
2031 self.cfg.MarkInstanceUp(instance.name)
2034 class LURebootInstance(LogicalUnit):
2035 """Reboot an instance.
2038 HPATH = "instance-reboot"
2039 HTYPE = constants.HTYPE_INSTANCE
2040 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2042 def BuildHooksEnv(self):
2045 This runs on master, primary and secondary nodes of the instance.
2049 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2051 env.update(_BuildInstanceHookEnvByObject(self.instance))
2052 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2053 list(self.instance.secondary_nodes))
2056 def CheckPrereq(self):
2057 """Check prerequisites.
2059 This checks that the instance is in the cluster.
2062 instance = self.cfg.GetInstanceInfo(
2063 self.cfg.ExpandInstanceName(self.op.instance_name))
2064 if instance is None:
2065 raise errors.OpPrereqError("Instance '%s' not known" %
2066 self.op.instance_name)
2068 # check bridges existence
2069 _CheckInstanceBridgesExist(instance)
2071 self.instance = instance
2072 self.op.instance_name = instance.name
2074 def Exec(self, feedback_fn):
2075 """Reboot the instance.
2078 instance = self.instance
2079 ignore_secondaries = self.op.ignore_secondaries
2080 reboot_type = self.op.reboot_type
2081 extra_args = getattr(self.op, "extra_args", "")
2083 node_current = instance.primary_node
2085 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2086 constants.INSTANCE_REBOOT_HARD,
2087 constants.INSTANCE_REBOOT_FULL]:
2088 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2089 (constants.INSTANCE_REBOOT_SOFT,
2090 constants.INSTANCE_REBOOT_HARD,
2091 constants.INSTANCE_REBOOT_FULL))
2093 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2094 constants.INSTANCE_REBOOT_HARD]:
2095 if not rpc.call_instance_reboot(node_current, instance,
2096 reboot_type, extra_args):
2097 raise errors.OpExecError("Could not reboot instance")
2099 if not rpc.call_instance_shutdown(node_current, instance):
2100 raise errors.OpExecError("could not shutdown instance for full reboot")
2101 _ShutdownInstanceDisks(instance, self.cfg)
2102 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2103 if not rpc.call_instance_start(node_current, instance, extra_args):
2104 _ShutdownInstanceDisks(instance, self.cfg)
2105 raise errors.OpExecError("Could not start instance for full reboot")
2107 self.cfg.MarkInstanceUp(instance.name)
2110 class LUShutdownInstance(LogicalUnit):
2111 """Shutdown an instance.
2114 HPATH = "instance-stop"
2115 HTYPE = constants.HTYPE_INSTANCE
2116 _OP_REQP = ["instance_name"]
2118 def BuildHooksEnv(self):
2121 This runs on master, primary and secondary nodes of the instance.
2124 env = _BuildInstanceHookEnvByObject(self.instance)
2125 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2126 list(self.instance.secondary_nodes))
2129 def CheckPrereq(self):
2130 """Check prerequisites.
2132 This checks that the instance is in the cluster.
2135 instance = self.cfg.GetInstanceInfo(
2136 self.cfg.ExpandInstanceName(self.op.instance_name))
2137 if instance is None:
2138 raise errors.OpPrereqError("Instance '%s' not known" %
2139 self.op.instance_name)
2140 self.instance = instance
2142 def Exec(self, feedback_fn):
2143 """Shutdown the instance.
2146 instance = self.instance
2147 node_current = instance.primary_node
2148 if not rpc.call_instance_shutdown(node_current, instance):
2149 logger.Error("could not shutdown instance")
2151 self.cfg.MarkInstanceDown(instance.name)
2152 _ShutdownInstanceDisks(instance, self.cfg)
2155 class LUReinstallInstance(LogicalUnit):
2156 """Reinstall an instance.
2159 HPATH = "instance-reinstall"
2160 HTYPE = constants.HTYPE_INSTANCE
2161 _OP_REQP = ["instance_name"]
2163 def BuildHooksEnv(self):
2166 This runs on master, primary and secondary nodes of the instance.
2169 env = _BuildInstanceHookEnvByObject(self.instance)
2170 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2171 list(self.instance.secondary_nodes))
2174 def CheckPrereq(self):
2175 """Check prerequisites.
2177 This checks that the instance is in the cluster and is not running.
2180 instance = self.cfg.GetInstanceInfo(
2181 self.cfg.ExpandInstanceName(self.op.instance_name))
2182 if instance is None:
2183 raise errors.OpPrereqError("Instance '%s' not known" %
2184 self.op.instance_name)
2185 if instance.disk_template == constants.DT_DISKLESS:
2186 raise errors.OpPrereqError("Instance '%s' has no disks" %
2187 self.op.instance_name)
2188 if instance.status != "down":
2189 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2190 self.op.instance_name)
2191 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2193 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2194 (self.op.instance_name,
2195 instance.primary_node))
2197 self.op.os_type = getattr(self.op, "os_type", None)
2198 if self.op.os_type is not None:
2200 pnode = self.cfg.GetNodeInfo(
2201 self.cfg.ExpandNodeName(instance.primary_node))
2203 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2205 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2207 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2208 " primary node" % self.op.os_type)
2210 self.instance = instance
2212 def Exec(self, feedback_fn):
2213 """Reinstall the instance.
2216 inst = self.instance
2218 if self.op.os_type is not None:
2219 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2220 inst.os = self.op.os_type
2221 self.cfg.AddInstance(inst)
2223 _StartInstanceDisks(self.cfg, inst, None)
2225 feedback_fn("Running the instance OS create scripts...")
2226 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2227 raise errors.OpExecError("Could not install OS for instance %s"
2229 (inst.name, inst.primary_node))
2231 _ShutdownInstanceDisks(inst, self.cfg)
2234 class LURenameInstance(LogicalUnit):
2235 """Rename an instance.
2238 HPATH = "instance-rename"
2239 HTYPE = constants.HTYPE_INSTANCE
2240 _OP_REQP = ["instance_name", "new_name"]
2242 def BuildHooksEnv(self):
2245 This runs on master, primary and secondary nodes of the instance.
2248 env = _BuildInstanceHookEnvByObject(self.instance)
2249 env["INSTANCE_NEW_NAME"] = self.op.new_name
2250 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2251 list(self.instance.secondary_nodes))
2254 def CheckPrereq(self):
2255 """Check prerequisites.
2257 This checks that the instance is in the cluster and is not running.
2260 instance = self.cfg.GetInstanceInfo(
2261 self.cfg.ExpandInstanceName(self.op.instance_name))
2262 if instance is None:
2263 raise errors.OpPrereqError("Instance '%s' not known" %
2264 self.op.instance_name)
2265 if instance.status != "down":
2266 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2267 self.op.instance_name)
2268 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2270 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2271 (self.op.instance_name,
2272 instance.primary_node))
2273 self.instance = instance
2275 # new name verification
2276 name_info = utils.HostInfo(self.op.new_name)
2278 self.op.new_name = new_name = name_info.name
2279 if not getattr(self.op, "ignore_ip", False):
2280 command = ["fping", "-q", name_info.ip]
2281 result = utils.RunCmd(command)
2282 if not result.failed:
2283 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2284 (name_info.ip, new_name))
2287 def Exec(self, feedback_fn):
2288 """Reinstall the instance.
2291 inst = self.instance
2292 old_name = inst.name
2294 self.cfg.RenameInstance(inst.name, self.op.new_name)
2296 # re-read the instance from the configuration after rename
2297 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2299 _StartInstanceDisks(self.cfg, inst, None)
2301 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2303 msg = ("Could run OS rename script for instance %s on node %s (but the"
2304 " instance has been renamed in Ganeti)" %
2305 (inst.name, inst.primary_node))
2308 _ShutdownInstanceDisks(inst, self.cfg)
2311 class LURemoveInstance(LogicalUnit):
2312 """Remove an instance.
2315 HPATH = "instance-remove"
2316 HTYPE = constants.HTYPE_INSTANCE
2317 _OP_REQP = ["instance_name"]
2319 def BuildHooksEnv(self):
2322 This runs on master, primary and secondary nodes of the instance.
2325 env = _BuildInstanceHookEnvByObject(self.instance)
2326 nl = [self.sstore.GetMasterNode()]
2329 def CheckPrereq(self):
2330 """Check prerequisites.
2332 This checks that the instance is in the cluster.
2335 instance = self.cfg.GetInstanceInfo(
2336 self.cfg.ExpandInstanceName(self.op.instance_name))
2337 if instance is None:
2338 raise errors.OpPrereqError("Instance '%s' not known" %
2339 self.op.instance_name)
2340 self.instance = instance
2342 def Exec(self, feedback_fn):
2343 """Remove the instance.
2346 instance = self.instance
2347 logger.Info("shutting down instance %s on node %s" %
2348 (instance.name, instance.primary_node))
2350 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2351 if self.op.ignore_failures:
2352 feedback_fn("Warning: can't shutdown instance")
2354 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2355 (instance.name, instance.primary_node))
2357 logger.Info("removing block devices for instance %s" % instance.name)
2359 if not _RemoveDisks(instance, self.cfg):
2360 if self.op.ignore_failures:
2361 feedback_fn("Warning: can't remove instance's disks")
2363 raise errors.OpExecError("Can't remove instance's disks")
2365 logger.Info("removing instance %s out of cluster config" % instance.name)
2367 self.cfg.RemoveInstance(instance.name)
2370 class LUQueryInstances(NoHooksLU):
2371 """Logical unit for querying instances.
2374 _OP_REQP = ["output_fields", "names"]
2376 def CheckPrereq(self):
2377 """Check prerequisites.
2379 This checks that the required fields are valid output fields.
2382 self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2383 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2384 "admin_state", "admin_ram",
2385 "disk_template", "ip", "mac", "bridge",
2386 "sda_size", "sdb_size"],
2387 dynamic=self.dynamic_fields,
2388 selected=self.op.output_fields)
2390 self.wanted = _GetWantedInstances(self, self.op.names)
2392 def Exec(self, feedback_fn):
2393 """Computes the list of nodes and their attributes.
2396 instance_names = self.wanted
2397 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2400 # begin data gathering
2402 nodes = frozenset([inst.primary_node for inst in instance_list])
2405 if self.dynamic_fields.intersection(self.op.output_fields):
2407 node_data = rpc.call_all_instances_info(nodes)
2409 result = node_data[name]
2411 live_data.update(result)
2412 elif result is False:
2413 bad_nodes.append(name)
2414 # else no instance is alive
2416 live_data = dict([(name, {}) for name in instance_names])
2418 # end data gathering
2421 for instance in instance_list:
2423 for field in self.op.output_fields:
2428 elif field == "pnode":
2429 val = instance.primary_node
2430 elif field == "snodes":
2431 val = list(instance.secondary_nodes)
2432 elif field == "admin_state":
2433 val = (instance.status != "down")
2434 elif field == "oper_state":
2435 if instance.primary_node in bad_nodes:
2438 val = bool(live_data.get(instance.name))
2439 elif field == "admin_ram":
2440 val = instance.memory
2441 elif field == "oper_ram":
2442 if instance.primary_node in bad_nodes:
2444 elif instance.name in live_data:
2445 val = live_data[instance.name].get("memory", "?")
2448 elif field == "disk_template":
2449 val = instance.disk_template
2451 val = instance.nics[0].ip
2452 elif field == "bridge":
2453 val = instance.nics[0].bridge
2454 elif field == "mac":
2455 val = instance.nics[0].mac
2456 elif field == "sda_size" or field == "sdb_size":
2457 disk = instance.FindDisk(field[:3])
2463 raise errors.ParameterError(field)
2470 class LUFailoverInstance(LogicalUnit):
2471 """Failover an instance.
2474 HPATH = "instance-failover"
2475 HTYPE = constants.HTYPE_INSTANCE
2476 _OP_REQP = ["instance_name", "ignore_consistency"]
2478 def BuildHooksEnv(self):
2481 This runs on master, primary and secondary nodes of the instance.
2485 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2487 env.update(_BuildInstanceHookEnvByObject(self.instance))
2488 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2491 def CheckPrereq(self):
2492 """Check prerequisites.
2494 This checks that the instance is in the cluster.
2497 instance = self.cfg.GetInstanceInfo(
2498 self.cfg.ExpandInstanceName(self.op.instance_name))
2499 if instance is None:
2500 raise errors.OpPrereqError("Instance '%s' not known" %
2501 self.op.instance_name)
2503 if instance.disk_template not in constants.DTS_NET_MIRROR:
2504 raise errors.OpPrereqError("Instance's disk layout is not"
2505 " network mirrored, cannot failover.")
2507 secondary_nodes = instance.secondary_nodes
2508 if not secondary_nodes:
2509 raise errors.ProgrammerError("no secondary node but using "
2510 "DT_REMOTE_RAID1 template")
2512 # check memory requirements on the secondary node
2513 target_node = secondary_nodes[0]
2514 nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2515 info = nodeinfo.get(target_node, None)
2517 raise errors.OpPrereqError("Cannot get current information"
2518 " from node '%s'" % nodeinfo)
2519 if instance.memory > info['memory_free']:
2520 raise errors.OpPrereqError("Not enough memory on target node %s."
2521 " %d MB available, %d MB required" %
2522 (target_node, info['memory_free'],
2525 # check bridge existence
2526 brlist = [nic.bridge for nic in instance.nics]
2527 if not rpc.call_bridges_exist(target_node, brlist):
2528 raise errors.OpPrereqError("One or more target bridges %s does not"
2529 " exist on destination node '%s'" %
2530 (brlist, target_node))
2532 self.instance = instance
2534 def Exec(self, feedback_fn):
2535 """Failover an instance.
2537 The failover is done by shutting it down on its present node and
2538 starting it on the secondary.
2541 instance = self.instance
2543 source_node = instance.primary_node
2544 target_node = instance.secondary_nodes[0]
2546 feedback_fn("* checking disk consistency between source and target")
2547 for dev in instance.disks:
2548 # for remote_raid1, these are md over drbd
2549 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2550 if not self.op.ignore_consistency:
2551 raise errors.OpExecError("Disk %s is degraded on target node,"
2552 " aborting failover." % dev.iv_name)
2554 feedback_fn("* checking target node resource availability")
2555 nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2558 raise errors.OpExecError("Could not contact target node %s." %
2561 free_memory = int(nodeinfo[target_node]['memory_free'])
2562 memory = instance.memory
2563 if memory > free_memory:
2564 raise errors.OpExecError("Not enough memory to create instance %s on"
2565 " node %s. needed %s MiB, available %s MiB" %
2566 (instance.name, target_node, memory,
2569 feedback_fn("* shutting down instance on source node")
2570 logger.Info("Shutting down instance %s on node %s" %
2571 (instance.name, source_node))
2573 if not rpc.call_instance_shutdown(source_node, instance):
2574 if self.op.ignore_consistency:
2575 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2576 " anyway. Please make sure node %s is down" %
2577 (instance.name, source_node, source_node))
2579 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2580 (instance.name, source_node))
2582 feedback_fn("* deactivating the instance's disks on source node")
2583 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2584 raise errors.OpExecError("Can't shut down the instance's disks.")
2586 instance.primary_node = target_node
2587 # distribute new instance config to the other nodes
2588 self.cfg.AddInstance(instance)
2590 feedback_fn("* activating the instance's disks on target node")
2591 logger.Info("Starting instance %s on node %s" %
2592 (instance.name, target_node))
2594 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2595 ignore_secondaries=True)
2597 _ShutdownInstanceDisks(instance, self.cfg)
2598 raise errors.OpExecError("Can't activate the instance's disks")
2600 feedback_fn("* starting the instance on the target node")
2601 if not rpc.call_instance_start(target_node, instance, None):
2602 _ShutdownInstanceDisks(instance, self.cfg)
2603 raise errors.OpExecError("Could not start instance %s on node %s." %
2604 (instance.name, target_node))
2607 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2608 """Create a tree of block devices on the primary node.
2610 This always creates all devices.
2614 for child in device.children:
2615 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2618 cfg.SetDiskID(device, node)
2619 new_id = rpc.call_blockdev_create(node, device, device.size,
2620 instance.name, True, info)
2623 if device.physical_id is None:
2624 device.physical_id = new_id
2628 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2629 """Create a tree of block devices on a secondary node.
2631 If this device type has to be created on secondaries, create it and
2634 If not, just recurse to children keeping the same 'force' value.
2637 if device.CreateOnSecondary():
2640 for child in device.children:
2641 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2642 child, force, info):
2647 cfg.SetDiskID(device, node)
2648 new_id = rpc.call_blockdev_create(node, device, device.size,
2649 instance.name, False, info)
2652 if device.physical_id is None:
2653 device.physical_id = new_id
2657 def _GenerateUniqueNames(cfg, exts):
2658 """Generate a suitable LV name.
2660 This will generate a logical volume name for the given instance.
2665 new_id = cfg.GenerateUniqueID()
2666 results.append("%s%s" % (new_id, val))
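# Example (not part of the original source): with exts == [".sda", ".sdb"] the
# helper yields names of the form "<unique-id>.sda" and "<unique-id>.sdb",
# e.g. "c398a2b5-1f0a-4c7e-b1d9.sda"; the unique id is whatever
# cfg.GenerateUniqueID() produces and the value shown here is hypothetical.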
2670 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2671 """Generate a drbd device complete with its children.
2674 port = cfg.AllocatePort()
2675 vgname = cfg.GetVGName()
2676 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2677 logical_id=(vgname, names[0]))
2678 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2679 logical_id=(vgname, names[1]))
2680 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2681 logical_id = (primary, secondary, port),
2682 children = [dev_data, dev_meta])
2686 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2687 """Generate a drbd8 device complete with its children.
2690 port = cfg.AllocatePort()
2691 vgname = cfg.GetVGName()
2692 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2693 logical_id=(vgname, names[0]))
2694 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2695 logical_id=(vgname, names[1]))
2696 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2697 logical_id = (primary, secondary, port),
2698 children = [dev_data, dev_meta],
2702 def _GenerateDiskTemplate(cfg, template_name,
2703 instance_name, primary_node,
2704 secondary_nodes, disk_sz, swap_sz):
2705 """Generate the entire disk layout for a given template type.
2708 #TODO: compute space requirements
2710 vgname = cfg.GetVGName()
2711 if template_name == "diskless":
2713 elif template_name == "plain":
2714 if len(secondary_nodes) != 0:
2715 raise errors.ProgrammerError("Wrong template configuration")
2717 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2718 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2719 logical_id=(vgname, names[0]),
2721 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2722 logical_id=(vgname, names[1]),
2724 disks = [sda_dev, sdb_dev]
2725 elif template_name == "local_raid1":
2726 if len(secondary_nodes) != 0:
2727 raise errors.ProgrammerError("Wrong template configuration")
2730 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2731 ".sdb_m1", ".sdb_m2"])
2732 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2733 logical_id=(vgname, names[0]))
2734 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2735 logical_id=(vgname, names[1]))
2736 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2738 children = [sda_dev_m1, sda_dev_m2])
2739 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2740 logical_id=(vgname, names[2]))
2741 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2742 logical_id=(vgname, names[3]))
2743 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2745 children = [sdb_dev_m1, sdb_dev_m2])
2746 disks = [md_sda_dev, md_sdb_dev]
2747 elif template_name == constants.DT_REMOTE_RAID1:
2748 if len(secondary_nodes) != 1:
2749 raise errors.ProgrammerError("Wrong template configuration")
2750 remote_node = secondary_nodes[0]
2751 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2752 ".sdb_data", ".sdb_meta"])
2753 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2754 disk_sz, names[0:2])
2755 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2756 children = [drbd_sda_dev], size=disk_sz)
2757 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2758 swap_sz, names[2:4])
2759 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2760 children = [drbd_sdb_dev], size=swap_sz)
2761 disks = [md_sda_dev, md_sdb_dev]
2762 elif template_name == constants.DT_DRBD8:
2763 if len(secondary_nodes) != 1:
2764 raise errors.ProgrammerError("Wrong template configuration")
2765 remote_node = secondary_nodes[0]
2766 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2767 ".sdb_data", ".sdb_meta"])
2768 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2769 disk_sz, names[0:2], "sda")
2770 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2771 swap_sz, names[2:4], "sdb")
2772 disks = [drbd_sda_dev, drbd_sdb_dev]
2774 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
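# Sketch (not part of the original source): for the DT_DRBD8 branch above, the
# generated layout is two top-level DRBD8 disks ("sda" and "sdb"), each backed
# by a data LV and a 128 MB metadata LV replicated between the primary and the
# secondary node, roughly:
#
#   sda (LD_DRBD8, size=disk_sz)
#     +- <id>.sda_data (LD_LV, size=disk_sz)
#     +- <id>.sda_meta (LD_LV, size=128)
#   sdb (LD_DRBD8, size=swap_sz)
#     +- <id>.sdb_data (LD_LV, size=swap_sz)
#     +- <id>.sdb_meta (LD_LV, size=128)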
2778 def _GetInstanceInfoText(instance):
2779 """Compute that text that should be added to the disk's metadata.
2782 return "originstname+%s" % instance.name
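# Example (not part of the original source): for an instance named
# "instance1.example.com" (a hypothetical name), the metadata text is
# "originstname+instance1.example.com".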
2785 def _CreateDisks(cfg, instance):
2786 """Create all disks for an instance.
2788 This abstracts away some work from AddInstance.
2791 instance: the instance object
2794 True or False showing the success of the creation process
2797 info = _GetInstanceInfoText(instance)
2799 for device in instance.disks:
2800 logger.Info("creating volume %s for instance %s" %
2801 (device.iv_name, instance.name))
2803 for secondary_node in instance.secondary_nodes:
2804 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2805 device, False, info):
2806 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2807 (device.iv_name, device, secondary_node))
2810 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2811 instance, device, info):
2812 logger.Error("failed to create volume %s on primary!" %
2818 def _RemoveDisks(instance, cfg):
2819 """Remove all disks for an instance.
2821 This abstracts away some work from `AddInstance()` and
2822 `RemoveInstance()`. Note that in case some of the devices couldn't
2823 be removed, the removal will continue with the other ones (compare
2824 with `_CreateDisks()`).
2827 instance: the instance object
2830 True or False showing the success of the removal process
2833 logger.Info("removing block devices for instance %s" % instance.name)
2836 for device in instance.disks:
2837 for node, disk in device.ComputeNodeTree(instance.primary_node):
2838 cfg.SetDiskID(disk, node)
2839 if not rpc.call_blockdev_remove(node, disk):
2840 logger.Error("could not remove block device %s on node %s,"
2841 " continuing anyway" %
2842 (device.iv_name, node))
2847 class LUCreateInstance(LogicalUnit):
2848 """Create an instance.
2851 HPATH = "instance-add"
2852 HTYPE = constants.HTYPE_INSTANCE
2853 _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2854 "disk_template", "swap_size", "mode", "start", "vcpus",
2855 "wait_for_sync", "ip_check", "mac"]
2857 def BuildHooksEnv(self):
2860 This runs on master, primary and secondary nodes of the instance.
2864 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2865 "INSTANCE_DISK_SIZE": self.op.disk_size,
2866 "INSTANCE_SWAP_SIZE": self.op.swap_size,
2867 "INSTANCE_ADD_MODE": self.op.mode,
2869 if self.op.mode == constants.INSTANCE_IMPORT:
2870 env["INSTANCE_SRC_NODE"] = self.op.src_node
2871 env["INSTANCE_SRC_PATH"] = self.op.src_path
2872 env["INSTANCE_SRC_IMAGE"] = self.src_image
2874 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2875 primary_node=self.op.pnode,
2876 secondary_nodes=self.secondaries,
2877 status=self.instance_status,
2878 os_type=self.op.os_type,
2879 memory=self.op.mem_size,
2880 vcpus=self.op.vcpus,
2881 nics=[(self.inst_ip, self.op.bridge)],
2884 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2889 def CheckPrereq(self):
2890 """Check prerequisites.
2893 if self.op.mode not in (constants.INSTANCE_CREATE,
2894 constants.INSTANCE_IMPORT):
2895 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2898 if self.op.mode == constants.INSTANCE_IMPORT:
2899 src_node = getattr(self.op, "src_node", None)
2900 src_path = getattr(self.op, "src_path", None)
2901 if src_node is None or src_path is None:
2902 raise errors.OpPrereqError("Importing an instance requires source"
2903 " node and path options")
2904 src_node_full = self.cfg.ExpandNodeName(src_node)
2905 if src_node_full is None:
2906 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2907 self.op.src_node = src_node = src_node_full
2909 if not os.path.isabs(src_path):
2910 raise errors.OpPrereqError("The source path must be absolute")
2912 export_info = rpc.call_export_info(src_node, src_path)
2915 raise errors.OpPrereqError("No export found in dir %s" % src_path)
2917 if not export_info.has_section(constants.INISECT_EXP):
2918 raise errors.ProgrammerError("Corrupted export config")
2920 ei_version = export_info.get(constants.INISECT_EXP, 'version')
2921 if (int(ei_version) != constants.EXPORT_VERSION):
2922 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2923 (ei_version, constants.EXPORT_VERSION))
2925 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2926 raise errors.OpPrereqError("Can't import instance with more than"
2929 # FIXME: are the old os-es, disk sizes, etc. useful?
2930 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2931 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2933 self.src_image = diskimage
2934 else: # INSTANCE_CREATE
2935 if getattr(self.op, "os_type", None) is None:
2936 raise errors.OpPrereqError("No guest OS specified")
2938 # check primary node
2939 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2941 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2943 self.op.pnode = pnode.name
2945 self.secondaries = []
2946 # disk template and mirror node verification
2947 if self.op.disk_template not in constants.DISK_TEMPLATES:
2948 raise errors.OpPrereqError("Invalid disk template name")
2950 if self.op.disk_template in constants.DTS_NET_MIRROR:
2951 if getattr(self.op, "snode", None) is None:
2952 raise errors.OpPrereqError("The networked disk templates need"
2955 snode_name = self.cfg.ExpandNodeName(self.op.snode)
2956 if snode_name is None:
2957 raise errors.OpPrereqError("Unknown secondary node '%s'" %
2959 elif snode_name == pnode.name:
2960 raise errors.OpPrereqError("The secondary node cannot be"
2961 " the primary node.")
2962 self.secondaries.append(snode_name)
2964 # Check lv size requirements
2965 nodenames = [pnode.name] + self.secondaries
2966 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2968 # Required free disk space as a function of disk and swap space
2970 constants.DT_DISKLESS: 0,
2971 constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2972 constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2973 # 256 MB are added for drbd metadata, 128 MB for each drbd device
2974 constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2975 constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2978 if self.op.disk_template not in req_size_dict:
2979 raise errors.ProgrammerError("Disk template '%s' size requirement"
2980 " is unknown" % self.op.disk_template)
2982 req_size = req_size_dict[self.op.disk_template]
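# Worked example (not part of the original source): for a hypothetical request
# with disk_size=10240 MB and swap_size=4096 MB, req_size is 14336 MB for
# DT_PLAIN, 28672 MB for DT_LOCAL_RAID1 and 14592 MB for DT_REMOTE_RAID1 or
# DT_DRBD8 (the extra 256 MB covering the two drbd metadata devices).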
2984 for node in nodenames:
2985 info = nodeinfo.get(node, None)
2987 raise errors.OpPrereqError("Cannot get current information"
2988 " from node '%s'" % nodeinfo)
2989 if req_size > info['vg_free']:
2990 raise errors.OpPrereqError("Not enough disk space on target node %s."
2991 " %d MB available, %d MB required" %
2992 (node, info['vg_free'], req_size))
2995 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2997 raise errors.OpPrereqError("OS '%s' not in supported os list for"
2998 " primary node" % self.op.os_type)
3000 # instance verification
3001 hostname1 = utils.HostInfo(self.op.instance_name)
3003 self.op.instance_name = instance_name = hostname1.name
3004 instance_list = self.cfg.GetInstanceList()
3005 if instance_name in instance_list:
3006 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3009 ip = getattr(self.op, "ip", None)
3010 if ip is None or ip.lower() == "none":
3012 elif ip.lower() == "auto":
3013 inst_ip = hostname1.ip
3015 if not utils.IsValidIP(ip):
3016 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3017 " like a valid IP" % ip)
3019 self.inst_ip = inst_ip
3021 if self.op.start and not self.op.ip_check:
3022 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3023 " adding an instance in start mode")
3025 if self.op.ip_check:
3026 if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
3027 constants.DEFAULT_NODED_PORT):
3028 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3029 (hostname1.ip, instance_name))
3031 # MAC address verification
3032 if self.op.mac != "auto":
3033 if not utils.IsValidMac(self.op.mac.lower()):
3034 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3037 # bridge verification
3038 bridge = getattr(self.op, "bridge", None)
3040 self.op.bridge = self.cfg.GetDefBridge()
3042 self.op.bridge = bridge
3044 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3045 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3046 " destination node '%s'" %
3047 (self.op.bridge, pnode.name))
3050 self.instance_status = 'up'
3052 self.instance_status = 'down'
3054 def Exec(self, feedback_fn):
3055 """Create and add the instance to the cluster.
3058 instance = self.op.instance_name
3059 pnode_name = self.pnode.name
3061 if self.op.mac == "auto":
3062 mac_address = self.cfg.GenerateMAC()
3064 mac_address = self.op.mac
3066 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3067 if self.inst_ip is not None:
3068 nic.ip = self.inst_ip
3070 ht_kind = self.sstore.GetHypervisorType()
3071 if ht_kind in constants.HTS_REQ_PORT:
3072 network_port = self.cfg.AllocatePort()
3076 disks = _GenerateDiskTemplate(self.cfg,
3077 self.op.disk_template,
3078 instance, pnode_name,
3079 self.secondaries, self.op.disk_size,
3082 iobj = objects.Instance(name=instance, os=self.op.os_type,
3083 primary_node=pnode_name,
3084 memory=self.op.mem_size,
3085 vcpus=self.op.vcpus,
3086 nics=[nic], disks=disks,
3087 disk_template=self.op.disk_template,
3088 status=self.instance_status,
3089 network_port=network_port,
3092 feedback_fn("* creating instance disks...")
3093 if not _CreateDisks(self.cfg, iobj):
3094 _RemoveDisks(iobj, self.cfg)
3095 raise errors.OpExecError("Device creation failed, reverting...")
3097 feedback_fn("adding instance %s to cluster config" % instance)
3099 self.cfg.AddInstance(iobj)
3101 if self.op.wait_for_sync:
3102 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3103 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3104 # make sure the disks are not degraded (still sync-ing is ok)
3106 feedback_fn("* checking mirrors status")
3107 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3112 _RemoveDisks(iobj, self.cfg)
3113 self.cfg.RemoveInstance(iobj.name)
3114 raise errors.OpExecError("There are some degraded disks for"
3117 feedback_fn("creating os for instance %s on node %s" %
3118 (instance, pnode_name))
3120 if iobj.disk_template != constants.DT_DISKLESS:
3121 if self.op.mode == constants.INSTANCE_CREATE:
3122 feedback_fn("* running the instance OS create scripts...")
3123 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3124 raise errors.OpExecError("could not add os for instance %s"
3126 (instance, pnode_name))
3128 elif self.op.mode == constants.INSTANCE_IMPORT:
3129 feedback_fn("* running the instance OS import scripts...")
3130 src_node = self.op.src_node
3131 src_image = self.src_image
3132 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3133 src_node, src_image):
3134 raise errors.OpExecError("Could not import os for instance"
3136 (instance, pnode_name))
3138 # also checked in the prereq part
3139 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3143 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3144 feedback_fn("* starting instance...")
3145 if not rpc.call_instance_start(pnode_name, iobj, None):
3146 raise errors.OpExecError("Could not start instance")
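# Illustrative note (not part of the original source): a client drives this LU
# through an opcode carrying at least the required fields listed in _OP_REQP
# above; a plausible sketch, with all values hypothetical and the opcode name
# assumed rather than taken from this file, would be:
#
#   op = opcodes.OpCreateInstance(instance_name="instance1.example.com",
#                                 mem_size=512, disk_size=10240, swap_size=4096,
#                                 pnode="node1.example.com", os_type="debian-etch",
#                                 disk_template=constants.DT_DRBD8,
#                                 mode=constants.INSTANCE_CREATE, start=True,
#                                 vcpus=1, wait_for_sync=True, ip_check=True,
#                                 mac="auto")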
3149 class LUConnectConsole(NoHooksLU):
3150 """Connect to an instance's console.
3152 This is somewhat special in that it returns the command line that
3153 you need to run on the master node in order to connect to the console.
3157 _OP_REQP = ["instance_name"]
3159 def CheckPrereq(self):
3160 """Check prerequisites.
3162 This checks that the instance is in the cluster.
3165 instance = self.cfg.GetInstanceInfo(
3166 self.cfg.ExpandInstanceName(self.op.instance_name))
3167 if instance is None:
3168 raise errors.OpPrereqError("Instance '%s' not known" %
3169 self.op.instance_name)
3170 self.instance = instance
3172 def Exec(self, feedback_fn):
3173 """Connect to the console of an instance
3176 instance = self.instance
3177 node = instance.primary_node
3179 node_insts = rpc.call_instance_list([node])[node]
3180 if node_insts is False:
3181 raise errors.OpExecError("Can't connect to node %s." % node)
3183 if instance.name not in node_insts:
3184 raise errors.OpExecError("Instance %s is not running." % instance.name)
3186 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3188 hyper = hypervisor.GetHypervisor()
3189 console_cmd = hyper.GetShellCommandForConsole(instance)
3191 argv = ["ssh", "-q", "-t"]
3192 argv.extend(ssh.KNOWN_HOSTS_OPTS)
3193 argv.extend(ssh.BATCH_MODE_OPTS)
3195 argv.append(console_cmd)
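# Illustrative note (not part of the original source): the assembled command
# line is roughly
#   ssh -q -t <known-hosts opts> <batch-mode opts> <primary node> '<console cmd>'
# where the console command is supplied by the hypervisor layer (for Xen,
# something like "xm console <instance>"); the exact shape shown here is an
# assumption, only the argv construction above is authoritative.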
3199 class LUAddMDDRBDComponent(LogicalUnit):
3200 """Adda new mirror member to an instance's disk.
3203 HPATH = "mirror-add"
3204 HTYPE = constants.HTYPE_INSTANCE
3205 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3207 def BuildHooksEnv(self):
3210 This runs on the master, the primary and all the secondaries.
3214 "NEW_SECONDARY": self.op.remote_node,
3215 "DISK_NAME": self.op.disk_name,
3217 env.update(_BuildInstanceHookEnvByObject(self.instance))
3218 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3219 self.op.remote_node,] + list(self.instance.secondary_nodes)
3222 def CheckPrereq(self):
3223 """Check prerequisites.
3225 This checks that the instance is in the cluster.
3228 instance = self.cfg.GetInstanceInfo(
3229 self.cfg.ExpandInstanceName(self.op.instance_name))
3230 if instance is None:
3231 raise errors.OpPrereqError("Instance '%s' not known" %
3232 self.op.instance_name)
3233 self.instance = instance
3235 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3236 if remote_node is None:
3237 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3238 self.remote_node = remote_node
3240 if remote_node == instance.primary_node:
3241 raise errors.OpPrereqError("The specified node is the primary node of"
3244 if instance.disk_template != constants.DT_REMOTE_RAID1:
3245 raise errors.OpPrereqError("Instance's disk layout is not"
3247 for disk in instance.disks:
3248 if disk.iv_name == self.op.disk_name:
3251 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3252 " instance." % self.op.disk_name)
3253 if len(disk.children) > 1:
3254 raise errors.OpPrereqError("The device already has two slave devices."
3255 " This would create a 3-disk raid1 which we"
3259 def Exec(self, feedback_fn):
3260 """Add the mirror component
3264 instance = self.instance
3266 remote_node = self.remote_node
3267 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3268 names = _GenerateUniqueNames(self.cfg, lv_names)
3269 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3270 remote_node, disk.size, names)
3272 logger.Info("adding new mirror component on secondary")
3274 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3276 _GetInstanceInfoText(instance)):
3277 raise errors.OpExecError("Failed to create new component on secondary"
3278 " node %s" % remote_node)
3280 logger.Info("adding new mirror component on primary")
3282 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3284 _GetInstanceInfoText(instance)):
3285 # remove secondary dev
3286 self.cfg.SetDiskID(new_drbd, remote_node)
3287 rpc.call_blockdev_remove(remote_node, new_drbd)
3288 raise errors.OpExecError("Failed to create volume on primary")
3290 # the device exists now
3291 # call the primary node to add the mirror to md
3292 logger.Info("adding new mirror component to md")
3293 if not rpc.call_blockdev_addchildren(instance.primary_node,
3295 logger.Error("Can't add mirror compoment to md!")
3296 self.cfg.SetDiskID(new_drbd, remote_node)
3297 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3298 logger.Error("Can't rollback on secondary")
3299 self.cfg.SetDiskID(new_drbd, instance.primary_node)
3300 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3301 logger.Error("Can't rollback on primary")
3302 raise errors.OpExecError("Can't add mirror component to md array")
3304 disk.children.append(new_drbd)
3306 self.cfg.AddInstance(instance)
3308 _WaitForSync(self.cfg, instance, self.proc)
3313 class LURemoveMDDRBDComponent(LogicalUnit):
3314 """Remove a component from a remote_raid1 disk.
3317 HPATH = "mirror-remove"
3318 HTYPE = constants.HTYPE_INSTANCE
3319 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3321 def BuildHooksEnv(self):
3324 This runs on the master, the primary and all the secondaries.
3328 "DISK_NAME": self.op.disk_name,
3329 "DISK_ID": self.op.disk_id,
3330 "OLD_SECONDARY": self.old_secondary,
3332 env.update(_BuildInstanceHookEnvByObject(self.instance))
3333 nl = [self.sstore.GetMasterNode(),
3334 self.instance.primary_node] + list(self.instance.secondary_nodes)
3337 def CheckPrereq(self):
3338 """Check prerequisites.
3340 This checks that the instance is in the cluster.
3343 instance = self.cfg.GetInstanceInfo(
3344 self.cfg.ExpandInstanceName(self.op.instance_name))
3345 if instance is None:
3346 raise errors.OpPrereqError("Instance '%s' not known" %
3347 self.op.instance_name)
3348 self.instance = instance
3350 if instance.disk_template != constants.DT_REMOTE_RAID1:
3351 raise errors.OpPrereqError("Instance's disk layout is not"
3353 for disk in instance.disks:
3354 if disk.iv_name == self.op.disk_name:
3357 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3358 " instance." % self.op.disk_name)
3359 for child in disk.children:
3360 if (child.dev_type == constants.LD_DRBD7 and
3361 child.logical_id[2] == self.op.disk_id):
3364 raise errors.OpPrereqError("Can't find the device with this port.")
3366 if len(disk.children) < 2:
3367 raise errors.OpPrereqError("Cannot remove the last component from"
3371 if self.child.logical_id[0] == instance.primary_node:
3375 self.old_secondary = self.child.logical_id[oid]
3377 def Exec(self, feedback_fn):
3378 """Remove the mirror component
3381 instance = self.instance
3384 logger.Info("remove mirror component")
3385 self.cfg.SetDiskID(disk, instance.primary_node)
3386 if not rpc.call_blockdev_removechildren(instance.primary_node,
3388 raise errors.OpExecError("Can't remove child from mirror.")
3390 for node in child.logical_id[:2]:
3391 self.cfg.SetDiskID(child, node)
3392 if not rpc.call_blockdev_remove(node, child):
3393 logger.Error("Warning: failed to remove device from node %s,"
3394 " continuing operation." % node)
3396 disk.children.remove(child)
3397 self.cfg.AddInstance(instance)
3400 class LUReplaceDisks(LogicalUnit):
3401 """Replace the disks of an instance.
3404 HPATH = "mirrors-replace"
3405 HTYPE = constants.HTYPE_INSTANCE
3406 _OP_REQP = ["instance_name", "mode", "disks"]
3408 def BuildHooksEnv(self):
3411 This runs on the master, the primary and all the secondaries.
3415 "MODE": self.op.mode,
3416 "NEW_SECONDARY": self.op.remote_node,
3417 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3419 env.update(_BuildInstanceHookEnvByObject(self.instance))
3421 self.sstore.GetMasterNode(),
3422 self.instance.primary_node,
3424 if self.op.remote_node is not None:
3425 nl.append(self.op.remote_node)
3428 def CheckPrereq(self):
3429 """Check prerequisites.
3431 This checks that the instance is in the cluster.
3434 instance = self.cfg.GetInstanceInfo(
3435 self.cfg.ExpandInstanceName(self.op.instance_name))
3436 if instance is None:
3437 raise errors.OpPrereqError("Instance '%s' not known" %
3438 self.op.instance_name)
3439 self.instance = instance
3440 self.op.instance_name = instance.name
3442 if instance.disk_template not in constants.DTS_NET_MIRROR:
3443 raise errors.OpPrereqError("Instance's disk layout is not"
3444 " network mirrored.")
3446 if len(instance.secondary_nodes) != 1:
3447 raise errors.OpPrereqError("The instance has a strange layout,"
3448 " expected one secondary but found %d" %
3449 len(instance.secondary_nodes))
3451 self.sec_node = instance.secondary_nodes[0]
3453 remote_node = getattr(self.op, "remote_node", None)
3454 if remote_node is not None:
3455 remote_node = self.cfg.ExpandNodeName(remote_node)
3456 if remote_node is None:
3457 raise errors.OpPrereqError("Node '%s' not known" %
3458 self.op.remote_node)
3459 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3461 self.remote_node_info = None
3462 if remote_node == instance.primary_node:
3463 raise errors.OpPrereqError("The specified node is the primary node of"
3465 elif remote_node == self.sec_node:
3466 if self.op.mode == constants.REPLACE_DISK_SEC:
3467 # this is for DRBD8, where we can't execute the same mode of
3468 # replacement as for drbd7 (no different port allocated)
3469 raise errors.OpPrereqError("Same secondary given, cannot execute"
3471 # the user gave the current secondary, switch to
3472 # 'no-replace-secondary' mode for drbd7
3474 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3475 self.op.mode != constants.REPLACE_DISK_ALL):
3476 raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3477 " disks replacement, not individual ones")
3478 if instance.disk_template == constants.DT_DRBD8:
3479 if (self.op.mode == constants.REPLACE_DISK_ALL and
3480 remote_node is not None):
3481 # switch to replace secondary mode
3482 self.op.mode = constants.REPLACE_DISK_SEC
3484 if self.op.mode == constants.REPLACE_DISK_ALL:
3485 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3486 " secondary disk replacement, not"
3488 elif self.op.mode == constants.REPLACE_DISK_PRI:
3489 if remote_node is not None:
3490 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3491 " the secondary while doing a primary"
3492 " node disk replacement")
3493 self.tgt_node = instance.primary_node
3494 self.oth_node = instance.secondary_nodes[0]
3495 elif self.op.mode == constants.REPLACE_DISK_SEC:
3496 self.new_node = remote_node # this can be None, in which case
3497 # we don't change the secondary
3498 self.tgt_node = instance.secondary_nodes[0]
3499 self.oth_node = instance.primary_node
3501 raise errors.ProgrammerError("Unhandled disk replace mode")
3503 for name in self.op.disks:
3504 if instance.FindDisk(name) is None:
3505 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3506 (name, instance.name))
3507 self.op.remote_node = remote_node
3509 def _ExecRR1(self, feedback_fn):
3510 """Replace the disks of an instance.
3513 instance = self.instance
3516 if self.op.remote_node is None:
3517 remote_node = self.sec_node
3519 remote_node = self.op.remote_node
3521 for dev in instance.disks:
3523 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3524 names = _GenerateUniqueNames(cfg, lv_names)
3525 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3526 remote_node, size, names)
3527 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3528 logger.Info("adding new mirror component on secondary for %s" %
3531 if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3533 _GetInstanceInfoText(instance)):
3534 raise errors.OpExecError("Failed to create new component on secondary"
3535 " node %s. Full abort, cleanup manually!" %
3538 logger.Info("adding new mirror component on primary")
3540 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3542 _GetInstanceInfoText(instance)):
3543 # remove secondary dev
3544 cfg.SetDiskID(new_drbd, remote_node)
3545 rpc.call_blockdev_remove(remote_node, new_drbd)
3546 raise errors.OpExecError("Failed to create volume on primary!"
3547 " Full abort, cleanup manually!!")
3549 # the device exists now
3550 # call the primary node to add the mirror to md
3551 logger.Info("adding new mirror component to md")
3552 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3554 logger.Error("Can't add mirror compoment to md!")
3555 cfg.SetDiskID(new_drbd, remote_node)
3556 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3557 logger.Error("Can't rollback on secondary")
3558 cfg.SetDiskID(new_drbd, instance.primary_node)
3559 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3560 logger.Error("Can't rollback on primary")
3561 raise errors.OpExecError("Full abort, cleanup manually!!")
3563 dev.children.append(new_drbd)
3564 cfg.AddInstance(instance)
3566 # this can fail as the old devices are degraded and _WaitForSync
3567 # does a combined result over all disks, so we don't check its
3569 _WaitForSync(cfg, instance, self.proc, unlock=True)
3571 # so check manually all the devices
3572 for name in iv_names:
3573 dev, child, new_drbd = iv_names[name]
3574 cfg.SetDiskID(dev, instance.primary_node)
3575 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3577 raise errors.OpExecError("MD device %s is degraded!" % name)
3578 cfg.SetDiskID(new_drbd, instance.primary_node)
3579 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3581 raise errors.OpExecError("New drbd device %s is degraded!" % name)
3583 for name in iv_names:
3584 dev, child, new_drbd = iv_names[name]
3585 logger.Info("remove mirror %s component" % name)
3586 cfg.SetDiskID(dev, instance.primary_node)
3587 if not rpc.call_blockdev_removechildren(instance.primary_node,
3589 logger.Error("Can't remove child from mirror, aborting"
3590 " *this device cleanup*.\nYou need to cleanup manually!!")
3593 for node in child.logical_id[:2]:
3594 logger.Info("remove child device on %s" % node)
3595 cfg.SetDiskID(child, node)
3596 if not rpc.call_blockdev_remove(node, child):
3597 logger.Error("Warning: failed to remove device from node %s,"
3598 " continuing operation." % node)
3600 dev.children.remove(child)
3602 cfg.AddInstance(instance)
3604 def _ExecD8DiskOnly(self, feedback_fn):
3605 """Replace a disk on the primary or secondary for dbrd8.
3607 The algorithm for replace is quite complicated:
3608 - for each disk to be replaced:
3609 - create new LVs on the target node with unique names
3610 - detach old LVs from the drbd device
3611 - rename old LVs to name_replaced.<time_t>
3612 - rename new LVs to old LVs
3613 - attach the new LVs (with the old names now) to the drbd device
3614 - wait for sync across all devices
3615 - for each modified disk:
3616 - remove old LVs (which have the name name_replaced.<time_t>)
3618 Failures are not very well handled.
3622 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3623 instance = self.instance
3625 vgname = self.cfg.GetVGName()
3628 tgt_node = self.tgt_node
3629 oth_node = self.oth_node
3631 # Step: check device activation
3632 self.proc.LogStep(1, steps_total, "check device existence")
3633 info("checking volume groups")
3634 my_vg = cfg.GetVGName()
3635 results = rpc.call_vg_list([oth_node, tgt_node])
3637 raise errors.OpExecError("Can't list volume groups on the nodes")
3638 for node in oth_node, tgt_node:
3639 res = results.get(node, False)
3640 if not res or my_vg not in res:
3641 raise errors.OpExecError("Volume group '%s' not found on %s" %
3643 for dev in instance.disks:
3644 if not dev.iv_name in self.op.disks:
3646 for node in tgt_node, oth_node:
3647 info("checking %s on %s" % (dev.iv_name, node))
3648 cfg.SetDiskID(dev, node)
3649 if not rpc.call_blockdev_find(node, dev):
3650 raise errors.OpExecError("Can't find device %s on node %s" %
3651 (dev.iv_name, node))
3653 # Step: check other node consistency
3654 self.proc.LogStep(2, steps_total, "check peer consistency")
3655 for dev in instance.disks:
3656 if not dev.iv_name in self.op.disks:
3658 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3659 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3660 oth_node==instance.primary_node):
3661 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3662 " to replace disks on this node (%s)" %
3663 (oth_node, tgt_node))
3665 # Step: create new storage
3666 self.proc.LogStep(3, steps_total, "allocate new storage")
3667 for dev in instance.disks:
3668 if not dev.iv_name in self.op.disks:
3671 cfg.SetDiskID(dev, tgt_node)
3672 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3673 names = _GenerateUniqueNames(cfg, lv_names)
3674 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3675 logical_id=(vgname, names[0]))
3676 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3677 logical_id=(vgname, names[1]))
3678 new_lvs = [lv_data, lv_meta]
3679 old_lvs = dev.children
3680 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3681 info("creating new local storage on %s for %s" %
3682 (tgt_node, dev.iv_name))
3683 # since we *always* want to create this LV, we use the
3684 # _Create...OnPrimary (which forces the creation), even if we
3685 # are talking about the secondary node
3686 for new_lv in new_lvs:
3687 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3688 _GetInstanceInfoText(instance)):
3689 raise errors.OpExecError("Failed to create new LV named '%s' on"
3691 (new_lv.logical_id[1], tgt_node))
3693 # Step: for each lv, detach+rename*2+attach
3694 self.proc.LogStep(4, steps_total, "change drbd configuration")
3695 for dev, old_lvs, new_lvs in iv_names.itervalues():
3696 info("detaching %s drbd from local storage" % dev.iv_name)
3697 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3698 raise errors.OpExecError("Can't detach drbd from local storage on node"
3699 " %s for device %s" % (tgt_node, dev.iv_name))
3701 #cfg.Update(instance)
3703 # ok, we created the new LVs, so now we know we have the needed
3704 # storage; as such, we proceed on the target node to rename
3705 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3706 # using the assumption that logical_id == physical_id (which in
3707 # turn is the unique_id on that node)
3709 # FIXME(iustin): use a better name for the replaced LVs
3710 temp_suffix = int(time.time())
3711 ren_fn = lambda d, suff: (d.physical_id[0],
3712 d.physical_id[1] + "_replaced-%s" % suff)
3713 # build the rename list based on what LVs exist on the node
3715 for to_ren in old_lvs:
3716 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3717 if find_res is not None: # device exists
3718 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3720 info("renaming the old LVs on the target node")
3721 if not rpc.call_blockdev_rename(tgt_node, rlist):
3722 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3723 # now we rename the new LVs to the old LVs
3724 info("renaming the new LVs on the target node")
3725 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3726 if not rpc.call_blockdev_rename(tgt_node, rlist):
3727 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3729 for old, new in zip(old_lvs, new_lvs):
3730 new.logical_id = old.logical_id
3731 cfg.SetDiskID(new, tgt_node)
3733 for disk in old_lvs:
3734 disk.logical_id = ren_fn(disk, temp_suffix)
3735 cfg.SetDiskID(disk, tgt_node)
3737 # now that the new lvs have the old name, we can add them to the device
3738 info("adding new mirror component on %s" % tgt_node)
3739 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3740 for new_lv in new_lvs:
3741 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3742 warning("Can't rollback device %s", hint="manually cleanup unused"
3744 raise errors.OpExecError("Can't add local storage to drbd")
3746 dev.children = new_lvs
3747 cfg.Update(instance)
3749 # Step: wait for sync
3751 # this can fail as the old devices are degraded and _WaitForSync
3752 # does a combined result over all disks, so we don't check its
3754 self.proc.LogStep(5, steps_total, "sync devices")
3755 _WaitForSync(cfg, instance, self.proc, unlock=True)
3757 # so check manually all the devices
3758 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3759 cfg.SetDiskID(dev, instance.primary_node)
3760 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3762 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3764 # Step: remove old storage
3765 self.proc.LogStep(6, steps_total, "removing old storage")
3766 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3767 info("remove logical volumes for %s" % name)
3769 cfg.SetDiskID(lv, tgt_node)
3770 if not rpc.call_blockdev_remove(tgt_node, lv):
3771 warning("Can't remove old LV", hint="manually remove unused LVs")
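# Sketch (not part of the original source): the detach/rename/attach dance in
# step 4 above builds a rename list of (disk, new_unique_id) pairs for
# rpc.call_blockdev_rename, e.g. for a data LV something like
#   (old_lv, ("xenvg", "<id>.sda_data_replaced-1199145600"))
# and then renames the freshly created LVs to the old names, so the drbd
# device can be re-attached to storage carrying the original names. The volume
# group name and timestamp shown here are hypothetical.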
3774 def _ExecD8Secondary(self, feedback_fn):
3775 """Replace the secondary node for drbd8.
3777 The algorithm for replace is quite complicated:
3778 - for all disks of the instance:
3779 - create new LVs on the new node with same names
3780 - shutdown the drbd device on the old secondary
3781 - disconnect the drbd network on the primary
3782 - create the drbd device on the new secondary
3783 - network attach the drbd on the primary, using an artifice:
3784 the drbd code for Attach() will connect to the network if it
3785 finds a device which is connected to the good local disks but
3787 - wait for sync across all devices
3788 - remove all disks from the old secondary
3790 Failures are not very well handled.
3794 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3795 instance = self.instance
3797 vgname = self.cfg.GetVGName()
3800 old_node = self.tgt_node
3801 new_node = self.new_node
3802 pri_node = instance.primary_node
3804 # Step: check device activation
3805 self.proc.LogStep(1, steps_total, "check device existence")
3806 info("checking volume groups")
3807 my_vg = cfg.GetVGName()
3808 results = rpc.call_vg_list([pri_node, new_node])
3810 raise errors.OpExecError("Can't list volume groups on the nodes")
3811 for node in pri_node, new_node:
3812 res = results.get(node, False)
3813 if not res or my_vg not in res:
3814 raise errors.OpExecError("Volume group '%s' not found on %s" %
3816 for dev in instance.disks:
3817 if not dev.iv_name in self.op.disks:
3819 info("checking %s on %s" % (dev.iv_name, pri_node))
3820 cfg.SetDiskID(dev, pri_node)
3821 if not rpc.call_blockdev_find(pri_node, dev):
3822 raise errors.OpExecError("Can't find device %s on node %s" %
3823 (dev.iv_name, pri_node))
3825 # Step: check other node consistency
3826 self.proc.LogStep(2, steps_total, "check peer consistency")
3827 for dev in instance.disks:
3828 if not dev.iv_name in self.op.disks:
3830 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3831 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3832 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3833 " unsafe to replace the secondary" %
3836 # Step: create new storage
3837 self.proc.LogStep(3, steps_total, "allocate new storage")
3838 for dev in instance.disks:
3840 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3841 # since we *always* want to create this LV, we use the
3842 # _Create...OnPrimary (which forces the creation), even if we
3843 # are talking about the secondary node
3844 for new_lv in dev.children:
3845 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3846 _GetInstanceInfoText(instance)):
3847 raise errors.OpExecError("Failed to create new LV named '%s' on"
3849 (new_lv.logical_id[1], new_node))
3851 iv_names[dev.iv_name] = (dev, dev.children)
3853 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3854 for dev in instance.disks:
3856 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3857 # create new devices on new_node
3858 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3859 logical_id=(pri_node, new_node,
3861 children=dev.children)
3862 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3864 _GetInstanceInfoText(instance)):
3865 raise errors.OpExecError("Failed to create new DRBD on"
3866 " node '%s'" % new_node)
3868 for dev in instance.disks:
3869 # we have new devices, shutdown the drbd on the old secondary
3870 info("shutting down drbd for %s on old node" % dev.iv_name)
3871 cfg.SetDiskID(dev, old_node)
3872 if not rpc.call_blockdev_shutdown(old_node, dev):
3873 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3874 hint="Please cleanup this device manually as soon as possible")
3876 info("detaching primary drbds from the network (=> standalone)")
3878 for dev in instance.disks:
3879 cfg.SetDiskID(dev, pri_node)
3880 # set the physical (unique in bdev terms) id to None, meaning
3881 # detach from network
3882 dev.physical_id = (None,) * len(dev.physical_id)
3883 # and 'find' the device, which will 'fix' it to match the
3885 if rpc.call_blockdev_find(pri_node, dev):
3888 warning("Failed to detach drbd %s from network, unusual case" %
3892 # no detaches succeeded (very unlikely)
3893 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3895 # if we managed to detach at least one, we update all the disks of
3896 # the instance to point to the new secondary
3897 info("updating instance configuration")
3898 for dev in instance.disks:
3899 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3900 cfg.SetDiskID(dev, pri_node)
3901 cfg.Update(instance)
3903 # and now perform the drbd attach
3904 info("attaching primary drbds to new secondary (standalone => connected)")
3906 for dev in instance.disks:
3907 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3908 # since the attach is smart, it's enough to 'find' the device,
3909 # it will automatically activate the network, if the physical_id
3911 cfg.SetDiskID(dev, pri_node)
3912 if not rpc.call_blockdev_find(pri_node, dev):
3913 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3914 "please do a gnt-instance info to see the status of disks")
3916 # this can fail as the old devices are degraded and _WaitForSync
3917 # does a combined result over all disks, so we don't check its
3919 self.proc.LogStep(5, steps_total, "sync devices")
3920 _WaitForSync(cfg, instance, self.proc, unlock=True)
3922 # so check manually all the devices
3923 for name, (dev, old_lvs) in iv_names.iteritems():
3924 cfg.SetDiskID(dev, pri_node)
3925 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3927 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3929 self.proc.LogStep(6, steps_total, "removing old storage")
3930 for name, (dev, old_lvs) in iv_names.iteritems():
3931 info("remove logical volumes for %s" % name)
3933 cfg.SetDiskID(lv, old_node)
3934 if not rpc.call_blockdev_remove(old_node, lv):
3935 warning("Can't remove LV on old secondary",
3936 hint="Cleanup stale volumes by hand")
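# Illustrative note (not part of the original source): the key trick above is
# that blanking dev.physical_id and then calling rpc.call_blockdev_find on the
# primary leaves the drbd device standalone, while the later find (after
# logical_id has been rewritten to point at the new secondary) lets the drbd
# layer reconnect the network on its own. Only the calls visible above are
# authoritative; this comment just summarises the flow.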
3938 def Exec(self, feedback_fn):
3939 """Execute disk replacement.
3941 This dispatches the disk replacement to the appropriate handler.
3944 instance = self.instance
3945 if instance.disk_template == constants.DT_REMOTE_RAID1:
3947 elif instance.disk_template == constants.DT_DRBD8:
3948 if self.op.remote_node is None:
3949 fn = self._ExecD8DiskOnly
3951 fn = self._ExecD8Secondary
3953 raise errors.ProgrammerError("Unhandled disk replacement case")
3954 return fn(feedback_fn)
3957 class LUQueryInstanceData(NoHooksLU):
3958 """Query runtime instance data.
3961 _OP_REQP = ["instances"]
3963 def CheckPrereq(self):
3964 """Check prerequisites.
3966 This only checks the optional instance list against the existing names.
3969 if not isinstance(self.op.instances, list):
3970 raise errors.OpPrereqError("Invalid argument type 'instances'")
3971 if self.op.instances:
3972 self.wanted_instances = []
3973 names = self.op.instances
3975 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3976 if instance is None:
3977 raise errors.OpPrereqError("No such instance name '%s'" % name)
3978 self.wanted_instances.append(instance)
3979 else:
3980 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3981 in self.cfg.GetInstanceList()]
3985 def _ComputeDiskStatus(self, instance, snode, dev):
3986 """Compute block device status.
3989 self.cfg.SetDiskID(dev, instance.primary_node)
3990 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3991 if dev.dev_type in constants.LDS_DRBD:
3992 # we change the snode then (otherwise we use the one passed in)
3993 if dev.logical_id[0] == instance.primary_node:
3994 snode = dev.logical_id[1]
3995 else:
3996 snode = dev.logical_id[0]
3998 if snode:
3999 self.cfg.SetDiskID(dev, snode)
4000 dev_sstatus = rpc.call_blockdev_find(snode, dev)
4001 else:
4002 dev_sstatus = None
4004 if dev.children:
4005 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4006 for child in dev.children]
4007 else:
4008 dev_children = []
4010 data = {
4011 "iv_name": dev.iv_name,
4012 "dev_type": dev.dev_type,
4013 "logical_id": dev.logical_id,
4014 "physical_id": dev.physical_id,
4015 "pstatus": dev_pstatus,
4016 "sstatus": dev_sstatus,
4017 "children": dev_children,
4018 }
4020 return data
4022 def Exec(self, feedback_fn):
4023 """Gather and return data"""
4024 result = {}
4025 for instance in self.wanted_instances:
4026 remote_info = rpc.call_instance_info(instance.primary_node,
4027 instance.name)
4028 if remote_info and "state" in remote_info:
4029 remote_state = "up"
4030 else:
4031 remote_state = "down"
4032 if instance.status == "down":
4033 config_state = "down"
4034 else:
4035 config_state = "up"
4037 disks = [self._ComputeDiskStatus(instance, None, device)
4038 for device in instance.disks]
4040 idict = {
4041 "name": instance.name,
4042 "config_state": config_state,
4043 "run_state": remote_state,
4044 "pnode": instance.primary_node,
4045 "snodes": instance.secondary_nodes,
4047 "memory": instance.memory,
4048 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4049 "disks": disks,
4050 "network_port": instance.network_port,
4051 "vcpus": instance.vcpus,
4052 }
4054 result[instance.name] = idict
4056 return result
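4057 # result maps instance name to the dict built above, roughly e.g.
4058 # {"inst1.example.com": {"config_state": "up", "run_state": "down", "pnode": ..., "disks": [...]}} (illustrative)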
4059 class LUSetInstanceParms(LogicalUnit):
4060 """Modifies an instances's parameters.
4063 HPATH = "instance-modify"
4064 HTYPE = constants.HTYPE_INSTANCE
4065 _OP_REQP = ["instance_name"]
4067 def BuildHooksEnv(self):
4070 This runs on the master, primary and secondaries.
4073 args = dict()
4074 if self.mem:
4075 args['memory'] = self.mem
4076 if self.vcpus:
4077 args['vcpus'] = self.vcpus
4078 if self.do_ip or self.do_bridge:
4079 if self.do_ip:
4080 ip = self.ip
4081 else:
4082 ip = self.instance.nics[0].ip
4083 if self.bridge:
4084 bridge = self.bridge
4085 else:
4086 bridge = self.instance.nics[0].bridge
4087 args['nics'] = [(ip, bridge)]
4088 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4089 nl = [self.sstore.GetMasterNode(),
4090 self.instance.primary_node] + list(self.instance.secondary_nodes)
4091 return env, nl, nl
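4092 # the values in 'args' override the instance's own memory/vcpus/nic settings in the hook environment built by _BuildInstanceHookEnvByObject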
4093 def CheckPrereq(self):
4094 """Check prerequisites.
4096 This checks the instance name against the existing names and validates the submitted parameter values.
4099 self.mem = getattr(self.op, "mem", None)
4100 self.vcpus = getattr(self.op, "vcpus", None)
4101 self.ip = getattr(self.op, "ip", None)
4102 self.mac = getattr(self.op, "mac", None)
4103 self.bridge = getattr(self.op, "bridge", None)
4104 if [self.mem, self.vcpus, self.ip, self.bridge, self.mac].count(None) == 5:
4105 raise errors.OpPrereqError("No changes submitted")
4106 if self.mem is not None:
4107 try:
4108 self.mem = int(self.mem)
4109 except ValueError, err:
4110 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4111 if self.vcpus is not None:
4112 try:
4113 self.vcpus = int(self.vcpus)
4114 except ValueError, err:
4115 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4116 if self.ip is not None:
4117 self.do_ip = True
4118 if self.ip.lower() == "none":
4119 self.ip = None
4120 else:
4121 if not utils.IsValidIP(self.ip):
4122 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4123 else:
4124 self.do_ip = False
4125 self.do_bridge = (self.bridge is not None)
4126 if self.mac is not None:
4127 if self.cfg.IsMacInUse(self.mac):
4128 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4129 self.mac)
4130 if not utils.IsValidMac(self.mac):
4131 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4133 instance = self.cfg.GetInstanceInfo(
4134 self.cfg.ExpandInstanceName(self.op.instance_name))
4135 if instance is None:
4136 raise errors.OpPrereqError("No such instance name '%s'" %
4137 self.op.instance_name)
4138 self.op.instance_name = instance.name
4139 self.instance = instance
4142 def Exec(self, feedback_fn):
4143 """Modifies an instance.
4145 All parameters take effect only at the next restart of the instance.
4147 result = []
4148 instance = self.instance
4149 if self.mem:
4150 instance.memory = self.mem
4151 result.append(("mem", self.mem))
4152 if self.vcpus:
4153 instance.vcpus = self.vcpus
4154 result.append(("vcpus", self.vcpus))
4155 if self.do_ip:
4156 instance.nics[0].ip = self.ip
4157 result.append(("ip", self.ip))
4158 if self.bridge:
4159 instance.nics[0].bridge = self.bridge
4160 result.append(("bridge", self.bridge))
4161 if self.mac:
4162 instance.nics[0].mac = self.mac
4163 result.append(("mac", self.mac))
4165 self.cfg.AddInstance(instance)
4167 return result
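4168 # 'result' is a list of (parameter, new value) pairs describing what was
4169 # changed, e.g. [("mem", 512), ("vcpus", 2)] (illustrative values)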
4170 class LUQueryExports(NoHooksLU):
4171 """Query the exports list
4176 def CheckPrereq(self):
4177 """Check that the nodelist contains only existing nodes.
4180 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4182 def Exec(self, feedback_fn):
4183 """Compute the list of all the exported system images.
4186 a dictionary with the structure node->(export-list)
4187 where export-list is a list of the instances exported on
4188 that node.
4191 return rpc.call_export_list(self.nodes)
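4192 # e.g. {"node1.example.com": ["inst1.example.com"], "node2.example.com": []}
4193 # (illustrative) -- one list of export names per queried node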
4194 class LUExportInstance(LogicalUnit):
4195 """Export an instance to an image in the cluster.
4198 HPATH = "instance-export"
4199 HTYPE = constants.HTYPE_INSTANCE
4200 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4202 def BuildHooksEnv(self):
4205 This will run on the master, primary node and target node.
4209 "EXPORT_NODE": self.op.target_node,
4210 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4212 env.update(_BuildInstanceHookEnvByObject(self.instance))
4213 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4214 self.op.target_node]
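4216 # in the hook environment these keys get the GANETI_ prefix added by the hooks runner (e.g. GANETI_EXPORT_NODE)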
4217 def CheckPrereq(self):
4218 """Check prerequisites.
4220 This checks that the instance name is a valid one.
4223 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4224 self.instance = self.cfg.GetInstanceInfo(instance_name)
4225 if self.instance is None:
4226 raise errors.OpPrereqError("Instance '%s' not found" %
4227 self.op.instance_name)
4230 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4231 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4233 if self.dst_node is None:
4234 raise errors.OpPrereqError("Destination node '%s' is unknown." %
4235 self.op.target_node)
4236 self.op.target_node = self.dst_node.name
4238 def Exec(self, feedback_fn):
4239 """Export an instance to an image in the cluster.
4242 instance = self.instance
4243 dst_node = self.dst_node
4244 src_node = instance.primary_node
4245 # shut down the instance, unless requested not to do so
4246 if self.op.shutdown:
4247 op = opcodes.OpShutdownInstance(instance_name=instance.name)
4248 self.proc.ChainOpCode(op)
4250 vgname = self.cfg.GetVGName()
4252 snap_disks = []
4254 try:
4255 for disk in instance.disks:
4256 if disk.iv_name == "sda":
4257 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4258 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4260 if not new_dev_name:
4261 logger.Error("could not snapshot block device %s on node %s" %
4262 (disk.logical_id[1], src_node))
4263 else:
4264 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4265 logical_id=(vgname, new_dev_name),
4266 physical_id=(vgname, new_dev_name),
4267 iv_name=disk.iv_name)
4268 snap_disks.append(new_dev)
4270 finally:
4271 if self.op.shutdown:
4272 op = opcodes.OpStartupInstance(instance_name=instance.name,
4273 force=False)
4274 self.proc.ChainOpCode(op)
4276 # TODO: check for size
4278 for dev in snap_disks:
4279 if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4280 instance):
4281 logger.Error("could not export block device %s from node"
4282 " %s to node %s" %
4283 (dev.logical_id[1], src_node, dst_node.name))
4284 if not rpc.call_blockdev_remove(src_node, dev):
4285 logger.Error("could not remove snapshot block device %s from"
4286 " node %s" % (dev.logical_id[1], src_node))
4288 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4289 logger.Error("could not finalize export for instance %s on node %s" %
4290 (instance.name, dst_node.name))
4292 nodelist = self.cfg.GetNodeList()
4293 nodelist.remove(dst_node.name)
4295 # on one-node clusters nodelist will be empty after the removal
4296 # if we proceed the backup would be removed because OpQueryExports
4297 # substitutes an empty list with the full cluster node list.
4298 if nodelist:
4299 op = opcodes.OpQueryExports(nodes=nodelist)
4300 exportlist = self.proc.ChainOpCode(op)
4301 for node in exportlist:
4302 if instance.name in exportlist[node]:
4303 if not rpc.call_export_remove(node, instance.name):
4304 logger.Error("could not remove older export for instance %s"
4305 " on node %s" % (instance.name, node))
4308 class TagsLU(NoHooksLU):
4311 This is an abstract class which is the parent of all the other tags LUs.
4314 def CheckPrereq(self):
4315 """Check prerequisites.
4318 if self.op.kind == constants.TAG_CLUSTER:
4319 self.target = self.cfg.GetClusterInfo()
4320 elif self.op.kind == constants.TAG_NODE:
4321 name = self.cfg.ExpandNodeName(self.op.name)
4322 if name is None:
4323 raise errors.OpPrereqError("Invalid node name (%s)" %
4324 (self.op.name,))
4325 self.op.name = name
4326 self.target = self.cfg.GetNodeInfo(name)
4327 elif self.op.kind == constants.TAG_INSTANCE:
4328 name = self.cfg.ExpandInstanceName(self.op.name)
4329 if name is None:
4330 raise errors.OpPrereqError("Invalid instance name (%s)" %
4331 (self.op.name,))
4332 self.op.name = name
4333 self.target = self.cfg.GetInstanceInfo(name)
4334 else:
4335 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4336 str(self.op.kind))
4339 class LUGetTags(TagsLU):
4340 """Returns the tags of a given object.
4343 _OP_REQP = ["kind", "name"]
4345 def Exec(self, feedback_fn):
4346 """Returns the tag list.
4349 return self.target.GetTags()
4352 class LUSearchTags(NoHooksLU):
4353 """Searches the tags for a given pattern.
4356 _OP_REQP = ["pattern"]
4358 def CheckPrereq(self):
4359 """Check prerequisites.
4361 This checks the pattern passed for validity by compiling it.
4364 try:
4365 self.re = re.compile(self.op.pattern)
4366 except re.error, err:
4367 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4368 (self.op.pattern, err))
4370 def Exec(self, feedback_fn):
4371 """Returns the tag list.
4375 tgts = [("/cluster", cfg.GetClusterInfo())]
4376 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4377 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4378 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4379 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4381 for path, target in tgts:
4382 for tag in target.GetTags():
4383 if self.re.search(tag):
4384 results.append((path, tag))
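4386 # e.g. a search for "db" might return [("/instances/inst1.example.com", "dbserver"),
4387 # ("/nodes/node2.example.com", "db-backup")] (illustrative tags)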
4388 class LUAddTags(TagsLU):
4389 """Sets a tag on a given object.
4392 _OP_REQP = ["kind", "name", "tags"]
4394 def CheckPrereq(self):
4395 """Check prerequisites.
4397 This checks the type and length of the tag name and value.
4400 TagsLU.CheckPrereq(self)
4401 for tag in self.op.tags:
4402 objects.TaggableObject.ValidateTag(tag)
4404 def Exec(self, feedback_fn):
4408 try:
4409 for tag in self.op.tags:
4410 self.target.AddTag(tag)
4411 except errors.TagError, err:
4412 raise errors.OpExecError("Error while setting tag: %s" % str(err))
4413 try:
4414 self.cfg.Update(self.target)
4415 except errors.ConfigurationError:
4416 raise errors.OpRetryError("There has been a modification to the"
4417 " config file and the operation has been"
4418 " aborted. Please retry.")
4421 class LUDelTags(TagsLU):
4422 """Delete a list of tags from a given object.
4425 _OP_REQP = ["kind", "name", "tags"]
4427 def CheckPrereq(self):
4428 """Check prerequisites.
4430 This checks that we have the given tag.
4433 TagsLU.CheckPrereq(self)
4434 for tag in self.op.tags:
4435 objects.TaggableObject.ValidateTag(tag)
4436 del_tags = frozenset(self.op.tags)
4437 cur_tags = self.target.GetTags()
4438 if not del_tags <= cur_tags:
4439 diff_tags = del_tags - cur_tags
4440 diff_names = ["'%s'" % tag for tag in diff_tags]
4442 raise errors.OpPrereqError("Tag(s) %s not found" %
4443 (",".join(diff_names)))
4445 def Exec(self, feedback_fn):
4446 """Remove the tag from the object.
4449 for tag in self.op.tags:
4450 self.target.RemoveTag(tag)
4451 try:
4452 self.cfg.Update(self.target)
4453 except errors.ConfigurationError:
4454 raise errors.OpRetryError("There has been a modification to the"
4455 " config file and the operation has been"
4456 " aborted. Please retry.")