4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
import os
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
46 class LogicalUnit(object):
47 """Logical Unit base class.
49 Subclasses must follow these rules:
50 - implement CheckPrereq which also fills in the opcode instance
51 with all the fields (even if as None)
53 - implement BuildHooksEnv
54 - redefine HPATH and HTYPE
55 - optionally redefine their run requirements (REQ_CLUSTER,
56 REQ_MASTER); note that all commands require root permissions
65 def __init__(self, processor, op, cfg, sstore):
66 """Constructor for LogicalUnit.
This needs to be overridden in derived classes in order to check op
76 for attr_name in self._OP_REQP:
77 attr_val = getattr(op, attr_name, None)
79 raise errors.OpPrereqError("Required parameter '%s' missing" %
82 if not cfg.IsCluster():
83 raise errors.OpPrereqError("Cluster not initialized yet,"
84 " use 'gnt-cluster init' first.")
86 master = sstore.GetMasterNode()
87 if master != utils.HostInfo().name:
88 raise errors.OpPrereqError("Commands must be run on the master"
91 def CheckPrereq(self):
92 """Check prerequisites for this LU.
94 This method should check that the prerequisites for the execution
95 of this LU are fulfilled. It can do internode communication, but
96 it should be idempotent - no cluster or system changes are
99 The method should raise errors.OpPrereqError in case something is
100 not fulfilled. Its return value is ignored.
102 This method should also update all the parameters of the opcode to
103 their canonical form; e.g. a short node name must be fully
104 expanded after this method has successfully completed (so that
105 hooks, logging, etc. work correctly).
108 raise NotImplementedError
110 def Exec(self, feedback_fn):
113 This method should implement the actual work. It should raise
114 errors.OpExecError for failures that are somewhat dealt with in
118 raise NotImplementedError
120 def BuildHooksEnv(self):
121 """Build hooks environment for this LU.
This method should return a three-element tuple consisting of: a dict
124 containing the environment that will be used for running the
125 specific hook for this LU, a list of node names on which the hook
126 should run before the execution, and a list of node names on which
127 the hook should run after the execution.
The keys of the dict must not be prefixed with 'GANETI_', as that
prefix is added by the hooks runner. Also note additional keys will be
131 added by the hooks runner. If the LU doesn't define any
132 environment, an empty dict (and not None) should be returned.
As for the node lists, the master should not be included in
them, as it will be added by the hooks runner in case this LU
136 requires a cluster to run on (otherwise we don't have a node
137 list). No nodes should be returned as an empty list (and not
140 Note that if the HPATH for a LU class is None, this function will
144 raise NotImplementedError
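# A minimal sketch of an LU following the contract above (the class name,
# hook path and opcode field below are hypothetical, for illustration only):
#
#   class LUExampleNoop(LogicalUnit):
#     HPATH = "example-noop"
#     HTYPE = constants.HTYPE_CLUSTER
#     _OP_REQP = ["example_param"]
#
#     def BuildHooksEnv(self):
#       env = {"OP_TARGET": self.op.example_param}
#       return env, [], []
#
#     def CheckPrereq(self):
#       self.op.example_param = self.op.example_param.lower()
#
#     def Exec(self, feedback_fn):
#       feedback_fn("nothing to do for %s" % self.op.example_param)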
147 class NoHooksLU(LogicalUnit):
148 """Simple LU which runs no hooks.
150 This LU is intended as a parent for other LogicalUnits which will
151 run no hooks, in order to reduce duplicate code.
157 def BuildHooksEnv(self):
160 This is a no-op, since we don't run hooks.
166 def _AddHostToEtcHosts(hostname):
167 """Wrapper around utils.SetEtcHostsEntry.
170 hi = utils.HostInfo(name=hostname)
171 utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
174 def _RemoveHostFromEtcHosts(hostname):
175 """Wrapper around utils.RemoveEtcHostsEntry.
178 hi = utils.HostInfo(name=hostname)
179 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
183 def _GetWantedNodes(lu, nodes):
184 """Returns list of checked and expanded node names.
187 nodes: List of nodes (strings) or None for all
190 if not isinstance(nodes, list):
191 raise errors.OpPrereqError("Invalid argument type 'nodes'")
197 node = lu.cfg.ExpandNodeName(name)
199 raise errors.OpPrereqError("No such node name '%s'" % name)
203 wanted = lu.cfg.GetNodeList()
204 return utils.NiceSort(wanted)
207 def _GetWantedInstances(lu, instances):
208 """Returns list of checked and expanded instance names.
211 instances: List of instances (strings) or None for all
214 if not isinstance(instances, list):
215 raise errors.OpPrereqError("Invalid argument type 'instances'")
220 for name in instances:
221 instance = lu.cfg.ExpandInstanceName(name)
223 raise errors.OpPrereqError("No such instance name '%s'" % name)
224 wanted.append(instance)
227 wanted = lu.cfg.GetInstanceList()
228 return utils.NiceSort(wanted)
231 def _CheckOutputFields(static, dynamic, selected):
232 """Checks whether all selected fields are valid.
235 static: Static fields
236 dynamic: Dynamic fields
239 static_fields = frozenset(static)
240 dynamic_fields = frozenset(dynamic)
242 all_fields = static_fields | dynamic_fields
244 if not all_fields.issuperset(selected):
245 raise errors.OpPrereqError("Unknown output fields selected: %s"
246 % ",".join(frozenset(selected).
247 difference(all_fields)))
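# Illustrative call (field names hypothetical): an LU typically validates the
# user-requested output fields in its CheckPrereq, e.g.
#   _CheckOutputFields(static=["name"], dynamic=["dfree", "mfree"],
#                      selected=self.op.output_fields)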
250 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251 memory, vcpus, nics):
252 """Builds instance related env variables for hooks from single variables.
255 secondary_nodes: List of secondary nodes as strings
259 "INSTANCE_NAME": name,
260 "INSTANCE_PRIMARY": primary_node,
261 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262 "INSTANCE_OS_TYPE": os_type,
263 "INSTANCE_STATUS": status,
264 "INSTANCE_MEMORY": memory,
265 "INSTANCE_VCPUS": vcpus,
269 nic_count = len(nics)
270 for idx, (ip, bridge) in enumerate(nics):
273 env["INSTANCE_NIC%d_IP" % idx] = ip
274 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
278 env["INSTANCE_NIC_COUNT"] = nic_count
283 def _BuildInstanceHookEnvByObject(instance, override=None):
284 """Builds instance related env variables for hooks from an object.
287 instance: objects.Instance object of instance
288 override: dict of values to override
291 'name': instance.name,
292 'primary_node': instance.primary_node,
293 'secondary_nodes': instance.secondary_nodes,
294 'os_type': instance.os,
'status': instance.status,
296 'memory': instance.memory,
297 'vcpus': instance.vcpus,
298 'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
301 args.update(override)
302 return _BuildInstanceHookEnv(**args)
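# For a single-NIC instance the resulting environment looks roughly like this
# (values hypothetical; the hooks runner later adds the 'GANETI_' prefix):
#   {"INSTANCE_NAME": "inst1.example.com", "INSTANCE_PRIMARY": "node1",
#    "INSTANCE_SECONDARIES": "node2", "INSTANCE_OS_TYPE": "debian-etch",
#    "INSTANCE_STATUS": "up", "INSTANCE_MEMORY": 512, "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC0_IP": "192.0.2.10", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC_COUNT": 1}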
305 def _UpdateKnownHosts(fullnode, ip, pubkey):
306 """Ensure a node has a correct known_hosts entry.
309 fullnode - Fully qualified domain name of host. (str)
310 ip - IPv4 address of host (str)
311 pubkey - the public key of the cluster
314 if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
315 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
317 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
326 logger.Debug('read %s' % (repr(rawline),))
328 parts = rawline.rstrip('\r\n').split()
330 # Ignore unwanted lines
331 if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
332 fields = parts[0].split(',')
337 for spec in [ ip, fullnode ]:
338 if spec not in fields:
343 logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
344 if haveall and key == pubkey:
346 save_lines.append(rawline)
347 logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
350 if havesome and (not haveall or key != pubkey):
352 logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
355 save_lines.append(rawline)
358 add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
359 logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
362 save_lines = save_lines + add_lines
364 # Write a new file and replace old.
365 fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
367 newfile = os.fdopen(fd, 'w')
369 newfile.write(''.join(save_lines))
372 logger.Debug("Wrote new known_hosts.")
373 os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
376 # Simply appending a new line will do the trick.
378 for add in add_lines:
384 def _HasValidVG(vglist, vgname):
385 """Checks if the volume group list is valid.
387 A non-None return value means there's an error, and the return value
388 is the error message.
391 vgsize = vglist.get(vgname, None)
393 return "volume group '%s' missing" % vgname
395 return ("volume group '%s' too small (20480MiB required, %dMib found)" %
400 def _InitSSHSetup(node):
401 """Setup the SSH configuration for the cluster.
404 This generates a dsa keypair for root, adds the pub key to the
405 permitted hosts and adds the hostkey to its own known hosts.
408 node: the name of this host as a fqdn
411 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
413 for name in priv_key, pub_key:
414 if os.path.exists(name):
415 utils.CreateBackup(name)
416 utils.RemoveFile(name)
418 result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
422 raise errors.OpExecError("Could not generate ssh keypair, error %s" %
425 f = open(pub_key, 'r')
427 utils.AddAuthorizedKey(auth_keys, f.read(8192))
432 def _InitGanetiServerSetup(ss):
433 """Setup the necessary configuration for the initial node daemon.
435 This creates the nodepass file containing the shared password for
436 the cluster and also generates the SSL certificate.
439 # Create pseudo random password
440 randpass = sha.new(os.urandom(64)).hexdigest()
441 # and write it into sstore
442 ss.SetKey(ss.SS_NODED_PASS, randpass)
444 result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
445 "-days", str(365*5), "-nodes", "-x509",
446 "-keyout", constants.SSL_CERT_FILE,
447 "-out", constants.SSL_CERT_FILE, "-batch"])
449 raise errors.OpExecError("could not generate server ssl cert, command"
450 " %s had exitcode %s and error message %s" %
451 (result.cmd, result.exit_code, result.output))
453 os.chmod(constants.SSL_CERT_FILE, 0400)
455 result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
458 raise errors.OpExecError("Could not start the node daemon, command %s"
459 " had exitcode %s and error %s" %
460 (result.cmd, result.exit_code, result.output))
463 def _CheckInstanceBridgesExist(instance):
464 """Check that the brigdes needed by an instance exist.
467 # check bridges existance
468 brlist = [nic.bridge for nic in instance.nics]
469 if not rpc.call_bridges_exist(instance.primary_node, brlist):
470 raise errors.OpPrereqError("one or more target bridges %s does not"
471 " exist on destination node '%s'" %
472 (brlist, instance.primary_node))
475 class LUInitCluster(LogicalUnit):
476 """Initialise the cluster.
479 HPATH = "cluster-init"
480 HTYPE = constants.HTYPE_CLUSTER
481 _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
482 "def_bridge", "master_netdev"]
485 def BuildHooksEnv(self):
488 Notes: Since we don't require a cluster, we must manually add
489 ourselves in the post-run node list.
492 env = {"OP_TARGET": self.op.cluster_name}
493 return env, [], [self.hostname.name]
495 def CheckPrereq(self):
496 """Verify that the passed name is a valid one.
499 if config.ConfigWriter.IsCluster():
500 raise errors.OpPrereqError("Cluster is already initialised")
502 if self.op.hypervisor_type == constants.HT_XEN_HVM31:
503 if not os.path.exists(constants.VNC_PASSWORD_FILE):
504 raise errors.OpPrereqError("Please prepare the cluster VNC"
506 constants.VNC_PASSWORD_FILE)
508 self.hostname = hostname = utils.HostInfo()
510 if hostname.ip.startswith("127."):
511 raise errors.OpPrereqError("This host's IP resolves to the private"
512 " range (%s). Please fix DNS or /etc/hosts." %
515 self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
517 if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
518 constants.DEFAULT_NODED_PORT):
519 raise errors.OpPrereqError("Inconsistency: this host's name resolves"
520 " to %s,\nbut this ip address does not"
521 " belong to this host."
522 " Aborting." % hostname.ip)
524 secondary_ip = getattr(self.op, "secondary_ip", None)
525 if secondary_ip and not utils.IsValidIP(secondary_ip):
526 raise errors.OpPrereqError("Invalid secondary ip given")
528 secondary_ip != hostname.ip and
529 (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
530 constants.DEFAULT_NODED_PORT))):
531 raise errors.OpPrereqError("You gave %s as secondary IP,"
532 " but it does not belong to this host." %
534 self.secondary_ip = secondary_ip
536 # checks presence of the volume group given
537 vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
540 raise errors.OpPrereqError("Error: %s" % vgstatus)
542 if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
544 raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
547 if self.op.hypervisor_type not in constants.HYPER_TYPES:
548 raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
549 self.op.hypervisor_type)
551 result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
553 raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
554 (self.op.master_netdev,
555 result.output.strip()))
557 if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
558 os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
559 raise errors.OpPrereqError("Init.d script '%s' missing or not"
560 " executable." % constants.NODE_INITD_SCRIPT)
562 def Exec(self, feedback_fn):
563 """Initialize the cluster.
566 clustername = self.clustername
567 hostname = self.hostname
569 # set up the simple store
570 self.sstore = ss = ssconf.SimpleStore()
571 ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
572 ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
573 ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
574 ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
575 ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
577 # set up the inter-node password and certificate
578 _InitGanetiServerSetup(ss)
580 # start the master ip
581 rpc.call_node_start_master(hostname.name)
583 # set up ssh config and /etc/hosts
584 f = open(constants.SSH_HOST_RSA_PUB, 'r')
589 sshkey = sshline.split(" ")[1]
591 _AddHostToEtcHosts(hostname.name)
593 _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
595 _InitSSHSetup(hostname.name)
597 # init of cluster config file
598 self.cfg = cfgw = config.ConfigWriter()
599 cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
600 sshkey, self.op.mac_prefix,
601 self.op.vg_name, self.op.def_bridge)
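# Opcode-level sketch of driving this LU (the opcode class name and the
# hypervisor constant are assumptions; parameter values are hypothetical):
#   op = opcodes.OpInitCluster(cluster_name="cluster.example.com",
#                              hypervisor_type=constants.HT_XEN_PVM30,
#                              vg_name="xenvg", mac_prefix="aa:00:00",
#                              def_bridge="xen-br0", master_netdev="eth0")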
604 class LUDestroyCluster(NoHooksLU):
605 """Logical unit for destroying the cluster.
610 def CheckPrereq(self):
611 """Check prerequisites.
613 This checks whether the cluster is empty.
615 Any errors are signalled by raising errors.OpPrereqError.
618 master = self.sstore.GetMasterNode()
620 nodelist = self.cfg.GetNodeList()
621 if len(nodelist) != 1 or nodelist[0] != master:
622 raise errors.OpPrereqError("There are still %d node(s) in"
623 " this cluster." % (len(nodelist) - 1))
624 instancelist = self.cfg.GetInstanceList()
626 raise errors.OpPrereqError("There are still %d instance(s) in"
627 " this cluster." % len(instancelist))
629 def Exec(self, feedback_fn):
630 """Destroys the cluster.
633 master = self.sstore.GetMasterNode()
634 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
635 utils.CreateBackup(priv_key)
636 utils.CreateBackup(pub_key)
637 rpc.call_node_leave_cluster(master)
640 class LUVerifyCluster(NoHooksLU):
641 """Verifies the cluster status.
646 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
647 remote_version, feedback_fn):
648 """Run multiple tests against a node.
651 - compares ganeti version
- checks vg existence and size > 20G
653 - checks config file checksum
654 - checks ssh to other nodes
657 node: name of the node to check
658 file_list: required list of files
659 local_cksum: dictionary of local files and their checksums
662 # compares ganeti version
663 local_version = constants.PROTOCOL_VERSION
664 if not remote_version:
665 feedback_fn(" - ERROR: connection to %s failed" % (node))
668 if local_version != remote_version:
669 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
670 (local_version, node, remote_version))
# checks vg existence and size > 20G
677 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
681 vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
683 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
686 # checks config file checksum
689 if 'filelist' not in node_result:
691 feedback_fn(" - ERROR: node hasn't returned file checksum data")
693 remote_cksum = node_result['filelist']
694 for file_name in file_list:
695 if file_name not in remote_cksum:
697 feedback_fn(" - ERROR: file '%s' missing" % file_name)
698 elif remote_cksum[file_name] != local_cksum[file_name]:
700 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
702 if 'nodelist' not in node_result:
704 feedback_fn(" - ERROR: node hasn't returned node connectivity data")
706 if node_result['nodelist']:
708 for node in node_result['nodelist']:
709 feedback_fn(" - ERROR: communication with node '%s': %s" %
710 (node, node_result['nodelist'][node]))
711 hyp_result = node_result.get('hypervisor', None)
712 if hyp_result is not None:
713 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
716 def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
717 """Verify an instance.
719 This function checks to see if the required block devices are
720 available on the instance's node.
725 instancelist = self.cfg.GetInstanceList()
726 if not instance in instancelist:
727 feedback_fn(" - ERROR: instance %s not in instance list %s" %
728 (instance, instancelist))
731 instanceconfig = self.cfg.GetInstanceInfo(instance)
732 node_current = instanceconfig.primary_node
735 instanceconfig.MapLVsByNode(node_vol_should)
737 for node in node_vol_should:
738 for volume in node_vol_should[node]:
739 if node not in node_vol_is or volume not in node_vol_is[node]:
740 feedback_fn(" - ERROR: volume %s missing on node %s" %
744 if not instanceconfig.status == 'down':
745 if not instance in node_instance[node_current]:
746 feedback_fn(" - ERROR: instance %s not running on node %s" %
747 (instance, node_current))
750 for node in node_instance:
751 if (not node == node_current):
752 if instance in node_instance[node]:
753 feedback_fn(" - ERROR: instance %s should not run on node %s" %
759 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
760 """Verify if there are any unknown volumes in the cluster.
762 The .os, .swap and backup volumes are ignored. All other volumes are
768 for node in node_vol_is:
769 for volume in node_vol_is[node]:
770 if node not in node_vol_should or volume not in node_vol_should[node]:
771 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
776 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
777 """Verify the list of running instances.
779 This checks what instances are running but unknown to the cluster.
783 for node in node_instance:
784 for runninginstance in node_instance[node]:
785 if runninginstance not in instancelist:
786 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
787 (runninginstance, node))
791 def CheckPrereq(self):
792 """Check prerequisites.
794 This has no prerequisites.
799 def Exec(self, feedback_fn):
800 """Verify integrity of cluster, performing various test on nodes.
804 feedback_fn("* Verifying global settings")
805 for msg in self.cfg.VerifyConfig():
806 feedback_fn(" - ERROR: %s" % msg)
808 vg_name = self.cfg.GetVGName()
809 nodelist = utils.NiceSort(self.cfg.GetNodeList())
810 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
814 # FIXME: verify OS list
816 file_names = list(self.sstore.GetFileList())
817 file_names.append(constants.SSL_CERT_FILE)
818 file_names.append(constants.CLUSTER_CONF_FILE)
819 local_checksums = utils.FingerprintFiles(file_names)
821 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
822 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
823 all_instanceinfo = rpc.call_instance_list(nodelist)
824 all_vglist = rpc.call_vg_list(nodelist)
825 node_verify_param = {
826 'filelist': file_names,
827 'nodelist': nodelist,
830 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
831 all_rversion = rpc.call_version(nodelist)
833 for node in nodelist:
834 feedback_fn("* Verifying node %s" % node)
835 result = self._VerifyNode(node, file_names, local_checksums,
836 all_vglist[node], all_nvinfo[node],
837 all_rversion[node], feedback_fn)
841 volumeinfo = all_volumeinfo[node]
843 if isinstance(volumeinfo, basestring):
844 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
845 (node, volumeinfo[-400:].encode('string_escape')))
847 node_volume[node] = {}
848 elif not isinstance(volumeinfo, dict):
849 feedback_fn(" - ERROR: connection to %s failed" % (node,))
853 node_volume[node] = volumeinfo
856 nodeinstance = all_instanceinfo[node]
if not isinstance(nodeinstance, list):
858 feedback_fn(" - ERROR: connection to %s failed" % (node,))
862 node_instance[node] = nodeinstance
866 for instance in instancelist:
867 feedback_fn("* Verifying instance %s" % instance)
868 result = self._VerifyInstance(instance, node_volume, node_instance,
872 inst_config = self.cfg.GetInstanceInfo(instance)
874 inst_config.MapLVsByNode(node_vol_should)
876 feedback_fn("* Verifying orphan volumes")
877 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
881 feedback_fn("* Verifying remaining instances")
882 result = self._VerifyOrphanInstances(instancelist, node_instance,
889 class LUVerifyDisks(NoHooksLU):
890 """Verifies the cluster disks status.
895 def CheckPrereq(self):
896 """Check prerequisites.
898 This has no prerequisites.
903 def Exec(self, feedback_fn):
904 """Verify integrity of cluster disks.
907 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
909 vg_name = self.cfg.GetVGName()
910 nodes = utils.NiceSort(self.cfg.GetNodeList())
911 instances = [self.cfg.GetInstanceInfo(name)
912 for name in self.cfg.GetInstanceList()]
915 for inst in instances:
917 if (inst.status != "up" or
918 inst.disk_template not in constants.DTS_NET_MIRROR):
920 inst.MapLVsByNode(inst_lvs)
921 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
922 for node, vol_list in inst_lvs.iteritems():
924 nv_dict[(node, vol)] = inst
929 node_lvs = rpc.call_volume_list(nodes, vg_name)
936 if isinstance(lvs, basestring):
937 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
939 elif not isinstance(lvs, dict):
940 logger.Info("connection to node %s failed or invalid data returned" %
942 res_nodes.append(node)
945 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
946 inst = nv_dict.pop((node, lv_name), None)
947 if (not lv_online and inst is not None
948 and inst.name not in res_instances):
949 res_instances.append(inst.name)
951 # any leftover items in nv_dict are missing LVs, let's arrange the
953 for key, inst in nv_dict.iteritems():
954 if inst.name not in res_missing:
955 res_missing[inst.name] = []
956 res_missing[inst.name].append(key)
961 class LURenameCluster(LogicalUnit):
962 """Rename the cluster.
965 HPATH = "cluster-rename"
966 HTYPE = constants.HTYPE_CLUSTER
969 def BuildHooksEnv(self):
974 "OP_TARGET": self.op.sstore.GetClusterName(),
975 "NEW_NAME": self.op.name,
977 mn = self.sstore.GetMasterNode()
978 return env, [mn], [mn]
980 def CheckPrereq(self):
981 """Verify that the passed name is a valid one.
984 hostname = utils.HostInfo(self.op.name)
986 new_name = hostname.name
987 self.ip = new_ip = hostname.ip
988 old_name = self.sstore.GetClusterName()
989 old_ip = self.sstore.GetMasterIP()
990 if new_name == old_name and new_ip == old_ip:
991 raise errors.OpPrereqError("Neither the name nor the IP address of the"
992 " cluster has changed")
994 result = utils.RunCmd(["fping", "-q", new_ip])
995 if not result.failed:
996 raise errors.OpPrereqError("The given cluster IP address (%s) is"
997 " reachable on the network. Aborting." %
1000 self.op.name = new_name
1002 def Exec(self, feedback_fn):
1003 """Rename the cluster.
1006 clustername = self.op.name
1010 # shutdown the master IP
1011 master = ss.GetMasterNode()
1012 if not rpc.call_node_stop_master(master):
1013 raise errors.OpExecError("Could not disable the master role")
1017 ss.SetKey(ss.SS_MASTER_IP, ip)
1018 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1020 # Distribute updated ss config to all nodes
1021 myself = self.cfg.GetNodeInfo(master)
1022 dist_nodes = self.cfg.GetNodeList()
1023 if myself.name in dist_nodes:
1024 dist_nodes.remove(myself.name)
1026 logger.Debug("Copying updated ssconf data to all nodes")
1027 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1028 fname = ss.KeyToFilename(keyname)
1029 result = rpc.call_upload_file(dist_nodes, fname)
1030 for to_node in dist_nodes:
1031 if not result[to_node]:
1032 logger.Error("copy of file %s to node %s failed" %
1035 if not rpc.call_node_start_master(master):
1036 logger.Error("Could not re-enable the master role on the master,"
1037 " please restart manually.")
1040 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1041 """Sleep and poll for an instance's disk to sync.
1044 if not instance.disks:
1048 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1050 node = instance.primary_node
1052 for dev in instance.disks:
1053 cfgw.SetDiskID(dev, node)
1059 cumul_degraded = False
1060 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1062 proc.LogWarning("Can't get any data from node %s" % node)
1065 raise errors.RemoteError("Can't contact node %s for mirror data,"
1066 " aborting." % node)
1070 for i in range(len(rstats)):
1073 proc.LogWarning("Can't compute data for node %s/%s" %
1074 (node, instance.disks[i].iv_name))
1076 # we ignore the ldisk parameter
1077 perc_done, est_time, is_degraded, _ = mstat
1078 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1079 if perc_done is not None:
1081 if est_time is not None:
1082 rem_time = "%d estimated seconds remaining" % est_time
1085 rem_time = "no time estimate"
1086 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1087 (instance.disks[i].iv_name, perc_done, rem_time))
1094 time.sleep(min(60, max_time))
1100 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1101 return not cumul_degraded
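# Typical call pattern (a sketch, assuming the LU keeps its processor as
# self.proc): after creating mirrored disks, wait for the initial resync
# before declaring the instance usable:
#   if not _WaitForSync(self.cfg, instance, self.proc):
#     raise errors.OpExecError("disk sync-out for instance %s failed" %
#                              instance.name)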
1104 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1105 """Check that mirrors are not degraded.
1107 The ldisk parameter, if True, will change the test from the
1108 is_degraded attribute (which represents overall non-ok status for
1109 the device(s)) to the ldisk (representing the local storage status).
1112 cfgw.SetDiskID(dev, node)
1119 if on_primary or dev.AssembleOnSecondary():
1120 rstats = rpc.call_blockdev_find(node, dev)
1122 logger.ToStderr("Can't get any data from node %s" % node)
1125 result = result and (not rstats[idx])
1127 for child in dev.children:
1128 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1133 class LUDiagnoseOS(NoHooksLU):
1134 """Logical unit for OS diagnose/query.
1139 def CheckPrereq(self):
1140 """Check prerequisites.
1142 This always succeeds, since this is a pure query LU.
1147 def Exec(self, feedback_fn):
1148 """Compute the list of OSes.
1151 node_list = self.cfg.GetNodeList()
1152 node_data = rpc.call_os_diagnose(node_list)
1153 if node_data == False:
1154 raise errors.OpExecError("Can't gather the list of OSes")
1158 class LURemoveNode(LogicalUnit):
1159 """Logical unit for removing a node.
1162 HPATH = "node-remove"
1163 HTYPE = constants.HTYPE_NODE
1164 _OP_REQP = ["node_name"]
1166 def BuildHooksEnv(self):
1169 This doesn't run on the target node in the pre phase as a failed
node would not allow itself to run.
1174 "OP_TARGET": self.op.node_name,
1175 "NODE_NAME": self.op.node_name,
1177 all_nodes = self.cfg.GetNodeList()
1178 all_nodes.remove(self.op.node_name)
1179 return env, all_nodes, all_nodes
1181 def CheckPrereq(self):
1182 """Check prerequisites.
1185 - the node exists in the configuration
1186 - it does not have primary or secondary instances
1187 - it's not the master
1189 Any errors are signalled by raising errors.OpPrereqError.
1192 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1196 instance_list = self.cfg.GetInstanceList()
1198 masternode = self.sstore.GetMasterNode()
1199 if node.name == masternode:
1200 raise errors.OpPrereqError("Node is the master node,"
1201 " you need to failover first.")
1203 for instance_name in instance_list:
1204 instance = self.cfg.GetInstanceInfo(instance_name)
1205 if node.name == instance.primary_node:
1206 raise errors.OpPrereqError("Instance %s still running on the node,"
1207 " please remove first." % instance_name)
1208 if node.name in instance.secondary_nodes:
1209 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1210 " please remove first." % instance_name)
1211 self.op.node_name = node.name
1214 def Exec(self, feedback_fn):
1215 """Removes the node from the cluster.
1219 logger.Info("stopping the node daemon and removing configs from node %s" %
1222 rpc.call_node_leave_cluster(node.name)
1224 ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1226 logger.Info("Removing node %s from config" % node.name)
1228 self.cfg.RemoveNode(node.name)
1230 _RemoveHostFromEtcHosts(node.name)
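# Opcode-level sketch (the opcode class name is assumed from the opcodes
# module; the node name is hypothetical):
#   op = opcodes.OpRemoveNode(node_name="node3.example.com")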
1233 class LUQueryNodes(NoHooksLU):
1234 """Logical unit for querying nodes.
1237 _OP_REQP = ["output_fields", "names"]
1239 def CheckPrereq(self):
1240 """Check prerequisites.
1242 This checks that the fields required are valid output fields.
1245 self.dynamic_fields = frozenset(["dtotal", "dfree",
1246 "mtotal", "mnode", "mfree",
1249 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1250 "pinst_list", "sinst_list",
1252 dynamic=self.dynamic_fields,
1253 selected=self.op.output_fields)
1255 self.wanted = _GetWantedNodes(self, self.op.names)
1257 def Exec(self, feedback_fn):
1258 """Computes the list of nodes and their attributes.
1261 nodenames = self.wanted
1262 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1264 # begin data gathering
1266 if self.dynamic_fields.intersection(self.op.output_fields):
1268 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1269 for name in nodenames:
1270 nodeinfo = node_data.get(name, None)
1273 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1274 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1275 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1276 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1277 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1278 "bootid": nodeinfo['bootid'],
1281 live_data[name] = {}
1283 live_data = dict.fromkeys(nodenames, {})
1285 node_to_primary = dict([(name, set()) for name in nodenames])
1286 node_to_secondary = dict([(name, set()) for name in nodenames])
1288 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1289 "sinst_cnt", "sinst_list"))
1290 if inst_fields & frozenset(self.op.output_fields):
1291 instancelist = self.cfg.GetInstanceList()
1293 for instance_name in instancelist:
1294 inst = self.cfg.GetInstanceInfo(instance_name)
1295 if inst.primary_node in node_to_primary:
1296 node_to_primary[inst.primary_node].add(inst.name)
1297 for secnode in inst.secondary_nodes:
1298 if secnode in node_to_secondary:
1299 node_to_secondary[secnode].add(inst.name)
1301 # end data gathering
1304 for node in nodelist:
1306 for field in self.op.output_fields:
1309 elif field == "pinst_list":
1310 val = list(node_to_primary[node.name])
1311 elif field == "sinst_list":
1312 val = list(node_to_secondary[node.name])
1313 elif field == "pinst_cnt":
1314 val = len(node_to_primary[node.name])
1315 elif field == "sinst_cnt":
1316 val = len(node_to_secondary[node.name])
1317 elif field == "pip":
1318 val = node.primary_ip
1319 elif field == "sip":
1320 val = node.secondary_ip
1321 elif field in self.dynamic_fields:
1322 val = live_data[node.name].get(field, None)
1324 raise errors.ParameterError(field)
1325 node_output.append(val)
1326 output.append(node_output)
1331 class LUQueryNodeVolumes(NoHooksLU):
1332 """Logical unit for getting volumes on node(s).
1335 _OP_REQP = ["nodes", "output_fields"]
1337 def CheckPrereq(self):
1338 """Check prerequisites.
1340 This checks that the fields required are valid output fields.
1343 self.nodes = _GetWantedNodes(self, self.op.nodes)
1345 _CheckOutputFields(static=["node"],
1346 dynamic=["phys", "vg", "name", "size", "instance"],
1347 selected=self.op.output_fields)
1350 def Exec(self, feedback_fn):
1351 """Computes the list of nodes and their attributes.
1354 nodenames = self.nodes
1355 volumes = rpc.call_node_volumes(nodenames)
1357 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1358 in self.cfg.GetInstanceList()]
1360 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1363 for node in nodenames:
1364 if node not in volumes or not volumes[node]:
1367 node_vols = volumes[node][:]
1368 node_vols.sort(key=lambda vol: vol['dev'])
1370 for vol in node_vols:
1372 for field in self.op.output_fields:
1375 elif field == "phys":
1379 elif field == "name":
1381 elif field == "size":
1382 val = int(float(vol['size']))
1383 elif field == "instance":
1385 if node not in lv_by_node[inst]:
1387 if vol['name'] in lv_by_node[inst][node]:
1393 raise errors.ParameterError(field)
1394 node_output.append(str(val))
1396 output.append(node_output)
1401 class LUAddNode(LogicalUnit):
1402 """Logical unit for adding node to the cluster.
1406 HTYPE = constants.HTYPE_NODE
1407 _OP_REQP = ["node_name"]
1409 def BuildHooksEnv(self):
1412 This will run on all nodes before, and on all nodes + the new node after.
1416 "OP_TARGET": self.op.node_name,
1417 "NODE_NAME": self.op.node_name,
1418 "NODE_PIP": self.op.primary_ip,
1419 "NODE_SIP": self.op.secondary_ip,
1421 nodes_0 = self.cfg.GetNodeList()
1422 nodes_1 = nodes_0 + [self.op.node_name, ]
1423 return env, nodes_0, nodes_1
1425 def CheckPrereq(self):
1426 """Check prerequisites.
1429 - the new node is not already in the config
- its parameters (single/dual homed) match the cluster
1433 Any errors are signalled by raising errors.OpPrereqError.
1436 node_name = self.op.node_name
1439 dns_data = utils.HostInfo(node_name)
1441 node = dns_data.name
1442 primary_ip = self.op.primary_ip = dns_data.ip
1443 secondary_ip = getattr(self.op, "secondary_ip", None)
1444 if secondary_ip is None:
1445 secondary_ip = primary_ip
1446 if not utils.IsValidIP(secondary_ip):
1447 raise errors.OpPrereqError("Invalid secondary IP given")
1448 self.op.secondary_ip = secondary_ip
1449 node_list = cfg.GetNodeList()
1450 if node in node_list:
1451 raise errors.OpPrereqError("Node %s is already in the configuration"
1454 for existing_node_name in node_list:
1455 existing_node = cfg.GetNodeInfo(existing_node_name)
1456 if (existing_node.primary_ip == primary_ip or
1457 existing_node.secondary_ip == primary_ip or
1458 existing_node.primary_ip == secondary_ip or
1459 existing_node.secondary_ip == secondary_ip):
1460 raise errors.OpPrereqError("New node ip address(es) conflict with"
1461 " existing node %s" % existing_node.name)
1463 # check that the type of the node (single versus dual homed) is the
1464 # same as for the master
1465 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1466 master_singlehomed = myself.secondary_ip == myself.primary_ip
1467 newbie_singlehomed = secondary_ip == primary_ip
1468 if master_singlehomed != newbie_singlehomed:
1469 if master_singlehomed:
1470 raise errors.OpPrereqError("The master has no private ip but the"
1471 " new node has one")
1473 raise errors.OpPrereqError("The master has a private ip but the"
1474 " new node doesn't have one")
# checks reachability
1477 if not utils.TcpPing(utils.HostInfo().name,
1479 constants.DEFAULT_NODED_PORT):
1480 raise errors.OpPrereqError("Node not reachable by ping")
1482 if not newbie_singlehomed:
1483 # check reachability from my secondary ip to newbie's secondary ip
1484 if not utils.TcpPing(myself.secondary_ip,
1486 constants.DEFAULT_NODED_PORT):
1487 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1488 " based ping to noded port")
1490 self.new_node = objects.Node(name=node,
1491 primary_ip=primary_ip,
1492 secondary_ip=secondary_ip)
1494 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1495 if not os.path.exists(constants.VNC_PASSWORD_FILE):
1496 raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1497 constants.VNC_PASSWORD_FILE)
1499 def Exec(self, feedback_fn):
1500 """Adds the new node to the cluster.
1503 new_node = self.new_node
1504 node = new_node.name
1506 # set up inter-node password and certificate and restarts the node daemon
1507 gntpass = self.sstore.GetNodeDaemonPassword()
1508 if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1509 raise errors.OpExecError("ganeti password corruption detected")
1510 f = open(constants.SSL_CERT_FILE)
1512 gntpem = f.read(8192)
1515 # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1516 # so we use this to detect an invalid certificate; as long as the
1517 # cert doesn't contain this, the here-document will be correctly
1518 # parsed by the shell sequence below
1519 if re.search('^!EOF\.', gntpem, re.MULTILINE):
1520 raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1521 if not gntpem.endswith("\n"):
1522 raise errors.OpExecError("PEM must end with newline")
1523 logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1525 # and then connect with ssh to set password and start ganeti-noded
1526 # note that all the below variables are sanitized at this point,
1527 # either by being constants or by the checks above
1529 mycommand = ("umask 077 && "
1530 "echo '%s' > '%s' && "
1531 "cat > '%s' << '!EOF.' && \n"
1532 "%s!EOF.\n%s restart" %
1533 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1534 constants.SSL_CERT_FILE, gntpem,
1535 constants.NODE_INITD_SCRIPT))
1537 result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1539 raise errors.OpExecError("Remote command on node %s, error: %s,"
1541 (node, result.fail_reason, result.output))
1543 # check connectivity
1546 result = rpc.call_version([node])[node]
1548 if constants.PROTOCOL_VERSION == result:
1549 logger.Info("communication to node %s fine, sw version %s match" %
1552 raise errors.OpExecError("Version mismatch master version %s,"
1553 " node version %s" %
1554 (constants.PROTOCOL_VERSION, result))
1556 raise errors.OpExecError("Cannot get version from the new node")
1559 logger.Info("copy ssh key to node %s" % node)
1560 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1562 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1563 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1569 keyarray.append(f.read())
1573 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1574 keyarray[3], keyarray[4], keyarray[5])
1577 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1579 # Add node to our /etc/hosts, and add key to known_hosts
1580 _AddHostToEtcHosts(new_node.name)
1582 _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1583 self.cfg.GetHostKey())
1585 if new_node.secondary_ip != new_node.primary_ip:
1586 if not rpc.call_node_tcp_ping(new_node.name,
1587 constants.LOCALHOST_IP_ADDRESS,
1588 new_node.secondary_ip,
1589 constants.DEFAULT_NODED_PORT,
1591 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1592 " you gave (%s). Please fix and re-run this"
1593 " command." % new_node.secondary_ip)
1595 success, msg = ssh.VerifyNodeHostname(node)
1597 raise errors.OpExecError("Node '%s' claims it has a different hostname"
1598 " than the one the resolver gives: %s."
1599 " Please fix and re-run this command." %
1602 # Distribute updated /etc/hosts and known_hosts to all nodes,
1603 # including the node just added
1604 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1605 dist_nodes = self.cfg.GetNodeList() + [node]
1606 if myself.name in dist_nodes:
1607 dist_nodes.remove(myself.name)
1609 logger.Debug("Copying hosts and known_hosts to all nodes")
1610 for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1611 result = rpc.call_upload_file(dist_nodes, fname)
1612 for to_node in dist_nodes:
1613 if not result[to_node]:
1614 logger.Error("copy of file %s to node %s failed" %
1617 to_copy = ss.GetFileList()
1618 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1619 to_copy.append(constants.VNC_PASSWORD_FILE)
1620 for fname in to_copy:
1621 if not ssh.CopyFileToNode(node, fname):
1622 logger.Error("could not copy file %s to node %s" % (fname, node))
1624 logger.Info("adding node %s to cluster.conf" % node)
1625 self.cfg.AddNode(new_node)
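# Opcode-level sketch (the opcode class name is assumed; values hypothetical).
# secondary_ip is only needed on dual-homed clusters, matching the checks in
# CheckPrereq above:
#   op = opcodes.OpAddNode(node_name="node4.example.com",
#                          secondary_ip="192.168.1.4")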
1628 class LUMasterFailover(LogicalUnit):
1629 """Failover the master node to the current node.
1631 This is a special LU in that it must run on a non-master node.
1634 HPATH = "master-failover"
1635 HTYPE = constants.HTYPE_CLUSTER
1639 def BuildHooksEnv(self):
1642 This will run on the new master only in the pre phase, and on all
1643 the nodes in the post phase.
1647 "OP_TARGET": self.new_master,
1648 "NEW_MASTER": self.new_master,
1649 "OLD_MASTER": self.old_master,
1651 return env, [self.new_master], self.cfg.GetNodeList()
1653 def CheckPrereq(self):
1654 """Check prerequisites.
1656 This checks that we are not already the master.
1659 self.new_master = utils.HostInfo().name
1660 self.old_master = self.sstore.GetMasterNode()
1662 if self.old_master == self.new_master:
1663 raise errors.OpPrereqError("This commands must be run on the node"
1664 " where you want the new master to be."
1665 " %s is already the master" %
1668 def Exec(self, feedback_fn):
1669 """Failover the master node.
1671 This command, when run on a non-master node, will cause the current
1672 master to cease being master, and the non-master to become new
1676 #TODO: do not rely on gethostname returning the FQDN
1677 logger.Info("setting master to %s, old master: %s" %
1678 (self.new_master, self.old_master))
1680 if not rpc.call_node_stop_master(self.old_master):
1681 logger.Error("could disable the master role on the old master"
1682 " %s, please disable manually" % self.old_master)
1685 ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1686 if not rpc.call_upload_file(self.cfg.GetNodeList(),
1687 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1688 logger.Error("could not distribute the new simple store master file"
1689 " to the other nodes, please check.")
1691 if not rpc.call_node_start_master(self.new_master):
1692 logger.Error("could not start the master role on the new master"
1693 " %s, please check" % self.new_master)
1694 feedback_fn("Error in activating the master IP on the new master,"
1695 " please fix manually.")
1699 class LUQueryClusterInfo(NoHooksLU):
1700 """Query cluster configuration.
1706 def CheckPrereq(self):
1707 """No prerequsites needed for this LU.
1712 def Exec(self, feedback_fn):
1713 """Return cluster config.
1717 "name": self.sstore.GetClusterName(),
1718 "software_version": constants.RELEASE_VERSION,
1719 "protocol_version": constants.PROTOCOL_VERSION,
1720 "config_version": constants.CONFIG_VERSION,
1721 "os_api_version": constants.OS_API_VERSION,
1722 "export_version": constants.EXPORT_VERSION,
1723 "master": self.sstore.GetMasterNode(),
1724 "architecture": (platform.architecture()[0], platform.machine()),
1730 class LUClusterCopyFile(NoHooksLU):
1731 """Copy file to cluster.
1734 _OP_REQP = ["nodes", "filename"]
1736 def CheckPrereq(self):
1737 """Check prerequisites.
1739 It should check that the named file exists and that the given list
1743 if not os.path.exists(self.op.filename):
1744 raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1746 self.nodes = _GetWantedNodes(self, self.op.nodes)
1748 def Exec(self, feedback_fn):
1749 """Copy a file from master to some nodes.
1752 opts - class with options as members
1753 args - list containing a single element, the file name
1755 nodes - list containing the name of target nodes; if empty, all nodes
1758 filename = self.op.filename
1760 myname = utils.HostInfo().name
1762 for node in self.nodes:
1765 if not ssh.CopyFileToNode(node, filename):
1766 logger.Error("Copy of file %s to node %s failed" % (filename, node))
1769 class LUDumpClusterConfig(NoHooksLU):
1770 """Return a text-representation of the cluster-config.
1775 def CheckPrereq(self):
1776 """No prerequisites.
1781 def Exec(self, feedback_fn):
1782 """Dump a representation of the cluster config to the standard output.
1785 return self.cfg.DumpConfig()
1788 class LURunClusterCommand(NoHooksLU):
1789 """Run a command on some nodes.
1792 _OP_REQP = ["command", "nodes"]
1794 def CheckPrereq(self):
1795 """Check prerequisites.
1797 It checks that the given list of nodes is valid.
1800 self.nodes = _GetWantedNodes(self, self.op.nodes)
1802 def Exec(self, feedback_fn):
1803 """Run a command on some nodes.
1807 for node in self.nodes:
1808 result = ssh.SSHCall(node, "root", self.op.command)
1809 data.append((node, result.output, result.exit_code))
1814 class LUActivateInstanceDisks(NoHooksLU):
1815 """Bring up an instance's disks.
1818 _OP_REQP = ["instance_name"]
1820 def CheckPrereq(self):
1821 """Check prerequisites.
1823 This checks that the instance is in the cluster.
1826 instance = self.cfg.GetInstanceInfo(
1827 self.cfg.ExpandInstanceName(self.op.instance_name))
1828 if instance is None:
1829 raise errors.OpPrereqError("Instance '%s' not known" %
1830 self.op.instance_name)
1831 self.instance = instance
1834 def Exec(self, feedback_fn):
1835 """Activate the disks.
1838 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1840 raise errors.OpExecError("Cannot activate block devices")
1845 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1846 """Prepare the block devices for an instance.
1848 This sets up the block devices on all nodes.
1851 instance: a ganeti.objects.Instance object
1852 ignore_secondaries: if true, errors on secondary nodes won't result
1853 in an error return from the function
1856 false if the operation failed
1857 list of (host, instance_visible_name, node_visible_name) if the operation
succeeded, with the mapping from node devices to instance devices
1862 for inst_disk in instance.disks:
1863 master_result = None
1864 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1865 cfg.SetDiskID(node_disk, node)
1866 is_primary = node == instance.primary_node
1867 result = rpc.call_blockdev_assemble(node, node_disk,
1868 instance.name, is_primary)
1870 logger.Error("could not prepare block device %s on node %s"
1871 " (is_primary=%s)" %
1872 (inst_disk.iv_name, node, is_primary))
1873 if is_primary or not ignore_secondaries:
1876 master_result = result
1877 device_info.append((instance.primary_node, inst_disk.iv_name,
1880 # leave the disks configured for the primary node
1881 # this is a workaround that would be fixed better by
1882 # improving the logical/physical id handling
1883 for disk in instance.disks:
1884 cfg.SetDiskID(disk, instance.primary_node)
1886 return disks_ok, device_info
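# Usage sketch, mirroring LUActivateInstanceDisks above:
#   disks_ok, device_info = _AssembleInstanceDisks(instance, self.cfg)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")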
1889 def _StartInstanceDisks(cfg, instance, force):
1890 """Start the disks of an instance.
1893 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1894 ignore_secondaries=force)
1896 _ShutdownInstanceDisks(instance, cfg)
1897 if force is not None and not force:
1898 logger.Error("If the message above refers to a secondary node,"
1899 " you can retry the operation using '--force'.")
1900 raise errors.OpExecError("Disk consistency error")
1903 class LUDeactivateInstanceDisks(NoHooksLU):
1904 """Shutdown an instance's disks.
1907 _OP_REQP = ["instance_name"]
1909 def CheckPrereq(self):
1910 """Check prerequisites.
1912 This checks that the instance is in the cluster.
1915 instance = self.cfg.GetInstanceInfo(
1916 self.cfg.ExpandInstanceName(self.op.instance_name))
1917 if instance is None:
1918 raise errors.OpPrereqError("Instance '%s' not known" %
1919 self.op.instance_name)
1920 self.instance = instance
1922 def Exec(self, feedback_fn):
1923 """Deactivate the disks
1926 instance = self.instance
1927 ins_l = rpc.call_instance_list([instance.primary_node])
1928 ins_l = ins_l[instance.primary_node]
if not isinstance(ins_l, list):
1930 raise errors.OpExecError("Can't contact node '%s'" %
1931 instance.primary_node)
1933 if self.instance.name in ins_l:
1934 raise errors.OpExecError("Instance is running, can't shutdown"
1937 _ShutdownInstanceDisks(instance, self.cfg)
1940 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1941 """Shutdown block devices of an instance.
1943 This does the shutdown on all nodes of the instance.
If ignore_primary is false, errors on the primary node are
1950 for disk in instance.disks:
1951 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1952 cfg.SetDiskID(top_disk, node)
1953 if not rpc.call_blockdev_shutdown(node, top_disk):
1954 logger.Error("could not shutdown block device %s on node %s" %
1955 (disk.iv_name, node))
1956 if not ignore_primary or node != instance.primary_node:
1961 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1962 """Checks if a node has enough free memory.
This function checks if a given node has the needed amount of free
memory. In case the node has less memory or we cannot get the
information from the node, this function raises an OpPrereqError
1970 - cfg: a ConfigWriter instance
1971 - node: the node name
1972 - reason: string to use in the error message
1973 - requested: the amount of memory in MiB
1976 nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1977 if not nodeinfo or not isinstance(nodeinfo, dict):
1978 raise errors.OpPrereqError("Could not contact node %s for resource"
1979 " information" % (node,))
1981 free_mem = nodeinfo[node].get('memory_free')
1982 if not isinstance(free_mem, int):
1983 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1984 " was '%s'" % (node, free_mem))
1985 if requested > free_mem:
1986 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1987 " needed %s MiB, available %s MiB" %
1988 (node, reason, requested, free_mem))
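# Usage sketch (this is how LUStartupInstance below guards against
# overcommitting the primary node; the requested amount is the instance's
# configured memory):
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)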
1991 class LUStartupInstance(LogicalUnit):
1992 """Starts an instance.
1995 HPATH = "instance-start"
1996 HTYPE = constants.HTYPE_INSTANCE
1997 _OP_REQP = ["instance_name", "force"]
1999 def BuildHooksEnv(self):
2002 This runs on master, primary and secondary nodes of the instance.
2006 "FORCE": self.op.force,
2008 env.update(_BuildInstanceHookEnvByObject(self.instance))
2009 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2010 list(self.instance.secondary_nodes))
2013 def CheckPrereq(self):
2014 """Check prerequisites.
2016 This checks that the instance is in the cluster.
2019 instance = self.cfg.GetInstanceInfo(
2020 self.cfg.ExpandInstanceName(self.op.instance_name))
2021 if instance is None:
2022 raise errors.OpPrereqError("Instance '%s' not known" %
2023 self.op.instance_name)
# check bridges existence
2026 _CheckInstanceBridgesExist(instance)
2028 _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2029 "starting instance %s" % instance.name,
2032 self.instance = instance
2033 self.op.instance_name = instance.name
2035 def Exec(self, feedback_fn):
2036 """Start the instance.
2039 instance = self.instance
2040 force = self.op.force
2041 extra_args = getattr(self.op, "extra_args", "")
2043 node_current = instance.primary_node
2045 _StartInstanceDisks(self.cfg, instance, force)
2047 if not rpc.call_instance_start(node_current, instance, extra_args):
2048 _ShutdownInstanceDisks(instance, self.cfg)
2049 raise errors.OpExecError("Could not start instance")
2051 self.cfg.MarkInstanceUp(instance.name)
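# Opcode-level sketch (the opcode class name is assumed; values hypothetical):
#   op = opcodes.OpStartupInstance(instance_name="inst1.example.com",
#                                  force=False)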
2054 class LURebootInstance(LogicalUnit):
2055 """Reboot an instance.
2058 HPATH = "instance-reboot"
2059 HTYPE = constants.HTYPE_INSTANCE
2060 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2062 def BuildHooksEnv(self):
2065 This runs on master, primary and secondary nodes of the instance.
2069 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2071 env.update(_BuildInstanceHookEnvByObject(self.instance))
2072 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2073 list(self.instance.secondary_nodes))
2076 def CheckPrereq(self):
2077 """Check prerequisites.
2079 This checks that the instance is in the cluster.
2082 instance = self.cfg.GetInstanceInfo(
2083 self.cfg.ExpandInstanceName(self.op.instance_name))
2084 if instance is None:
2085 raise errors.OpPrereqError("Instance '%s' not known" %
2086 self.op.instance_name)
# check bridges existence
2089 _CheckInstanceBridgesExist(instance)
2091 self.instance = instance
2092 self.op.instance_name = instance.name
2094 def Exec(self, feedback_fn):
2095 """Reboot the instance.
2098 instance = self.instance
2099 ignore_secondaries = self.op.ignore_secondaries
2100 reboot_type = self.op.reboot_type
2101 extra_args = getattr(self.op, "extra_args", "")
2103 node_current = instance.primary_node
2105 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2106 constants.INSTANCE_REBOOT_HARD,
2107 constants.INSTANCE_REBOOT_FULL]:
2108 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2109 (constants.INSTANCE_REBOOT_SOFT,
2110 constants.INSTANCE_REBOOT_HARD,
2111 constants.INSTANCE_REBOOT_FULL))
2113 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2114 constants.INSTANCE_REBOOT_HARD]:
2115 if not rpc.call_instance_reboot(node_current, instance,
2116 reboot_type, extra_args):
2117 raise errors.OpExecError("Could not reboot instance")
2119 if not rpc.call_instance_shutdown(node_current, instance):
2120 raise errors.OpExecError("could not shutdown instance for full reboot")
2121 _ShutdownInstanceDisks(instance, self.cfg)
2122 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2123 if not rpc.call_instance_start(node_current, instance, extra_args):
2124 _ShutdownInstanceDisks(instance, self.cfg)
2125 raise errors.OpExecError("Could not start instance for full reboot")
2127 self.cfg.MarkInstanceUp(instance.name)
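# Opcode-level sketch (the opcode class name is assumed; a full reboot also
# tears down and reassembles the block devices, as implemented above):
#   op = opcodes.OpRebootInstance(instance_name="inst1.example.com",
#                                 reboot_type=constants.INSTANCE_REBOOT_FULL,
#                                 ignore_secondaries=False)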
2130 class LUShutdownInstance(LogicalUnit):
2131 """Shutdown an instance.
2134 HPATH = "instance-stop"
2135 HTYPE = constants.HTYPE_INSTANCE
2136 _OP_REQP = ["instance_name"]
2138 def BuildHooksEnv(self):
2141 This runs on master, primary and secondary nodes of the instance.
2144 env = _BuildInstanceHookEnvByObject(self.instance)
2145 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2146 list(self.instance.secondary_nodes))
2149 def CheckPrereq(self):
2150 """Check prerequisites.
2152 This checks that the instance is in the cluster.
2155 instance = self.cfg.GetInstanceInfo(
2156 self.cfg.ExpandInstanceName(self.op.instance_name))
2157 if instance is None:
2158 raise errors.OpPrereqError("Instance '%s' not known" %
2159 self.op.instance_name)
2160 self.instance = instance
2162 def Exec(self, feedback_fn):
2163 """Shutdown the instance.
2166 instance = self.instance
2167 node_current = instance.primary_node
2168 if not rpc.call_instance_shutdown(node_current, instance):
2169 logger.Error("could not shutdown instance")
2171 self.cfg.MarkInstanceDown(instance.name)
2172 _ShutdownInstanceDisks(instance, self.cfg)
2175 class LUReinstallInstance(LogicalUnit):
2176 """Reinstall an instance.
2179 HPATH = "instance-reinstall"
2180 HTYPE = constants.HTYPE_INSTANCE
2181 _OP_REQP = ["instance_name"]
2183 def BuildHooksEnv(self):
2186 This runs on master, primary and secondary nodes of the instance.
2189 env = _BuildInstanceHookEnvByObject(self.instance)
2190 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2191 list(self.instance.secondary_nodes))
2194 def CheckPrereq(self):
2195 """Check prerequisites.
2197 This checks that the instance is in the cluster and is not running.
2200 instance = self.cfg.GetInstanceInfo(
2201 self.cfg.ExpandInstanceName(self.op.instance_name))
2202 if instance is None:
2203 raise errors.OpPrereqError("Instance '%s' not known" %
2204 self.op.instance_name)
2205 if instance.disk_template == constants.DT_DISKLESS:
2206 raise errors.OpPrereqError("Instance '%s' has no disks" %
2207 self.op.instance_name)
2208 if instance.status != "down":
2209 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2210 self.op.instance_name)
2211 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2212 if remote_info:
2213 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2214 (self.op.instance_name,
2215 instance.primary_node))
2217 self.op.os_type = getattr(self.op, "os_type", None)
2218 if self.op.os_type is not None:
2220 pnode = self.cfg.GetNodeInfo(
2221 self.cfg.ExpandNodeName(instance.primary_node))
2222 if pnode is None:
2223 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2224 instance.primary_node)
2225 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2226 if not os_obj:
2227 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2228 " primary node" % self.op.os_type)
2230 self.instance = instance
2232 def Exec(self, feedback_fn):
2233 """Reinstall the instance.
2236 inst = self.instance
2238 if self.op.os_type is not None:
2239 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2240 inst.os = self.op.os_type
2241 self.cfg.AddInstance(inst)
2243 _StartInstanceDisks(self.cfg, inst, None)
2245 feedback_fn("Running the instance OS create scripts...")
2246 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2247 raise errors.OpExecError("Could not install OS for instance %s"
2248 " on node %s" %
2249 (inst.name, inst.primary_node))
2251 _ShutdownInstanceDisks(inst, self.cfg)
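# Added note: reinstallation reuses the existing disks. They are activated only
# for the duration of the OS create scripts and shut down again afterwards; the
# configuration is updated beforehand only when a new os_type was requested.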
2254 class LURenameInstance(LogicalUnit):
2255 """Rename an instance.
2258 HPATH = "instance-rename"
2259 HTYPE = constants.HTYPE_INSTANCE
2260 _OP_REQP = ["instance_name", "new_name"]
2262 def BuildHooksEnv(self):
2265 This runs on master, primary and secondary nodes of the instance.
2268 env = _BuildInstanceHookEnvByObject(self.instance)
2269 env["INSTANCE_NEW_NAME"] = self.op.new_name
2270 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2271 list(self.instance.secondary_nodes))
2272 return env, nl, nl
2274 def CheckPrereq(self):
2275 """Check prerequisites.
2277 This checks that the instance is in the cluster and is not running.
2280 instance = self.cfg.GetInstanceInfo(
2281 self.cfg.ExpandInstanceName(self.op.instance_name))
2282 if instance is None:
2283 raise errors.OpPrereqError("Instance '%s' not known" %
2284 self.op.instance_name)
2285 if instance.status != "down":
2286 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2287 self.op.instance_name)
2288 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2289 if remote_info:
2290 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2291 (self.op.instance_name,
2292 instance.primary_node))
2293 self.instance = instance
2295 # new name verification
2296 name_info = utils.HostInfo(self.op.new_name)
2298 self.op.new_name = new_name = name_info.name
2299 if not getattr(self.op, "ignore_ip", False):
2300 command = ["fping", "-q", name_info.ip]
2301 result = utils.RunCmd(command)
2302 if not result.failed:
2303 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2304 (name_info.ip, new_name))
2307 def Exec(self, feedback_fn):
2308 """Reinstall the instance.
2311 inst = self.instance
2312 old_name = inst.name
2314 self.cfg.RenameInstance(inst.name, self.op.new_name)
2316 # re-read the instance from the configuration after rename
2317 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2319 _StartInstanceDisks(self.cfg, inst, None)
2321 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2323 msg = ("Could not run OS rename script for instance %s on node %s (but the"
2324 " instance has been renamed in Ganeti)" %
2325 (inst.name, inst.primary_node))
2326 logger.Error(msg)
2328 _ShutdownInstanceDisks(inst, self.cfg)
2331 class LURemoveInstance(LogicalUnit):
2332 """Remove an instance.
2335 HPATH = "instance-remove"
2336 HTYPE = constants.HTYPE_INSTANCE
2337 _OP_REQP = ["instance_name"]
2339 def BuildHooksEnv(self):
2342 This runs on master, primary and secondary nodes of the instance.
2345 env = _BuildInstanceHookEnvByObject(self.instance)
2346 nl = [self.sstore.GetMasterNode()]
2347 return env, nl, nl
2349 def CheckPrereq(self):
2350 """Check prerequisites.
2352 This checks that the instance is in the cluster.
2355 instance = self.cfg.GetInstanceInfo(
2356 self.cfg.ExpandInstanceName(self.op.instance_name))
2357 if instance is None:
2358 raise errors.OpPrereqError("Instance '%s' not known" %
2359 self.op.instance_name)
2360 self.instance = instance
2362 def Exec(self, feedback_fn):
2363 """Remove the instance.
2366 instance = self.instance
2367 logger.Info("shutting down instance %s on node %s" %
2368 (instance.name, instance.primary_node))
2370 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2371 if self.op.ignore_failures:
2372 feedback_fn("Warning: can't shutdown instance")
2373 else:
2374 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2375 (instance.name, instance.primary_node))
2377 logger.Info("removing block devices for instance %s" % instance.name)
2379 if not _RemoveDisks(instance, self.cfg):
2380 if self.op.ignore_failures:
2381 feedback_fn("Warning: can't remove instance's disks")
2382 else:
2383 raise errors.OpExecError("Can't remove instance's disks")
2385 logger.Info("removing instance %s out of cluster config" % instance.name)
2387 self.cfg.RemoveInstance(instance.name)
2390 class LUQueryInstances(NoHooksLU):
2391 """Logical unit for querying instances.
2394 _OP_REQP = ["output_fields", "names"]
2396 def CheckPrereq(self):
2397 """Check prerequisites.
2399 This checks that the fields required are valid output fields.
2402 self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2403 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2404 "admin_state", "admin_ram",
2405 "disk_template", "ip", "mac", "bridge",
2406 "sda_size", "sdb_size", "vcpus"],
2407 dynamic=self.dynamic_fields,
2408 selected=self.op.output_fields)
2410 self.wanted = _GetWantedInstances(self, self.op.names)
2412 def Exec(self, feedback_fn):
2413 """Computes the list of nodes and their attributes.
2416 instance_names = self.wanted
2417 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2418 in instance_names]
2420 # begin data gathering
2422 nodes = frozenset([inst.primary_node for inst in instance_list])
2424 bad_nodes = []
2425 if self.dynamic_fields.intersection(self.op.output_fields):
2426 live_data = {}
2427 node_data = rpc.call_all_instances_info(nodes)
2428 for name in nodes:
2429 result = node_data[name]
2430 if result:
2431 live_data.update(result)
2432 elif result == False:
2433 bad_nodes.append(name)
2434 # else no instance is alive
2435 else:
2436 live_data = dict([(name, {}) for name in instance_names])
2438 # end data gathering
2440 output = []
2441 for instance in instance_list:
2442 iout = []
2443 for field in self.op.output_fields:
2444 if field == "name":
2445 val = instance.name
2446 elif field == "os":
2447 val = instance.os
2448 elif field == "pnode":
2449 val = instance.primary_node
2450 elif field == "snodes":
2451 val = list(instance.secondary_nodes)
2452 elif field == "admin_state":
2453 val = (instance.status != "down")
2454 elif field == "oper_state":
2455 if instance.primary_node in bad_nodes:
2456 val = None
2457 else:
2458 val = bool(live_data.get(instance.name))
2459 elif field == "admin_ram":
2460 val = instance.memory
2461 elif field == "oper_ram":
2462 if instance.primary_node in bad_nodes:
2463 val = None
2464 elif instance.name in live_data:
2465 val = live_data[instance.name].get("memory", "?")
2466 else:
2467 val = "-"
2468 elif field == "disk_template":
2469 val = instance.disk_template
2470 elif field == "ip":
2471 val = instance.nics[0].ip
2472 elif field == "bridge":
2473 val = instance.nics[0].bridge
2474 elif field == "mac":
2475 val = instance.nics[0].mac
2476 elif field == "sda_size" or field == "sdb_size":
2477 disk = instance.FindDisk(field[:3])
2478 if disk is None:
2479 val = None
2480 else:
2481 val = disk.size
2482 elif field == "vcpus":
2483 val = instance.vcpus
2484 else:
2485 raise errors.ParameterError(field)
2486 iout.append(val)
2487 output.append(iout)
2489 return output
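# Added example (hypothetical values): with output_fields == ["name", "pnode",
# "oper_state"], each row appended to 'output' is a list in the same order as
# the requested fields, e.g. ["inst1.example.com", "node1.example.com", True].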
2492 class LUFailoverInstance(LogicalUnit):
2493 """Failover an instance.
2496 HPATH = "instance-failover"
2497 HTYPE = constants.HTYPE_INSTANCE
2498 _OP_REQP = ["instance_name", "ignore_consistency"]
2500 def BuildHooksEnv(self):
2503 This runs on master, primary and secondary nodes of the instance.
2507 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2509 env.update(_BuildInstanceHookEnvByObject(self.instance))
2510 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2513 def CheckPrereq(self):
2514 """Check prerequisites.
2516 This checks that the instance is in the cluster.
2519 instance = self.cfg.GetInstanceInfo(
2520 self.cfg.ExpandInstanceName(self.op.instance_name))
2521 if instance is None:
2522 raise errors.OpPrereqError("Instance '%s' not known" %
2523 self.op.instance_name)
2525 if instance.disk_template not in constants.DTS_NET_MIRROR:
2526 raise errors.OpPrereqError("Instance's disk layout is not"
2527 " network mirrored, cannot failover.")
2529 secondary_nodes = instance.secondary_nodes
2530 if not secondary_nodes:
2531 raise errors.ProgrammerError("no secondary node but using "
2532 "DT_REMOTE_RAID1 template")
2534 target_node = secondary_nodes[0]
2535 # check memory requirements on the secondary node
2536 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2537 instance.name, instance.memory)
2539 # check bridge existence
2540 brlist = [nic.bridge for nic in instance.nics]
2541 if not rpc.call_bridges_exist(target_node, brlist):
2542 raise errors.OpPrereqError("One or more target bridges %s do not"
2543 " exist on destination node '%s'" %
2544 (brlist, target_node))
2546 self.instance = instance
2548 def Exec(self, feedback_fn):
2549 """Failover an instance.
2551 The failover is done by shutting it down on its present node and
2552 starting it on the secondary.
2555 instance = self.instance
2557 source_node = instance.primary_node
2558 target_node = instance.secondary_nodes[0]
2560 feedback_fn("* checking disk consistency between source and target")
2561 for dev in instance.disks:
2562 # for remote_raid1, these are md over drbd
2563 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2564 if not self.op.ignore_consistency:
2565 raise errors.OpExecError("Disk %s is degraded on target node,"
2566 " aborting failover." % dev.iv_name)
2568 feedback_fn("* shutting down instance on source node")
2569 logger.Info("Shutting down instance %s on node %s" %
2570 (instance.name, source_node))
2572 if not rpc.call_instance_shutdown(source_node, instance):
2573 if self.op.ignore_consistency:
2574 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2575 " anyway. Please make sure node %s is down" %
2576 (instance.name, source_node, source_node))
2577 else:
2578 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2579 (instance.name, source_node))
2581 feedback_fn("* deactivating the instance's disks on source node")
2582 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2583 raise errors.OpExecError("Can't shut down the instance's disks.")
2585 instance.primary_node = target_node
2586 # distribute new instance config to the other nodes
2587 self.cfg.AddInstance(instance)
2589 feedback_fn("* activating the instance's disks on target node")
2590 logger.Info("Starting instance %s on node %s" %
2591 (instance.name, target_node))
2593 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2594 ignore_secondaries=True)
2595 if not disks_ok:
2596 _ShutdownInstanceDisks(instance, self.cfg)
2597 raise errors.OpExecError("Can't activate the instance's disks")
2599 feedback_fn("* starting the instance on the target node")
2600 if not rpc.call_instance_start(target_node, instance, None):
2601 _ShutdownInstanceDisks(instance, self.cfg)
2602 raise errors.OpExecError("Could not start instance %s on node %s." %
2603 (instance.name, target_node))
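# Added note: failover never copies disk data; CheckPrereq restricts it to the
# network-mirrored templates (DTS_NET_MIRROR), so Exec only flips the primary
# node in the configuration and restarts the instance on the former secondary.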
2606 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2607 """Create a tree of block devices on the primary node.
2609 This always creates all devices.
2612 if device.children:
2613 for child in device.children:
2614 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2615 return False
2617 cfg.SetDiskID(device, node)
2618 new_id = rpc.call_blockdev_create(node, device, device.size,
2619 instance.name, True, info)
2620 if not new_id:
2621 return False
2622 if device.physical_id is None:
2623 device.physical_id = new_id
2624 return True
2627 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2628 """Create a tree of block devices on a secondary node.
2630 If this device type has to be created on secondaries, create it and
2631 all its children.
2633 If not, just recurse to children keeping the same 'force' value.
2636 if device.CreateOnSecondary():
2637 force = True
2638 if device.children:
2639 for child in device.children:
2640 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2641 child, force, info):
2642 return False
2644 if not force:
2645 return True
2646 cfg.SetDiskID(device, node)
2647 new_id = rpc.call_blockdev_create(node, device, device.size,
2648 instance.name, False, info)
2649 if not new_id:
2650 return False
2651 if device.physical_id is None:
2652 device.physical_id = new_id
2653 return True
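# Added note: the two helpers above differ only in how forcefully they create
# devices. On the primary node every device of the tree is created; on a
# secondary node a device (and its subtree) is created only once an ancestor
# reports CreateOnSecondary() == True, otherwise the recursion merely descends
# with the caller's 'force' value.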
2656 def _GenerateUniqueNames(cfg, exts):
2657 """Generate a suitable LV name.
2659 This will generate a logical volume name for the given instance.
2662 results = []
2663 for val in exts:
2664 new_id = cfg.GenerateUniqueID()
2665 results.append("%s%s" % (new_id, val))
2666 return results
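# Added example (hypothetical values): _GenerateUniqueNames(cfg, [".sda_data",
# ".sda_meta"]) returns something like ["<id1>.sda_data", "<id2>.sda_meta"],
# one fresh cfg.GenerateUniqueID() prefix per requested extension.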
2669 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2670 """Generate a drbd device complete with its children.
2673 port = cfg.AllocatePort()
2674 vgname = cfg.GetVGName()
2675 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2676 logical_id=(vgname, names[0]))
2677 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2678 logical_id=(vgname, names[1]))
2679 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2680 logical_id = (primary, secondary, port),
2681 children = [dev_data, dev_meta])
2682 return drbd_dev
2685 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2686 """Generate a drbd8 device complete with its children.
2689 port = cfg.AllocatePort()
2690 vgname = cfg.GetVGName()
2691 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2692 logical_id=(vgname, names[0]))
2693 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2694 logical_id=(vgname, names[1]))
2695 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2696 logical_id = (primary, secondary, port),
2697 children = [dev_data, dev_meta],
2698 iv_name=iv_name)
2699 return drbd_dev
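# Added note: the object returned above is a small disk tree (sketch):
#
#   DRBD8 device, size=size, logical_id=(primary, secondary, port)
#     +- LV data: size=size, logical_id=(vgname, names[0])
#     +- LV meta: size=128,  logical_id=(vgname, names[1])
#
# i.e. every DRBD8 device carries its own 128 MB metadata volume.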
2701 def _GenerateDiskTemplate(cfg, template_name,
2702 instance_name, primary_node,
2703 secondary_nodes, disk_sz, swap_sz):
2704 """Generate the entire disk layout for a given template type.
2707 #TODO: compute space requirements
2709 vgname = cfg.GetVGName()
2710 if template_name == "diskless":
2711 disks = []
2712 elif template_name == "plain":
2713 if len(secondary_nodes) != 0:
2714 raise errors.ProgrammerError("Wrong template configuration")
2716 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2717 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2718 logical_id=(vgname, names[0]),
2719 iv_name = "sda")
2720 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2721 logical_id=(vgname, names[1]),
2722 iv_name = "sdb")
2723 disks = [sda_dev, sdb_dev]
2724 elif template_name == "local_raid1":
2725 if len(secondary_nodes) != 0:
2726 raise errors.ProgrammerError("Wrong template configuration")
2729 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2730 ".sdb_m1", ".sdb_m2"])
2731 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2732 logical_id=(vgname, names[0]))
2733 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2734 logical_id=(vgname, names[1]))
2735 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2736 size=disk_sz,
2737 children = [sda_dev_m1, sda_dev_m2])
2738 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2739 logical_id=(vgname, names[2]))
2740 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2741 logical_id=(vgname, names[3]))
2742 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2743 size=swap_sz,
2744 children = [sdb_dev_m1, sdb_dev_m2])
2745 disks = [md_sda_dev, md_sdb_dev]
2746 elif template_name == constants.DT_REMOTE_RAID1:
2747 if len(secondary_nodes) != 1:
2748 raise errors.ProgrammerError("Wrong template configuration")
2749 remote_node = secondary_nodes[0]
2750 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2751 ".sdb_data", ".sdb_meta"])
2752 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2753 disk_sz, names[0:2])
2754 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2755 children = [drbd_sda_dev], size=disk_sz)
2756 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2757 swap_sz, names[2:4])
2758 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2759 children = [drbd_sdb_dev], size=swap_sz)
2760 disks = [md_sda_dev, md_sdb_dev]
2761 elif template_name == constants.DT_DRBD8:
2762 if len(secondary_nodes) != 1:
2763 raise errors.ProgrammerError("Wrong template configuration")
2764 remote_node = secondary_nodes[0]
2765 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2766 ".sdb_data", ".sdb_meta"])
2767 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2768 disk_sz, names[0:2], "sda")
2769 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2770 swap_sz, names[2:4], "sdb")
2771 disks = [drbd_sda_dev, drbd_sdb_dev]
2772 else:
2773 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2774 return disks
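# Added overview of the templates handled above:
#   diskless     -> no block devices at all
#   plain        -> two plain LVs (sda, sdb)
#   local_raid1  -> two MD RAID1 arrays, each over a pair of local LVs
#   remote_raid1 -> two MD devices, each over a DRBD7 mirror to the secondary
#   drbd8        -> two DRBD8 devices mirrored to the secondary node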
2777 def _GetInstanceInfoText(instance):
2778 """Compute that text that should be added to the disk's metadata.
2781 return "originstname+%s" % instance.name
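# Added example: for an instance named "inst1.example.com" this returns
# "originstname+inst1.example.com"; the string is passed as the 'info' argument
# to the _CreateBlockDev* helpers and stored with the created devices.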
2784 def _CreateDisks(cfg, instance):
2785 """Create all disks for an instance.
2787 This abstracts away some work from AddInstance.
2790 instance: the instance object
2793 True or False showing the success of the creation process
2796 info = _GetInstanceInfoText(instance)
2798 for device in instance.disks:
2799 logger.Info("creating volume %s for instance %s" %
2800 (device.iv_name, instance.name))
2802 for secondary_node in instance.secondary_nodes:
2803 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2804 device, False, info):
2805 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2806 (device.iv_name, device, secondary_node))
2807 return False
2809 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2810 instance, device, info):
2811 logger.Error("failed to create volume %s on primary!" %
2812 device.iv_name)
2813 return False
2814 return True
2817 def _RemoveDisks(instance, cfg):
2818 """Remove all disks for an instance.
2820 This abstracts away some work from `AddInstance()` and
2821 `RemoveInstance()`. Note that in case some of the devices couldn't
2822 be removed, the removal will continue with the other ones (compare
2823 with `_CreateDisks()`).
2825 Args:
2826 instance: the instance object
2828 Returns:
2829 True or False showing the success of the removal process
2832 logger.Info("removing block devices for instance %s" % instance.name)
2834 result = True
2835 for device in instance.disks:
2836 for node, disk in device.ComputeNodeTree(instance.primary_node):
2837 cfg.SetDiskID(disk, node)
2838 if not rpc.call_blockdev_remove(node, disk):
2839 logger.Error("could not remove block device %s on node %s,"
2840 " continuing anyway" %
2841 (device.iv_name, node))
2842 result = False
2843 return result
2846 class LUCreateInstance(LogicalUnit):
2847 """Create an instance.
2850 HPATH = "instance-add"
2851 HTYPE = constants.HTYPE_INSTANCE
2852 _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2853 "disk_template", "swap_size", "mode", "start", "vcpus",
2854 "wait_for_sync", "ip_check", "mac"]
2856 def BuildHooksEnv(self):
2859 This runs on master, primary and secondary nodes of the instance.
2863 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2864 "INSTANCE_DISK_SIZE": self.op.disk_size,
2865 "INSTANCE_SWAP_SIZE": self.op.swap_size,
2866 "INSTANCE_ADD_MODE": self.op.mode,
2868 if self.op.mode == constants.INSTANCE_IMPORT:
2869 env["INSTANCE_SRC_NODE"] = self.op.src_node
2870 env["INSTANCE_SRC_PATH"] = self.op.src_path
2871 env["INSTANCE_SRC_IMAGE"] = self.src_image
2873 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2874 primary_node=self.op.pnode,
2875 secondary_nodes=self.secondaries,
2876 status=self.instance_status,
2877 os_type=self.op.os_type,
2878 memory=self.op.mem_size,
2879 vcpus=self.op.vcpus,
2880 nics=[(self.inst_ip, self.op.bridge)],
2883 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2884 self.secondaries)
2885 return env, nl, nl
2888 def CheckPrereq(self):
2889 """Check prerequisites.
2892 for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2893 if not hasattr(self.op, attr):
2894 setattr(self.op, attr, None)
2896 if self.op.mode not in (constants.INSTANCE_CREATE,
2897 constants.INSTANCE_IMPORT):
2898 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2901 if self.op.mode == constants.INSTANCE_IMPORT:
2902 src_node = getattr(self.op, "src_node", None)
2903 src_path = getattr(self.op, "src_path", None)
2904 if src_node is None or src_path is None:
2905 raise errors.OpPrereqError("Importing an instance requires source"
2906 " node and path options")
2907 src_node_full = self.cfg.ExpandNodeName(src_node)
2908 if src_node_full is None:
2909 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2910 self.op.src_node = src_node = src_node_full
2912 if not os.path.isabs(src_path):
2913 raise errors.OpPrereqError("The source path must be absolute")
2915 export_info = rpc.call_export_info(src_node, src_path)
2917 if not export_info:
2918 raise errors.OpPrereqError("No export found in dir %s" % src_path)
2920 if not export_info.has_section(constants.INISECT_EXP):
2921 raise errors.ProgrammerError("Corrupted export config")
2923 ei_version = export_info.get(constants.INISECT_EXP, 'version')
2924 if (int(ei_version) != constants.EXPORT_VERSION):
2925 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2926 (ei_version, constants.EXPORT_VERSION))
2928 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2929 raise errors.OpPrereqError("Can't import instance with more than"
2932 # FIXME: are the old os-es, disk sizes, etc. useful?
2933 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2934 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2936 self.src_image = diskimage
2937 else: # INSTANCE_CREATE
2938 if getattr(self.op, "os_type", None) is None:
2939 raise errors.OpPrereqError("No guest OS specified")
2941 # check primary node
2942 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2943 if pnode is None:
2944 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2945 self.op.pnode)
2946 self.op.pnode = pnode.name
2947 self.pnode = pnode
2948 self.secondaries = []
2949 # disk template and mirror node verification
2950 if self.op.disk_template not in constants.DISK_TEMPLATES:
2951 raise errors.OpPrereqError("Invalid disk template name")
2953 if self.op.disk_template in constants.DTS_NET_MIRROR:
2954 if getattr(self.op, "snode", None) is None:
2955 raise errors.OpPrereqError("The networked disk templates need"
2956 " a mirror node")
2958 snode_name = self.cfg.ExpandNodeName(self.op.snode)
2959 if snode_name is None:
2960 raise errors.OpPrereqError("Unknown secondary node '%s'" %
2961 self.op.snode)
2962 elif snode_name == pnode.name:
2963 raise errors.OpPrereqError("The secondary node cannot be"
2964 " the primary node.")
2965 self.secondaries.append(snode_name)
2967 # Required free disk space as a function of disk and swap space
2968 req_size_dict = {
2969 constants.DT_DISKLESS: None,
2970 constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2971 constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2972 # 256 MB are added for drbd metadata, 128MB for each drbd device
2973 constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2974 constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2975 }
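# Added worked example (hypothetical sizes): with disk_size=10240 and
# swap_size=4096 the requirements above evaluate to 14336 MB for plain,
# 28672 MB for local_raid1 and 14592 MB for remote_raid1 and drbd8 (the extra
# 256 MB being DRBD metadata).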
2977 if self.op.disk_template not in req_size_dict:
2978 raise errors.ProgrammerError("Disk template '%s' size requirement"
2979 " is unknown" % self.op.disk_template)
2981 req_size = req_size_dict[self.op.disk_template]
2983 # Check lv size requirements
2984 if req_size is not None:
2985 nodenames = [pnode.name] + self.secondaries
2986 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2987 for node in nodenames:
2988 info = nodeinfo.get(node, None)
2989 if not info:
2990 raise errors.OpPrereqError("Cannot get current information"
2991 " from node '%s'" % node)
2992 vg_free = info.get('vg_free', None)
2993 if not isinstance(vg_free, int):
2994 raise errors.OpPrereqError("Can't compute free disk space on"
2995 " node %s" % node)
2996 if req_size > info['vg_free']:
2997 raise errors.OpPrereqError("Not enough disk space on target node %s."
2998 " %d MB available, %d MB required" %
2999 (node, info['vg_free'], req_size))
3002 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3003 if not os_obj:
3004 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3005 " primary node" % self.op.os_type)
3007 if self.op.kernel_path == constants.VALUE_NONE:
3008 raise errors.OpPrereqError("Can't set instance kernel to none")
3010 # instance verification
3011 hostname1 = utils.HostInfo(self.op.instance_name)
3013 self.op.instance_name = instance_name = hostname1.name
3014 instance_list = self.cfg.GetInstanceList()
3015 if instance_name in instance_list:
3016 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3017 instance_name)
3019 ip = getattr(self.op, "ip", None)
3020 if ip is None or ip.lower() == "none":
3021 inst_ip = None
3022 elif ip.lower() == "auto":
3023 inst_ip = hostname1.ip
3024 else:
3025 if not utils.IsValidIP(ip):
3026 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3027 " like a valid IP" % ip)
3029 self.inst_ip = inst_ip
3031 if self.op.start and not self.op.ip_check:
3032 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3033 " adding an instance in start mode")
3035 if self.op.ip_check:
3036 if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
3037 constants.DEFAULT_NODED_PORT):
3038 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3039 (hostname1.ip, instance_name))
3041 # MAC address verification
3042 if self.op.mac != "auto":
3043 if not utils.IsValidMac(self.op.mac.lower()):
3044 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3045 self.op.mac)
3047 # bridge verification
3048 bridge = getattr(self.op, "bridge", None)
3049 if bridge is None:
3050 self.op.bridge = self.cfg.GetDefBridge()
3051 else:
3052 self.op.bridge = bridge
3054 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3055 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3056 " destination node '%s'" %
3057 (self.op.bridge, pnode.name))
3059 # boot order verification
3060 if self.op.hvm_boot_order is not None:
3061 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3062 raise errors.OpPrereqError("invalid boot order specified,"
3063 " must be one or more of [acdn]")
3065 if self.op.start:
3066 self.instance_status = 'up'
3067 else:
3068 self.instance_status = 'down'
3070 def Exec(self, feedback_fn):
3071 """Create and add the instance to the cluster.
3074 instance = self.op.instance_name
3075 pnode_name = self.pnode.name
3077 if self.op.mac == "auto":
3078 mac_address = self.cfg.GenerateMAC()
3080 mac_address = self.op.mac
3082 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3083 if self.inst_ip is not None:
3084 nic.ip = self.inst_ip
3086 ht_kind = self.sstore.GetHypervisorType()
3087 if ht_kind in constants.HTS_REQ_PORT:
3088 network_port = self.cfg.AllocatePort()
3089 else:
3090 network_port = None
3092 disks = _GenerateDiskTemplate(self.cfg,
3093 self.op.disk_template,
3094 instance, pnode_name,
3095 self.secondaries, self.op.disk_size,
3096 self.op.swap_size)
3098 iobj = objects.Instance(name=instance, os=self.op.os_type,
3099 primary_node=pnode_name,
3100 memory=self.op.mem_size,
3101 vcpus=self.op.vcpus,
3102 nics=[nic], disks=disks,
3103 disk_template=self.op.disk_template,
3104 status=self.instance_status,
3105 network_port=network_port,
3106 kernel_path=self.op.kernel_path,
3107 initrd_path=self.op.initrd_path,
3108 hvm_boot_order=self.op.hvm_boot_order,
3111 feedback_fn("* creating instance disks...")
3112 if not _CreateDisks(self.cfg, iobj):
3113 _RemoveDisks(iobj, self.cfg)
3114 raise errors.OpExecError("Device creation failed, reverting...")
3116 feedback_fn("adding instance %s to cluster config" % instance)
3118 self.cfg.AddInstance(iobj)
3120 if self.op.wait_for_sync:
3121 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3122 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3123 # make sure the disks are not degraded (still sync-ing is ok)
3124 time.sleep(15)
3125 feedback_fn("* checking mirrors status")
3126 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3127 else:
3128 disk_abort = False
3130 if disk_abort:
3131 _RemoveDisks(iobj, self.cfg)
3132 self.cfg.RemoveInstance(iobj.name)
3133 raise errors.OpExecError("There are some degraded disks for"
3136 feedback_fn("creating os for instance %s on node %s" %
3137 (instance, pnode_name))
3139 if iobj.disk_template != constants.DT_DISKLESS:
3140 if self.op.mode == constants.INSTANCE_CREATE:
3141 feedback_fn("* running the instance OS create scripts...")
3142 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3143 raise errors.OpExecError("could not add os for instance %s"
3144 " on node %s" %
3145 (instance, pnode_name))
3147 elif self.op.mode == constants.INSTANCE_IMPORT:
3148 feedback_fn("* running the instance OS import scripts...")
3149 src_node = self.op.src_node
3150 src_image = self.src_image
3151 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3152 src_node, src_image):
3153 raise errors.OpExecError("Could not import os for instance"
3154 " %s on node %s" %
3155 (instance, pnode_name))
3156 else:
3157 # also checked in the prereq part
3158 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3159 % self.op.mode)
3161 if self.op.start:
3162 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3163 feedback_fn("* starting instance...")
3164 if not rpc.call_instance_start(pnode_name, iobj, None):
3165 raise errors.OpExecError("Could not start instance")
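# Added summary of Exec above: pick or generate the MAC, allocate a network
# port if the hypervisor needs one, build the disk tree for the chosen
# template, create the disks (rolling back on failure), register the instance
# in the configuration, optionally wait for the mirrors to sync, run the OS
# create or import scripts, and finally start the instance when requested.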
3168 class LUConnectConsole(NoHooksLU):
3169 """Connect to an instance's console.
3171 This is somewhat special in that it returns the command line that
3172 you need to run on the master node in order to connect to the
3173 console.
3176 _OP_REQP = ["instance_name"]
3178 def CheckPrereq(self):
3179 """Check prerequisites.
3181 This checks that the instance is in the cluster.
3184 instance = self.cfg.GetInstanceInfo(
3185 self.cfg.ExpandInstanceName(self.op.instance_name))
3186 if instance is None:
3187 raise errors.OpPrereqError("Instance '%s' not known" %
3188 self.op.instance_name)
3189 self.instance = instance
3191 def Exec(self, feedback_fn):
3192 """Connect to the console of an instance
3195 instance = self.instance
3196 node = instance.primary_node
3198 node_insts = rpc.call_instance_list([node])[node]
3199 if node_insts is False:
3200 raise errors.OpExecError("Can't connect to node %s." % node)
3202 if instance.name not in node_insts:
3203 raise errors.OpExecError("Instance %s is not running." % instance.name)
3205 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3207 hyper = hypervisor.GetHypervisor()
3208 console_cmd = hyper.GetShellCommandForConsole(instance)
3210 argv = ["ssh", "-q", "-t"]
3211 argv.extend(ssh.KNOWN_HOSTS_OPTS)
3212 argv.extend(ssh.BATCH_MODE_OPTS)
3214 argv.append(console_cmd)
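# Added note: the ssh command assembled above is handed back to the caller (see
# the class docstring); it connects to the primary node and runs the
# hypervisor's console command obtained from GetShellCommandForConsole.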
3218 class LUAddMDDRBDComponent(LogicalUnit):
3219 """Adda new mirror member to an instance's disk.
3222 HPATH = "mirror-add"
3223 HTYPE = constants.HTYPE_INSTANCE
3224 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3226 def BuildHooksEnv(self):
3229 This runs on the master, the primary and all the secondaries.
3233 "NEW_SECONDARY": self.op.remote_node,
3234 "DISK_NAME": self.op.disk_name,
3236 env.update(_BuildInstanceHookEnvByObject(self.instance))
3237 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3238 self.op.remote_node,] + list(self.instance.secondary_nodes)
3239 return env, nl, nl
3241 def CheckPrereq(self):
3242 """Check prerequisites.
3244 This checks that the instance is in the cluster.
3247 instance = self.cfg.GetInstanceInfo(
3248 self.cfg.ExpandInstanceName(self.op.instance_name))
3249 if instance is None:
3250 raise errors.OpPrereqError("Instance '%s' not known" %
3251 self.op.instance_name)
3252 self.instance = instance
3254 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3255 if remote_node is None:
3256 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3257 self.remote_node = remote_node
3259 if remote_node == instance.primary_node:
3260 raise errors.OpPrereqError("The specified node is the primary node of"
3263 if instance.disk_template != constants.DT_REMOTE_RAID1:
3264 raise errors.OpPrereqError("Instance's disk layout is not"
3265 " remote_raid1.")
3266 for disk in instance.disks:
3267 if disk.iv_name == self.op.disk_name:
3268 break
3269 else:
3270 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3271 " instance." % self.op.disk_name)
3272 if len(disk.children) > 1:
3273 raise errors.OpPrereqError("The device already has two slave devices."
3274 " This would create a 3-disk raid1 which we"
3275 " don't allow.")
3276 self.disk = disk
3278 def Exec(self, feedback_fn):
3279 """Add the mirror component
3283 instance = self.instance
3284 disk = self.disk
3285 remote_node = self.remote_node
3286 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3287 names = _GenerateUniqueNames(self.cfg, lv_names)
3288 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3289 remote_node, disk.size, names)
3291 logger.Info("adding new mirror component on secondary")
3293 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3295 _GetInstanceInfoText(instance)):
3296 raise errors.OpExecError("Failed to create new component on secondary"
3297 " node %s" % remote_node)
3299 logger.Info("adding new mirror component on primary")
3301 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3303 _GetInstanceInfoText(instance)):
3304 # remove secondary dev
3305 self.cfg.SetDiskID(new_drbd, remote_node)
3306 rpc.call_blockdev_remove(remote_node, new_drbd)
3307 raise errors.OpExecError("Failed to create volume on primary")
3309 # the device exists now
3310 # call the primary node to add the mirror to md
3311 logger.Info("adding new mirror component to md")
3312 if not rpc.call_blockdev_addchildren(instance.primary_node,
3313 disk, [new_drbd]):
3314 logger.Error("Can't add mirror component to md!")
3315 self.cfg.SetDiskID(new_drbd, remote_node)
3316 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3317 logger.Error("Can't rollback on secondary")
3318 self.cfg.SetDiskID(new_drbd, instance.primary_node)
3319 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3320 logger.Error("Can't rollback on primary")
3321 raise errors.OpExecError("Can't add mirror component to md array")
3323 disk.children.append(new_drbd)
3325 self.cfg.AddInstance(instance)
3327 _WaitForSync(self.cfg, instance, self.proc)
3332 class LURemoveMDDRBDComponent(LogicalUnit):
3333 """Remove a component from a remote_raid1 disk.
3336 HPATH = "mirror-remove"
3337 HTYPE = constants.HTYPE_INSTANCE
3338 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3340 def BuildHooksEnv(self):
3343 This runs on the master, the primary and all the secondaries.
3347 "DISK_NAME": self.op.disk_name,
3348 "DISK_ID": self.op.disk_id,
3349 "OLD_SECONDARY": self.old_secondary,
3351 env.update(_BuildInstanceHookEnvByObject(self.instance))
3352 nl = [self.sstore.GetMasterNode(),
3353 self.instance.primary_node] + list(self.instance.secondary_nodes)
3354 return env, nl, nl
3356 def CheckPrereq(self):
3357 """Check prerequisites.
3359 This checks that the instance is in the cluster.
3362 instance = self.cfg.GetInstanceInfo(
3363 self.cfg.ExpandInstanceName(self.op.instance_name))
3364 if instance is None:
3365 raise errors.OpPrereqError("Instance '%s' not known" %
3366 self.op.instance_name)
3367 self.instance = instance
3369 if instance.disk_template != constants.DT_REMOTE_RAID1:
3370 raise errors.OpPrereqError("Instance's disk layout is not"
3371 " remote_raid1.")
3372 for disk in instance.disks:
3373 if disk.iv_name == self.op.disk_name:
3374 break
3375 else:
3376 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3377 " instance." % self.op.disk_name)
3378 for child in disk.children:
3379 if (child.dev_type == constants.LD_DRBD7 and
3380 child.logical_id[2] == self.op.disk_id):
3381 break
3382 else:
3383 raise errors.OpPrereqError("Can't find the device with this port.")
3385 if len(disk.children) < 2:
3386 raise errors.OpPrereqError("Cannot remove the last component from"
3387 " a mirror.")
3388 self.disk = disk
3389 self.child = child
3390 if self.child.logical_id[0] == instance.primary_node:
3391 oid = 1
3392 else:
3393 oid = 0
3394 self.old_secondary = self.child.logical_id[oid]
3396 def Exec(self, feedback_fn):
3397 """Remove the mirror component
3400 instance = self.instance
3401 disk = self.disk
3402 child = self.child
3403 logger.Info("remove mirror component")
3404 self.cfg.SetDiskID(disk, instance.primary_node)
3405 if not rpc.call_blockdev_removechildren(instance.primary_node,
3406 disk, [child]):
3407 raise errors.OpExecError("Can't remove child from mirror.")
3409 for node in child.logical_id[:2]:
3410 self.cfg.SetDiskID(child, node)
3411 if not rpc.call_blockdev_remove(node, child):
3412 logger.Error("Warning: failed to remove device from node %s,"
3413 " continuing operation." % node)
3415 disk.children.remove(child)
3416 self.cfg.AddInstance(instance)
3419 class LUReplaceDisks(LogicalUnit):
3420 """Replace the disks of an instance.
3423 HPATH = "mirrors-replace"
3424 HTYPE = constants.HTYPE_INSTANCE
3425 _OP_REQP = ["instance_name", "mode", "disks"]
3427 def BuildHooksEnv(self):
3430 This runs on the master, the primary and all the secondaries.
3434 "MODE": self.op.mode,
3435 "NEW_SECONDARY": self.op.remote_node,
3436 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3438 env.update(_BuildInstanceHookEnvByObject(self.instance))
3439 nl = [
3440 self.sstore.GetMasterNode(),
3441 self.instance.primary_node,
3442 ]
3443 if self.op.remote_node is not None:
3444 nl.append(self.op.remote_node)
3445 return env, nl, nl
3447 def CheckPrereq(self):
3448 """Check prerequisites.
3450 This checks that the instance is in the cluster.
3453 instance = self.cfg.GetInstanceInfo(
3454 self.cfg.ExpandInstanceName(self.op.instance_name))
3455 if instance is None:
3456 raise errors.OpPrereqError("Instance '%s' not known" %
3457 self.op.instance_name)
3458 self.instance = instance
3459 self.op.instance_name = instance.name
3461 if instance.disk_template not in constants.DTS_NET_MIRROR:
3462 raise errors.OpPrereqError("Instance's disk layout is not"
3463 " network mirrored.")
3465 if len(instance.secondary_nodes) != 1:
3466 raise errors.OpPrereqError("The instance has a strange layout,"
3467 " expected one secondary but found %d" %
3468 len(instance.secondary_nodes))
3470 self.sec_node = instance.secondary_nodes[0]
3472 remote_node = getattr(self.op, "remote_node", None)
3473 if remote_node is not None:
3474 remote_node = self.cfg.ExpandNodeName(remote_node)
3475 if remote_node is None:
3476 raise errors.OpPrereqError("Node '%s' not known" %
3477 self.op.remote_node)
3478 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3480 self.remote_node_info = None
3481 if remote_node == instance.primary_node:
3482 raise errors.OpPrereqError("The specified node is the primary node of"
3484 elif remote_node == self.sec_node:
3485 if self.op.mode == constants.REPLACE_DISK_SEC:
3486 # this is for DRBD8, where we can't execute the same mode of
3487 # replacement as for drbd7 (no different port allocated)
3488 raise errors.OpPrereqError("Same secondary given, cannot execute"
3490 # the user gave the current secondary, switch to
3491 # 'no-replace-secondary' mode for drbd7
3493 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3494 self.op.mode != constants.REPLACE_DISK_ALL):
3495 raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3496 " disks replacement, not individual ones")
3497 if instance.disk_template == constants.DT_DRBD8:
3498 if (self.op.mode == constants.REPLACE_DISK_ALL and
3499 remote_node is not None):
3500 # switch to replace secondary mode
3501 self.op.mode = constants.REPLACE_DISK_SEC
3503 if self.op.mode == constants.REPLACE_DISK_ALL:
3504 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3505 " secondary disk replacement, not"
3507 elif self.op.mode == constants.REPLACE_DISK_PRI:
3508 if remote_node is not None:
3509 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3510 " the secondary while doing a primary"
3511 " node disk replacement")
3512 self.tgt_node = instance.primary_node
3513 self.oth_node = instance.secondary_nodes[0]
3514 elif self.op.mode == constants.REPLACE_DISK_SEC:
3515 self.new_node = remote_node # this can be None, in which case
3516 # we don't change the secondary
3517 self.tgt_node = instance.secondary_nodes[0]
3518 self.oth_node = instance.primary_node
3520 raise errors.ProgrammerError("Unhandled disk replace mode")
3522 for name in self.op.disks:
3523 if instance.FindDisk(name) is None:
3524 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3525 (name, instance.name))
3526 self.op.remote_node = remote_node
3528 def _ExecRR1(self, feedback_fn):
3529 """Replace the disks of an instance.
3532 instance = self.instance
3533 iv_names = {}
3534 cfg = self.cfg
3535 if self.op.remote_node is None:
3536 remote_node = self.sec_node
3537 else:
3538 remote_node = self.op.remote_node
3540 for dev in instance.disks:
3541 size = dev.size
3542 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3543 names = _GenerateUniqueNames(cfg, lv_names)
3544 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3545 remote_node, size, names)
3546 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3547 logger.Info("adding new mirror component on secondary for %s" %
3550 if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3552 _GetInstanceInfoText(instance)):
3553 raise errors.OpExecError("Failed to create new component on secondary"
3554 " node %s. Full abort, cleanup manually!" %
3557 logger.Info("adding new mirror component on primary")
3559 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3561 _GetInstanceInfoText(instance)):
3562 # remove secondary dev
3563 cfg.SetDiskID(new_drbd, remote_node)
3564 rpc.call_blockdev_remove(remote_node, new_drbd)
3565 raise errors.OpExecError("Failed to create volume on primary!"
3566 " Full abort, cleanup manually!!")
3568 # the device exists now
3569 # call the primary node to add the mirror to md
3570 logger.Info("adding new mirror component to md")
3571 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3572 [new_drbd]):
3573 logger.Error("Can't add mirror component to md!")
3574 cfg.SetDiskID(new_drbd, remote_node)
3575 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3576 logger.Error("Can't rollback on secondary")
3577 cfg.SetDiskID(new_drbd, instance.primary_node)
3578 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3579 logger.Error("Can't rollback on primary")
3580 raise errors.OpExecError("Full abort, cleanup manually!!")
3582 dev.children.append(new_drbd)
3583 cfg.AddInstance(instance)
3585 # this can fail as the old devices are degraded and _WaitForSync
3586 # does a combined result over all disks, so we don't check its
3587 # return value
3588 _WaitForSync(cfg, instance, self.proc, unlock=True)
3590 # so check manually all the devices
3591 for name in iv_names:
3592 dev, child, new_drbd = iv_names[name]
3593 cfg.SetDiskID(dev, instance.primary_node)
3594 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3595 if is_degr:
3596 raise errors.OpExecError("MD device %s is degraded!" % name)
3597 cfg.SetDiskID(new_drbd, instance.primary_node)
3598 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3599 if is_degr:
3600 raise errors.OpExecError("New drbd device %s is degraded!" % name)
3602 for name in iv_names:
3603 dev, child, new_drbd = iv_names[name]
3604 logger.Info("remove mirror %s component" % name)
3605 cfg.SetDiskID(dev, instance.primary_node)
3606 if not rpc.call_blockdev_removechildren(instance.primary_node,
3608 logger.Error("Can't remove child from mirror, aborting"
3609 " *this device cleanup*.\nYou need to cleanup manually!!")
3612 for node in child.logical_id[:2]:
3613 logger.Info("remove child device on %s" % node)
3614 cfg.SetDiskID(child, node)
3615 if not rpc.call_blockdev_remove(node, child):
3616 logger.Error("Warning: failed to remove device from node %s,"
3617 " continuing operation." % node)
3619 dev.children.remove(child)
3621 cfg.AddInstance(instance)
3623 def _ExecD8DiskOnly(self, feedback_fn):
3624 """Replace a disk on the primary or secondary for dbrd8.
3626 The algorithm for replace is quite complicated:
3627 - for each disk to be replaced:
3628 - create new LVs on the target node with unique names
3629 - detach old LVs from the drbd device
3630 - rename old LVs to name_replaced.<time_t>
3631 - rename new LVs to old LVs
3632 - attach the new LVs (with the old names now) to the drbd device
3633 - wait for sync across all devices
3634 - for each modified disk:
3635 - remove old LVs (which have the name name_replaced.<time_t>)
3637 Failures are not very well handled.
3640 steps_total = 6
3641 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3642 instance = self.instance
3643 iv_names = {}
3644 vgname = self.cfg.GetVGName()
3646 cfg = self.cfg
3647 tgt_node = self.tgt_node
3648 oth_node = self.oth_node
3650 # Step: check device activation
3651 self.proc.LogStep(1, steps_total, "check device existence")
3652 info("checking volume groups")
3653 my_vg = cfg.GetVGName()
3654 results = rpc.call_vg_list([oth_node, tgt_node])
3655 if not results:
3656 raise errors.OpExecError("Can't list volume groups on the nodes")
3657 for node in oth_node, tgt_node:
3658 res = results.get(node, False)
3659 if not res or my_vg not in res:
3660 raise errors.OpExecError("Volume group '%s' not found on %s" %
3662 for dev in instance.disks:
3663 if not dev.iv_name in self.op.disks:
3664 continue
3665 for node in tgt_node, oth_node:
3666 info("checking %s on %s" % (dev.iv_name, node))
3667 cfg.SetDiskID(dev, node)
3668 if not rpc.call_blockdev_find(node, dev):
3669 raise errors.OpExecError("Can't find device %s on node %s" %
3670 (dev.iv_name, node))
3672 # Step: check other node consistency
3673 self.proc.LogStep(2, steps_total, "check peer consistency")
3674 for dev in instance.disks:
3675 if not dev.iv_name in self.op.disks:
3676 continue
3677 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3678 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3679 oth_node==instance.primary_node):
3680 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3681 " to replace disks on this node (%s)" %
3682 (oth_node, tgt_node))
3684 # Step: create new storage
3685 self.proc.LogStep(3, steps_total, "allocate new storage")
3686 for dev in instance.disks:
3687 if not dev.iv_name in self.op.disks:
3688 continue
3689 size = dev.size
3690 cfg.SetDiskID(dev, tgt_node)
3691 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3692 names = _GenerateUniqueNames(cfg, lv_names)
3693 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3694 logical_id=(vgname, names[0]))
3695 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3696 logical_id=(vgname, names[1]))
3697 new_lvs = [lv_data, lv_meta]
3698 old_lvs = dev.children
3699 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3700 info("creating new local storage on %s for %s" %
3701 (tgt_node, dev.iv_name))
3702 # since we *always* want to create this LV, we use the
3703 # _Create...OnPrimary (which forces the creation), even if we
3704 # are talking about the secondary node
3705 for new_lv in new_lvs:
3706 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3707 _GetInstanceInfoText(instance)):
3708 raise errors.OpExecError("Failed to create new LV named '%s' on"
3710 (new_lv.logical_id[1], tgt_node))
3712 # Step: for each lv, detach+rename*2+attach
3713 self.proc.LogStep(4, steps_total, "change drbd configuration")
3714 for dev, old_lvs, new_lvs in iv_names.itervalues():
3715 info("detaching %s drbd from local storage" % dev.iv_name)
3716 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3717 raise errors.OpExecError("Can't detach drbd from local storage on node"
3718 " %s for device %s" % (tgt_node, dev.iv_name))
3720 #cfg.Update(instance)
3722 # ok, we created the new LVs, so now we know we have the needed
3723 # storage; as such, we proceed on the target node to rename
3724 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3725 # using the assumption than logical_id == physical_id (which in
3726 # turn is the unique_id on that node)
3728 # FIXME(iustin): use a better name for the replaced LVs
3729 temp_suffix = int(time.time())
3730 ren_fn = lambda d, suff: (d.physical_id[0],
3731 d.physical_id[1] + "_replaced-%s" % suff)
3732 # build the rename list based on what LVs exist on the node
3734 for to_ren in old_lvs:
3735 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3736 if find_res is not None: # device exists
3737 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3739 info("renaming the old LVs on the target node")
3740 if not rpc.call_blockdev_rename(tgt_node, rlist):
3741 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3742 # now we rename the new LVs to the old LVs
3743 info("renaming the new LVs on the target node")
3744 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3745 if not rpc.call_blockdev_rename(tgt_node, rlist):
3746 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3748 for old, new in zip(old_lvs, new_lvs):
3749 new.logical_id = old.logical_id
3750 cfg.SetDiskID(new, tgt_node)
3752 for disk in old_lvs:
3753 disk.logical_id = ren_fn(disk, temp_suffix)
3754 cfg.SetDiskID(disk, tgt_node)
3756 # now that the new lvs have the old name, we can add them to the device
3757 info("adding new mirror component on %s" % tgt_node)
3758 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3759 for new_lv in new_lvs:
3760 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3761 warning("Can't rollback device %s", hint="manually cleanup unused"
3763 raise errors.OpExecError("Can't add local storage to drbd")
3765 dev.children = new_lvs
3766 cfg.Update(instance)
3768 # Step: wait for sync
3770 # this can fail as the old devices are degraded and _WaitForSync
3771 # does a combined result over all disks, so we don't check its
3772 # return value
3773 self.proc.LogStep(5, steps_total, "sync devices")
3774 _WaitForSync(cfg, instance, self.proc, unlock=True)
3776 # so check manually all the devices
3777 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3778 cfg.SetDiskID(dev, instance.primary_node)
3779 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3780 if is_degr:
3781 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3783 # Step: remove old storage
3784 self.proc.LogStep(6, steps_total, "removing old storage")
3785 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3786 info("remove logical volumes for %s" % name)
3787 for lv in old_lvs:
3788 cfg.SetDiskID(lv, tgt_node)
3789 if not rpc.call_blockdev_remove(tgt_node, lv):
3790 warning("Can't remove old LV", hint="manually remove unused LVs")
3793 def _ExecD8Secondary(self, feedback_fn):
3794 """Replace the secondary node for drbd8.
3796 The algorithm for replace is quite complicated:
3797 - for all disks of the instance:
3798 - create new LVs on the new node with same names
3799 - shutdown the drbd device on the old secondary
3800 - disconnect the drbd network on the primary
3801 - create the drbd device on the new secondary
3802 - network attach the drbd on the primary, using an artifice:
3803 the drbd code for Attach() will connect to the network if it
3804 finds a device which is connected to the good local disks but
3805 not network enabled
3806 - wait for sync across all devices
3807 - remove all disks from the old secondary
3809 Failures are not very well handled.
3812 steps_total = 6
3813 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3814 instance = self.instance
3815 iv_names = {}
3816 vgname = self.cfg.GetVGName()
3818 cfg = self.cfg
3819 old_node = self.tgt_node
3820 new_node = self.new_node
3821 pri_node = instance.primary_node
3823 # Step: check device activation
3824 self.proc.LogStep(1, steps_total, "check device existence")
3825 info("checking volume groups")
3826 my_vg = cfg.GetVGName()
3827 results = rpc.call_vg_list([pri_node, new_node])
3828 if not results:
3829 raise errors.OpExecError("Can't list volume groups on the nodes")
3830 for node in pri_node, new_node:
3831 res = results.get(node, False)
3832 if not res or my_vg not in res:
3833 raise errors.OpExecError("Volume group '%s' not found on %s" %
3835 for dev in instance.disks:
3836 if not dev.iv_name in self.op.disks:
3837 continue
3838 info("checking %s on %s" % (dev.iv_name, pri_node))
3839 cfg.SetDiskID(dev, pri_node)
3840 if not rpc.call_blockdev_find(pri_node, dev):
3841 raise errors.OpExecError("Can't find device %s on node %s" %
3842 (dev.iv_name, pri_node))
3844 # Step: check other node consistency
3845 self.proc.LogStep(2, steps_total, "check peer consistency")
3846 for dev in instance.disks:
3847 if not dev.iv_name in self.op.disks:
3848 continue
3849 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3850 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3851 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3852 " unsafe to replace the secondary" %
3855 # Step: create new storage
3856 self.proc.LogStep(3, steps_total, "allocate new storage")
3857 for dev in instance.disks:
3859 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3860 # since we *always* want to create this LV, we use the
3861 # _Create...OnPrimary (which forces the creation), even if we
3862 # are talking about the secondary node
3863 for new_lv in dev.children:
3864 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3865 _GetInstanceInfoText(instance)):
3866 raise errors.OpExecError("Failed to create new LV named '%s' on"
3868 (new_lv.logical_id[1], new_node))
3870 iv_names[dev.iv_name] = (dev, dev.children)
3872 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3873 for dev in instance.disks:
3875 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3876 # create new devices on new_node
3877 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3878 logical_id=(pri_node, new_node,
3880 children=dev.children)
3881 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3883 _GetInstanceInfoText(instance)):
3884 raise errors.OpExecError("Failed to create new DRBD on"
3885 " node '%s'" % new_node)
3887 for dev in instance.disks:
3888 # we have new devices, shutdown the drbd on the old secondary
3889 info("shutting down drbd for %s on old node" % dev.iv_name)
3890 cfg.SetDiskID(dev, old_node)
3891 if not rpc.call_blockdev_shutdown(old_node, dev):
3892 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3893 hint="Please cleanup this device manually as soon as possible")
3895 info("detaching primary drbds from the network (=> standalone)")
3896 done = 0
3897 for dev in instance.disks:
3898 cfg.SetDiskID(dev, pri_node)
3899 # set the physical (unique in bdev terms) id to None, meaning
3900 # detach from network
3901 dev.physical_id = (None,) * len(dev.physical_id)
3902 # and 'find' the device, which will 'fix' it to match the
3903 # standalone state
3904 if rpc.call_blockdev_find(pri_node, dev):
3905 done += 1
3906 else:
3907 warning("Failed to detach drbd %s from network, unusual case" %
3908 dev.iv_name)
3910 if not done:
3911 # no detaches succeeded (very unlikely)
3912 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3914 # if we managed to detach at least one, we update all the disks of
3915 # the instance to point to the new secondary
3916 info("updating instance configuration")
3917 for dev in instance.disks:
3918 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3919 cfg.SetDiskID(dev, pri_node)
3920 cfg.Update(instance)
3922 # and now perform the drbd attach
3923 info("attaching primary drbds to new secondary (standalone => connected)")
3925 for dev in instance.disks:
3926 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3927 # since the attach is smart, it's enough to 'find' the device,
3928 # it will automatically activate the network, if the physical_id
3930 cfg.SetDiskID(dev, pri_node)
3931 if not rpc.call_blockdev_find(pri_node, dev):
3932 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3933 "please do a gnt-instance info to see the status of disks")
3935 # this can fail as the old devices are degraded and _WaitForSync
3936 # does a combined result over all disks, so we don't check its
3937 # return value
3938 self.proc.LogStep(5, steps_total, "sync devices")
3939 _WaitForSync(cfg, instance, self.proc, unlock=True)
3941 # so check manually all the devices
3942 for name, (dev, old_lvs) in iv_names.iteritems():
3943 cfg.SetDiskID(dev, pri_node)
3944 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3945 if is_degr:
3946 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3948 self.proc.LogStep(6, steps_total, "removing old storage")
3949 for name, (dev, old_lvs) in iv_names.iteritems():
3950 info("remove logical volumes for %s" % name)
3951 for lv in old_lvs:
3952 cfg.SetDiskID(lv, old_node)
3953 if not rpc.call_blockdev_remove(old_node, lv):
3954 warning("Can't remove LV on old secondary",
3955 hint="Cleanup stale volumes by hand")
3957 def Exec(self, feedback_fn):
3958 """Execute disk replacement.
3960 This dispatches the disk replacement to the appropriate handler.
3963 instance = self.instance
3964 if instance.disk_template == constants.DT_REMOTE_RAID1:
3965 fn = self._ExecRR1
3966 elif instance.disk_template == constants.DT_DRBD8:
3967 if self.op.remote_node is None:
3968 fn = self._ExecD8DiskOnly
3970 fn = self._ExecD8Secondary
3972 raise errors.ProgrammerError("Unhandled disk replacement case")
3973 return fn(feedback_fn)
3976 class LUQueryInstanceData(NoHooksLU):
3977 """Query runtime instance data.
3980 _OP_REQP = ["instances"]
3982 def CheckPrereq(self):
3983 """Check prerequisites.
3985 This only checks the optional instance list against the existing names.
3987 """
3988 if not isinstance(self.op.instances, list):
3989 raise errors.OpPrereqError("Invalid argument type 'instances'")
3990 if self.op.instances:
3991 self.wanted_instances = []
3992 names = self.op.instances
3993 for name in names:
3994 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3995 if instance is None:
3996 raise errors.OpPrereqError("No such instance name '%s'" % name)
3997 self.wanted_instances.append(instance)
3998 else:
3999 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4000 in self.cfg.GetInstanceList()]
4004 def _ComputeDiskStatus(self, instance, snode, dev):
4005 """Compute block device status.
4007 """
4008 self.cfg.SetDiskID(dev, instance.primary_node)
4009 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4010 if dev.dev_type in constants.LDS_DRBD:
4011 # we change the snode then (otherwise we use the one passed in)
4012 if dev.logical_id[0] == instance.primary_node:
4013 snode = dev.logical_id[1]
4014 else:
4015 snode = dev.logical_id[0]
4017 if snode:
4018 self.cfg.SetDiskID(dev, snode)
4019 dev_sstatus = rpc.call_blockdev_find(snode, dev)
4020 else:
4021 dev_sstatus = None
4023 if dev.children:
4024 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4025 for child in dev.children]
4026 else:
4027 dev_children = []
4029 data = {
4030 "iv_name": dev.iv_name,
4031 "dev_type": dev.dev_type,
4032 "logical_id": dev.logical_id,
4033 "physical_id": dev.physical_id,
4034 "pstatus": dev_pstatus,
4035 "sstatus": dev_sstatus,
4036 "children": dev_children,
4037 }
4039 return data
4041 def Exec(self, feedback_fn):
4042 """Gather and return data"""
4043 result = {}
4044 for instance in self.wanted_instances:
4045 remote_info = rpc.call_instance_info(instance.primary_node,
4046 instance.name)
4047 if remote_info and "state" in remote_info:
4048 remote_state = "up"
4049 else:
4050 remote_state = "down"
4051 if instance.status == "down":
4052 config_state = "down"
4053 else:
4054 config_state = "up"
4056 disks = [self._ComputeDiskStatus(instance, None, device)
4057 for device in instance.disks]
4059 idict = {
4060 "name": instance.name,
4061 "config_state": config_state,
4062 "run_state": remote_state,
4063 "pnode": instance.primary_node,
4064 "snodes": instance.secondary_nodes,
4066 "memory": instance.memory,
4067 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4068 "disks": disks,
4069 "network_port": instance.network_port,
4070 "vcpus": instance.vcpus,
4071 "kernel_path": instance.kernel_path,
4072 "initrd_path": instance.initrd_path,
4073 "hvm_boot_order": instance.hvm_boot_order,
4074 }
4076 result[instance.name] = idict
4078 return result
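# Sketch of the mapping returned by Exec (instance name and values are
# illustrative only):
#   {"instance1.example.com": {"name": "instance1.example.com",
#                              "config_state": "up", "run_state": "down",
#                              "pnode": "node1.example.com", "snodes": [],
#                              "memory": 128, "vcpus": 1, "disks": [...], ...}}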
4081 class LUSetInstanceParms(LogicalUnit):
4082 """Modifies an instance's parameters.
4084 """
4085 HPATH = "instance-modify"
4086 HTYPE = constants.HTYPE_INSTANCE
4087 _OP_REQP = ["instance_name"]
4089 def BuildHooksEnv(self):
4090 """Build hooks env.
4092 This runs on the master, primary and secondaries.
4094 """
4095 args = dict()
4096 if self.mem:
4097 args['memory'] = self.mem
4098 if self.vcpus:
4099 args['vcpus'] = self.vcpus
4100 if self.do_ip or self.do_bridge:
4101 if self.do_ip:
4102 ip = self.ip
4103 else:
4104 ip = self.instance.nics[0].ip
4105 if self.do_bridge:
4106 bridge = self.bridge
4107 else:
4108 bridge = self.instance.nics[0].bridge
4109 args['nics'] = [(ip, bridge)]
4110 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4111 nl = [self.sstore.GetMasterNode(),
4112 self.instance.primary_node] + list(self.instance.secondary_nodes)
4113 return env, nl, nl
4115 def CheckPrereq(self):
4116 """Check prerequisites.
4118 This only checks the instance list against the existing names.
4121 self.mem = getattr(self.op, "mem", None)
4122 self.vcpus = getattr(self.op, "vcpus", None)
4123 self.ip = getattr(self.op, "ip", None)
4124 self.mac = getattr(self.op, "mac", None)
4125 self.bridge = getattr(self.op, "bridge", None)
4126 self.kernel_path = getattr(self.op, "kernel_path", None)
4127 self.initrd_path = getattr(self.op, "initrd_path", None)
4128 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4129 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4130 self.kernel_path, self.initrd_path, self.hvm_boot_order]
4131 if all_parms.count(None) == len(all_parms):
4132 raise errors.OpPrereqError("No changes submitted")
4133 if self.mem is not None:
4134 try:
4135 self.mem = int(self.mem)
4136 except ValueError, err:
4137 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4138 if self.vcpus is not None:
4139 try:
4140 self.vcpus = int(self.vcpus)
4141 except ValueError, err:
4142 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4143 if self.ip is not None:
4144 self.do_ip = True
4145 if self.ip.lower() == "none":
4146 self.ip = None
4147 else:
4148 if not utils.IsValidIP(self.ip):
4149 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4150 else:
4151 self.do_ip = False
4152 self.do_bridge = (self.bridge is not None)
4153 if self.mac is not None:
4154 if self.cfg.IsMacInUse(self.mac):
4155 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4156 self.mac)
4157 if not utils.IsValidMac(self.mac):
4158 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4160 if self.kernel_path is not None:
4161 self.do_kernel_path = True
4162 if self.kernel_path == constants.VALUE_NONE:
4163 raise errors.OpPrereqError("Can't set instance to no kernel")
4165 if self.kernel_path != constants.VALUE_DEFAULT:
4166 if not os.path.isabs(self.kernel_path):
4167 raise errors.OpPrereqError("The kernel path must be an absolute"
4170 self.do_kernel_path = False
4172 if self.initrd_path is not None:
4173 self.do_initrd_path = True
4174 if self.initrd_path not in (constants.VALUE_NONE,
4175 constants.VALUE_DEFAULT):
4176 if not os.path.isabs(self.initrd_path):
4177 raise errors.OpPrereqError("The initrd path must be an absolute"
4180 self.do_initrd_path = False
4182 # boot order verification
4183 if self.hvm_boot_order is not None:
4184 if self.hvm_boot_order != constants.VALUE_DEFAULT:
4185 if len(self.hvm_boot_order.strip("acdn")) != 0:
4186 raise errors.OpPrereqError("invalid boot order specified,"
4187 " must be one or more of [acdn]"
4190 instance = self.cfg.GetInstanceInfo(
4191 self.cfg.ExpandInstanceName(self.op.instance_name))
4192 if instance is None:
4193 raise errors.OpPrereqError("No such instance name '%s'" %
4194 self.op.instance_name)
4195 self.op.instance_name = instance.name
4196 self.instance = instance
4199 def Exec(self, feedback_fn):
4200 """Modifies an instance.
4202 All parameters take effect only at the next restart of the instance.
4203 """
4204 result = []
4205 instance = self.instance
4206 if self.mem:
4207 instance.memory = self.mem
4208 result.append(("mem", self.mem))
4209 if self.vcpus:
4210 instance.vcpus = self.vcpus
4211 result.append(("vcpus", self.vcpus))
4212 if self.do_ip:
4213 instance.nics[0].ip = self.ip
4214 result.append(("ip", self.ip))
4215 if self.do_bridge:
4216 instance.nics[0].bridge = self.bridge
4217 result.append(("bridge", self.bridge))
4218 if self.mac:
4219 instance.nics[0].mac = self.mac
4220 result.append(("mac", self.mac))
4221 if self.do_kernel_path:
4222 instance.kernel_path = self.kernel_path
4223 result.append(("kernel_path", self.kernel_path))
4224 if self.do_initrd_path:
4225 instance.initrd_path = self.initrd_path
4226 result.append(("initrd_path", self.initrd_path))
4227 if self.hvm_boot_order:
4228 if self.hvm_boot_order == constants.VALUE_DEFAULT:
4229 instance.hvm_boot_order = None
4230 else:
4231 instance.hvm_boot_order = self.hvm_boot_order
4232 result.append(("hvm_boot_order", self.hvm_boot_order))
4234 self.cfg.AddInstance(instance)
4236 return result
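# The result pairs each changed parameter with its new value, for example
# (illustrative only): [("mem", 256), ("vcpus", 2), ("bridge", "xen-br0")]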
4239 class LUQueryExports(NoHooksLU):
4240 """Query the exports list
4242 """
4243 _OP_REQP = ["nodes"]  # assumed from the use of self.op.nodes below
4245 def CheckPrereq(self):
4246 """Check that the nodelist contains only existing nodes.
4248 """
4249 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4251 def Exec(self, feedback_fn):
4252 """Compute the list of all the exported system images.
4255 a dictionary with the structure node->(export-list)
4256 where export-list is a list of the instances exported on
4260 return rpc.call_export_list(self.nodes)
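# Sketch of the structure returned above (node and instance names are
# illustrative only):
#   {"node1.example.com": ["instance1.example.com"], "node2.example.com": []}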
4263 class LUExportInstance(LogicalUnit):
4264 """Export an instance to an image in the cluster.
4267 HPATH = "instance-export"
4268 HTYPE = constants.HTYPE_INSTANCE
4269 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4271 def BuildHooksEnv(self):
4272 """Build hooks env.
4274 This will run on the master, primary node and target node.
4276 """
4277 env = {
4278 "EXPORT_NODE": self.op.target_node,
4279 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4280 }
4281 env.update(_BuildInstanceHookEnvByObject(self.instance))
4282 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4283 self.op.target_node]
4284 return env, nl, nl
4286 def CheckPrereq(self):
4287 """Check prerequisites.
4289 This checks that the instance name is a valid one.
4292 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4293 self.instance = self.cfg.GetInstanceInfo(instance_name)
4294 if self.instance is None:
4295 raise errors.OpPrereqError("Instance '%s' not found" %
4296 self.op.instance_name)
4299 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4300 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4302 if self.dst_node is None:
4303 raise errors.OpPrereqError("Destination node '%s' is unknown." %
4304 self.op.target_node)
4305 self.op.target_node = self.dst_node.name
4307 def Exec(self, feedback_fn):
4308 """Export an instance to an image in the cluster.
4311 instance = self.instance
4312 dst_node = self.dst_node
4313 src_node = instance.primary_node
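# Outline of the export flow implemented below: optionally shut the instance
# down, snapshot its "sda" volume on the primary node, restart the instance,
# copy the snapshot to the target node and finalize the export there, then
# prune older exports of this instance from the remaining nodes.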
4314 # shutdown the instance, unless requested not to do so
4315 if self.op.shutdown:
4316 op = opcodes.OpShutdownInstance(instance_name=instance.name)
4317 self.proc.ChainOpCode(op)
4319 vgname = self.cfg.GetVGName()
4321 snap_disks = []
4324 for disk in instance.disks:
4325 if disk.iv_name == "sda":
4326 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4327 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4329 if not new_dev_name:
4330 logger.Error("could not snapshot block device %s on node %s" %
4331 (disk.logical_id[1], src_node))
4333 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4334 logical_id=(vgname, new_dev_name),
4335 physical_id=(vgname, new_dev_name),
4336 iv_name=disk.iv_name)
4337 snap_disks.append(new_dev)
4340 if self.op.shutdown:
4341 op = opcodes.OpStartupInstance(instance_name=instance.name,
4342 force=False)  # keyword/value assumed; only the call's closing was missing
4343 self.proc.ChainOpCode(op)
4345 # TODO: check for size
4347 for dev in snap_disks:
4348 if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4349 instance):  # final argument assumed
4350 logger.Error("could not export block device %s from node"
4351 " %s to node %s" %
4352 (dev.logical_id[1], src_node, dst_node.name))
4353 if not rpc.call_blockdev_remove(src_node, dev):
4354 logger.Error("could not remove snapshot block device %s from"
4355 " node %s" % (dev.logical_id[1], src_node))
4357 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4358 logger.Error("could not finalize export for instance %s on node %s" %
4359 (instance.name, dst_node.name))
4361 nodelist = self.cfg.GetNodeList()
4362 nodelist.remove(dst_node.name)
4364 # on one-node clusters nodelist will be empty after the removal
4365 # if we proceed the backup would be removed because OpQueryExports
4366 # substitutes an empty list with the full cluster node list.
4367 if nodelist:
4368 op = opcodes.OpQueryExports(nodes=nodelist)
4369 exportlist = self.proc.ChainOpCode(op)
4370 for node in exportlist:
4371 if instance.name in exportlist[node]:
4372 if not rpc.call_export_remove(node, instance.name):
4373 logger.Error("could not remove older export for instance %s"
4374 " on node %s" % (instance.name, node))
4377 class TagsLU(NoHooksLU):
4378 """Generic tags LU.
4380 This is an abstract class which is the parent of all the other tags LUs.
4382 """
4383 def CheckPrereq(self):
4384 """Check prerequisites.
4386 """
4387 if self.op.kind == constants.TAG_CLUSTER:
4388 self.target = self.cfg.GetClusterInfo()
4389 elif self.op.kind == constants.TAG_NODE:
4390 name = self.cfg.ExpandNodeName(self.op.name)
4391 if name is None:
4392 raise errors.OpPrereqError("Invalid node name (%s)" %
4393 (self.op.name,))
4394 self.op.name = name
4395 self.target = self.cfg.GetNodeInfo(name)
4396 elif self.op.kind == constants.TAG_INSTANCE:
4397 name = self.cfg.ExpandInstanceName(self.op.name)
4398 if name is None:
4399 raise errors.OpPrereqError("Invalid instance name (%s)" %
4400 (self.op.name,))
4401 self.op.name = name
4402 self.target = self.cfg.GetInstanceInfo(name)
4403 else:
4404 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4405 str(self.op.kind))
4408 class LUGetTags(TagsLU):
4409 """Returns the tags of a given object.
4411 """
4412 _OP_REQP = ["kind", "name"]
4414 def Exec(self, feedback_fn):
4415 """Returns the tag list.
4417 """
4418 return self.target.GetTags()
4421 class LUSearchTags(NoHooksLU):
4422 """Searches the tags for a given pattern.
4424 """
4425 _OP_REQP = ["pattern"]
4427 def CheckPrereq(self):
4428 """Check prerequisites.
4430 This checks the pattern passed for validity by compiling it.
4432 """
4433 try:
4434 self.re = re.compile(self.op.pattern)
4435 except re.error, err:
4436 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4437 (self.op.pattern, err))
4439 def Exec(self, feedback_fn):
4440 """Returns the tag list.
4442 """
4443 cfg = self.cfg
4444 tgts = [("/cluster", cfg.GetClusterInfo())]
4445 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4446 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4447 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4448 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4449 results = []
4450 for path, target in tgts:
4451 for tag in target.GetTags():
4452 if self.re.search(tag):
4453 results.append((path, tag))
4454 return results
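# Example result for a pattern such as "web.*" (purely illustrative):
#   [("/cluster", "webfarm"), ("/instances/instance1.example.com", "webserver")]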
4457 class LUAddTags(TagsLU):
4458 """Sets a tag on a given object.
4460 """
4461 _OP_REQP = ["kind", "name", "tags"]
4463 def CheckPrereq(self):
4464 """Check prerequisites.
4466 This checks the type and length of the tag name and value.
4468 """
4469 TagsLU.CheckPrereq(self)
4470 for tag in self.op.tags:
4471 objects.TaggableObject.ValidateTag(tag)
4473 def Exec(self, feedback_fn):
4474 """Sets the tag.
4476 """
4477 try:
4478 for tag in self.op.tags:
4479 self.target.AddTag(tag)
4480 except errors.TagError, err:
4481 raise errors.OpExecError("Error while setting tag: %s" % str(err))
4482 try:
4483 self.cfg.Update(self.target)
4484 except errors.ConfigurationError:
4485 raise errors.OpRetryError("There has been a modification to the"
4486 " config file and the operation has been"
4487 " aborted. Please retry.")
4490 class LUDelTags(TagsLU):
4491 """Delete a list of tags from a given object.
4493 """
4494 _OP_REQP = ["kind", "name", "tags"]
4496 def CheckPrereq(self):
4497 """Check prerequisites.
4499 This checks that we have the given tag.
4501 """
4502 TagsLU.CheckPrereq(self)
4503 for tag in self.op.tags:
4504 objects.TaggableObject.ValidateTag(tag)
4505 del_tags = frozenset(self.op.tags)
4506 cur_tags = self.target.GetTags()
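# The subset test below relies on frozenset's "<=" operator: every tag that
# should be removed must already be present on the target, otherwise we
# report exactly the missing ones.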
4507 if not del_tags <= cur_tags:
4508 diff_tags = del_tags - cur_tags
4509 diff_names = ["'%s'" % tag for tag in diff_tags]
4511 raise errors.OpPrereqError("Tag(s) %s not found" %
4512 (",".join(diff_names)))
4514 def Exec(self, feedback_fn):
4515 """Remove the tag from the object.
4517 """
4518 for tag in self.op.tags:
4519 self.target.RemoveTag(tag)
4520 try:
4521 self.cfg.Update(self.target)
4522 except errors.ConfigurationError:
4523 raise errors.OpRetryError("There has been a modification to the"
4524 " config file and the operation has been"
4525 " aborted. Please retry.")