4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
# NOTE(review): this file is a line-numbered extraction; the leading integer
# on each line is an original source line number, and gaps in those numbers
# mean lines are missing here.  The block is not runnable as-is; the added
# comments only describe intent visible in the remaining lines.
46 class LogicalUnit(object):
47 """Logical Unit base class.
49 Subclasses must follow these rules:
50 - implement CheckPrereq which also fills in the opcode instance
51 with all the fields (even if as None)
53 - implement BuildHooksEnv
54 - redefine HPATH and HTYPE
55 - optionally redefine their run requirements (REQ_CLUSTER,
56 REQ_MASTER); note that all commands require root permissions
# __init__ validates the opcode: every attribute named in _OP_REQP must be
# present on 'op', the cluster must be initialized, and (presumably only
# when REQ_MASTER is set -- the guarding lines are missing) the current
# host must be the master node.
65 def __init__(self, processor, op, cfg, sstore):
66 """Constructor for LogicalUnit.
68 This needs to be overriden in derived classes in order to check op
76 for attr_name in self._OP_REQP:
77 attr_val = getattr(op, attr_name, None)
# line 78 (missing) presumably tested "attr_val is None" before raising
79 raise errors.OpPrereqError("Required parameter '%s' missing" %
82 if not cfg.IsCluster():
83 raise errors.OpPrereqError("Cluster not initialized yet,"
84 " use 'gnt-cluster init' first.")
86 master = sstore.GetMasterNode()
87 if master != utils.HostInfo().name:
88 raise errors.OpPrereqError("Commands must be run on the master"
91 def CheckPrereq(self):
92 """Check prerequisites for this LU.
94 This method should check that the prerequisites for the execution
95 of this LU are fulfilled. It can do internode communication, but
96 it should be idempotent - no cluster or system changes are
99 The method should raise errors.OpPrereqError in case something is
100 not fulfilled. Its return value is ignored.
102 This method should also update all the parameters of the opcode to
103 their canonical form; e.g. a short node name must be fully
104 expanded after this method has successfully completed (so that
105 hooks, logging, etc. work correctly).
108 raise NotImplementedError
# Exec performs the actual work; errors.OpExecError signals failures that
# the framework knows how to report.
110 def Exec(self, feedback_fn):
113 This method should implement the actual work. It should raise
114 errors.OpExecError for failures that are somewhat dealt with in
118 raise NotImplementedError
120 def BuildHooksEnv(self):
121 """Build hooks environment for this LU.
123 This method should return a three-node tuple consisting of: a dict
124 containing the environment that will be used for running the
125 specific hook for this LU, a list of node names on which the hook
126 should run before the execution, and a list of node names on which
127 the hook should run after the execution.
129 The keys of the dict must not have 'GANETI_' prefixed as this will
130 be handled in the hooks runner. Also note additional keys will be
131 added by the hooks runner. If the LU doesn't define any
132 environment, an empty dict (and not None) should be returned.
134 As for the node lists, the master should not be included in the
135 them, as it will be added by the hooks runner in case this LU
136 requires a cluster to run on (otherwise we don't have a node
137 list). No nodes should be returned as an empty list (and not
140 Note that if the HPATH for a LU class is None, this function will
144 raise NotImplementedError
# NOTE(review): line-numbered extraction with missing lines (e.g. the HPATH/
# HTYPE/_OP_REQP overrides around lines 152-156 are not visible here).
147 class NoHooksLU(LogicalUnit):
148 """Simple LU which runs no hooks.
150 This LU is intended as a parent for other LogicalUnits which will
151 run no hooks, in order to reduce duplicate code.
# BuildHooksEnv is overridden to a no-op; the body (missing lines) most
# likely returns an empty env and empty node lists -- TODO confirm.
157 def BuildHooksEnv(self):
160 This is a no-op, since we don't run hooks.
def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  Args:
    hostname: the name of the host to add (expected to be a fqdn)

  Resolves the host via utils.HostInfo and registers both its full and
  short name for its primary IP in the cluster-managed hosts file.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  Args:
    hostname: the name of the host to remove (expected to be a fqdn)

  Removes both the fully-qualified and the short name of the host from
  the cluster-managed hosts file.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    lu: the logical unit on whose behalf we execute (only its cfg is used)
    nodes: List of nodes (strings) or None for all

  Raises:
    errors.OpPrereqError: if 'nodes' is not a list, or if any name in it
      does not resolve to a known node

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    # expand each given (possibly short) name into its canonical form
    wanted = []
    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)
  else:
    # an empty list means "all nodes"
    wanted = lu.cfg.GetNodeList()

  return utils.NiceSort(wanted)
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    lu: the logical unit on whose behalf we execute (only its cfg is used)
    instances: List of instances (strings) or None for all

  Raises:
    errors.OpPrereqError: if 'instances' is not a list, or if any name in
      it does not resolve to a known instance

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    # expand each given (possibly short) name into its canonical form
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    # an empty list means "all instances"
    wanted = lu.cfg.GetInstanceList()

  return utils.NiceSort(wanted)
231 def _CheckOutputFields(static, dynamic, selected):
232 """Checks whether all selected fields are valid.
235 static: Static fields
236 dynamic: Dynamic fields
239 static_fields = frozenset(static)
240 dynamic_fields = frozenset(dynamic)
242 all_fields = static_fields | dynamic_fields
244 if not all_fields.issuperset(selected):
245 raise errors.OpPrereqError("Unknown output fields selected: %s"
246 % ",".join(frozenset(selected).
247 difference(all_fields)))
250 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251 memory, vcpus, nics):
252 """Builds instance related env variables for hooks from single variables.
255 secondary_nodes: List of secondary nodes as strings
259 "INSTANCE_NAME": name,
260 "INSTANCE_PRIMARY": primary_node,
261 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262 "INSTANCE_OS_TYPE": os_type,
263 "INSTANCE_STATUS": status,
264 "INSTANCE_MEMORY": memory,
265 "INSTANCE_VCPUS": vcpus,
269 nic_count = len(nics)
270 for idx, (ip, bridge) in enumerate(nics):
273 env["INSTANCE_NIC%d_IP" % idx] = ip
274 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
278 env["INSTANCE_NIC_COUNT"] = nic_count
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # BUGFIX: this used to pass instance.os here as well, so the hooks
    # environment exported the OS name instead of the instance status
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
# NOTE(review): line-numbered extraction with many missing lines (the loop
# header over the file, the haveall/havesome/key initialisation around lines
# 333-342, and the else/append/close logic).  Intent, from what is visible:
# rewrite SSH_KNOWN_HOSTS_FILE so that exactly one entry maps both 'ip' and
# 'fullnode' to 'pubkey', keeping unrelated lines and discarding stale ones.
305 def _UpdateKnownHosts(fullnode, ip, pubkey):
306 """Ensure a node has a correct known_hosts entry.
309 fullnode - Fully qualified domain name of host. (str)
310 ip - IPv4 address of host (str)
311 pubkey - the public key of the cluster
# open read-write if the file exists, otherwise create it
314 if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
315 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
317 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
326 logger.Debug('read %s' % (repr(rawline),))
328 parts = rawline.rstrip('\r\n').split()
330 # Ignore unwanted lines
331 if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
332 fields = parts[0].split(',')
# 'haveall'/'havesome' (set in missing lines) apparently track whether the
# entry's host field covers all/some of [ip, fullnode] -- TODO confirm
337 for spec in [ ip, fullnode ]:
338 if spec not in fields:
343 logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
# a line that covers both names with the right key is kept as-is
344 if haveall and key == pubkey:
346 save_lines.append(rawline)
347 logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
# a partially-matching or wrong-key line is dropped (stale entry)
350 if havesome and (not haveall or key != pubkey):
352 logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
355 save_lines.append(rawline)
# no complete entry survived: schedule a fresh "<fqdn>,<ip> ssh-rsa <key>"
358 add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
359 logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
362 save_lines = save_lines + add_lines
# when existing lines were discarded, atomically replace the file via a
# temporary file and rename; otherwise (missing branch) just append
364 # Write a new file and replace old.
365 fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
367 newfile = os.fdopen(fd, 'w')
369 newfile.write(''.join(save_lines))
372 logger.Debug("Wrote new known_hosts.")
373 os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
376 # Simply appending a new line will do the trick.
378 for add in add_lines:
384 def _HasValidVG(vglist, vgname):
385 """Checks if the volume group list is valid.
387 A non-None return value means there's an error, and the return value
388 is the error message.
391 vgsize = vglist.get(vgname, None)
393 return "volume group '%s' missing" % vgname
395 return ("volume group '%s' too small (20480MiB required, %dMib found)" %
# NOTE(review): line-numbered extraction with missing lines (the remaining
# ssh-keygen arguments at lines 419-421, the result.failed test, and the
# file close / AddKnownHost tail).  Not runnable as-is.
400 def _InitSSHSetup(node):
401 """Setup the SSH configuration for the cluster.
404 This generates a dsa keypair for root, adds the pub key to the
405 permitted hosts and adds the hostkey to its own known hosts.
408 node: the name of this host as a fqdn
411 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
# back up and remove any pre-existing keypair before regenerating
413 for name in priv_key, pub_key:
414 if os.path.exists(name):
415 utils.CreateBackup(name)
416 utils.RemoveFile(name)
# generate a fresh DSA keypair; remaining arguments (output path, empty
# passphrase, etc.) are on the missing continuation lines
418 result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
422 raise errors.OpExecError("Could not generate ssh keypair, error %s" %
# authorize the new public key for the ganeti run-as user
425 f = open(pub_key, 'r')
427 utils.AddAuthorizedKey(auth_keys, f.read(8192))
# NOTE(review): line-numbered extraction; the "if result.failed:" guards
# before both raises (lines 448 and 456/457) are among the missing lines.
432 def _InitGanetiServerSetup(ss):
433 """Setup the necessary configuration for the initial node daemon.
435 This creates the nodepass file containing the shared password for
436 the cluster and also generates the SSL certificate.
439 # Create pseudo random password
440 randpass = sha.new(os.urandom(64)).hexdigest()
441 # and write it into sstore
442 ss.SetKey(ss.SS_NODED_PASS, randpass)
# generate a self-signed 5-year SSL certificate, key and cert in one file
444 result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
445 "-days", str(365*5), "-nodes", "-x509",
446 "-keyout", constants.SSL_CERT_FILE,
447 "-out", constants.SSL_CERT_FILE, "-batch"])
449 raise errors.OpExecError("could not generate server ssl cert, command"
450 " %s had exitcode %s and error message %s" %
451 (result.cmd, result.exit_code, result.output))
# cert contains the private key, so restrict it to owner read-only
453 os.chmod(constants.SSL_CERT_FILE, 0400)
# restart the node daemon so it picks up the new password/certificate
455 result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
458 raise errors.OpExecError("Could not start the node daemon, command %s"
459 " had exitcode %s and error %s" %
460 (result.cmd, result.exit_code, result.output))
def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  Args:
    instance: objects.Instance whose NICs' bridges are verified on its
      primary node

  Raises:
    errors.OpPrereqError: if one or more of the bridges do not exist on
      the instance's primary node

  """
  # check bridges existance
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))
# NOTE(review): line-numbered extraction with many missing lines (several
# "if" guards and continuation lines of raises are absent).  Not runnable
# as-is; comments describe the visible intent only.
475 class LUInitCluster(LogicalUnit):
476 """Initialise the cluster.
479 HPATH = "cluster-init"
480 HTYPE = constants.HTYPE_CLUSTER
481 _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
482 "def_bridge", "master_netdev"]
485 def BuildHooksEnv(self):
488 Notes: Since we don't require a cluster, we must manually add
489 ourselves in the post-run node list.
492 env = {"OP_TARGET": self.op.cluster_name}
493 return env, [], [self.hostname.name]
495 def CheckPrereq(self):
496 """Verify that the passed name is a valid one.
499 if config.ConfigWriter.IsCluster():
500 raise errors.OpPrereqError("Cluster is already initialised")
# HVM clusters need the VNC password file prepared before init
502 if self.op.hypervisor_type == constants.HT_XEN_HVM31:
503 if not os.path.exists(constants.VNC_PASSWORD_FILE):
504 raise errors.OpPrereqError("Please prepare the cluster VNC"
506 constants.VNC_PASSWORD_FILE)
508 self.hostname = hostname = utils.HostInfo()
510 if hostname.ip.startswith("127."):
511 raise errors.OpPrereqError("This host's IP resolves to the private"
512 " range (%s). Please fix DNS or /etc/hosts." %
515 self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
# sanity check: the name this host resolves to must actually be ours
517 if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
518 constants.DEFAULT_NODED_PORT):
519 raise errors.OpPrereqError("Inconsistency: this host's name resolves"
520 " to %s,\nbut this ip address does not"
521 " belong to this host."
522 " Aborting." % hostname.ip)
524 secondary_ip = getattr(self.op, "secondary_ip", None)
525 if secondary_ip and not utils.IsValidIP(secondary_ip):
526 raise errors.OpPrereqError("Invalid secondary ip given")
# (missing line 527 presumably starts "if (secondary_ip and ...")
528 secondary_ip != hostname.ip and
529 (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
530 constants.DEFAULT_NODED_PORT))):
531 raise errors.OpPrereqError("You gave %s as secondary IP,"
532 " but it does not belong to this host." %
534 self.secondary_ip = secondary_ip
536 # checks presence of the volume group given
537 vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
539 raise errors.OpPrereqError("Error: %s" % vgstatus)
542 if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
544 raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
547 if self.op.hypervisor_type not in constants.HYPER_TYPES:
548 raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
549 self.op.hypervisor_type)
# verify the chosen master network device exists on this host
551 result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
553 raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
554 (self.op.master_netdev,
555 result.output.strip()))
557 if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
558 os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
559 raise errors.OpPrereqError("Init.d script '%s' missing or not"
560 " executable." % constants.NODE_INITD_SCRIPT)
562 def Exec(self, feedback_fn):
563 """Initialize the cluster.
566 clustername = self.clustername
567 hostname = self.hostname
569 # set up the simple store
570 self.sstore = ss = ssconf.SimpleStore()
571 ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
572 ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
573 ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
574 ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
575 ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
577 # set up the inter-node password and certificate
578 _InitGanetiServerSetup(ss)
580 # start the master ip
581 rpc.call_node_start_master(hostname.name)
583 # set up ssh config and /etc/hosts
584 f = open(constants.SSH_HOST_RSA_PUB, 'r')
# the host RSA key's base64 part becomes the cluster ssh key
589 sshkey = sshline.split(" ")[1]
591 _AddHostToEtcHosts(hostname.name)
593 _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
595 _InitSSHSetup(hostname.name)
597 # init of cluster config file
598 self.cfg = cfgw = config.ConfigWriter()
599 cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
600 sshkey, self.op.mac_prefix,
601 self.op.vg_name, self.op.def_bridge)
# NOTE(review): line-numbered extraction with missing lines (the _OP_REQP
# definition and the "if instancelist:" guard around line 625 are absent).
604 class LUDestroyCluster(NoHooksLU):
605 """Logical unit for destroying the cluster.
610 def CheckPrereq(self):
611 """Check prerequisites.
613 This checks whether the cluster is empty.
615 Any errors are signalled by raising errors.OpPrereqError.
618 master = self.sstore.GetMasterNode()
# the cluster may only be destroyed when the master is its sole node
620 nodelist = self.cfg.GetNodeList()
621 if len(nodelist) != 1 or nodelist[0] != master:
622 raise errors.OpPrereqError("There are still %d node(s) in"
623 " this cluster." % (len(nodelist) - 1))
624 instancelist = self.cfg.GetInstanceList()
626 raise errors.OpPrereqError("There are still %d instance(s) in"
627 " this cluster." % len(instancelist))
629 def Exec(self, feedback_fn):
630 """Destroys the cluster.
633 master = self.sstore.GetMasterNode()
# back up the root ssh keys before the node leaves the cluster
634 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
635 utils.CreateBackup(priv_key)
636 utils.CreateBackup(pub_key)
637 rpc.call_node_leave_cluster(master)
# NOTE(review): line-numbered extraction with a large number of missing
# lines (bad-result bookkeeping, "bad = True" statements, return values,
# and several guards).  Not runnable as-is; comments describe visible
# intent only.
640 class LUVerifyCluster(NoHooksLU):
641 """Verifies the cluster status.
646 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
647 remote_version, feedback_fn):
648 """Run multiple tests against a node.
651 - compares ganeti version
652 - checks vg existance and size > 20G
653 - checks config file checksum
654 - checks ssh to other nodes
657 node: name of the node to check
658 file_list: required list of files
659 local_cksum: dictionary of local files and their checksums
662 # compares ganeti version
663 local_version = constants.PROTOCOL_VERSION
664 if not remote_version:
665 feedback_fn(" - ERROR: connection to %s failed" % (node))
668 if local_version != remote_version:
669 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
670 (local_version, node, remote_version))
673 # checks vg existance and size > 20G
# (missing lines presumably guard on vglist being present/valid)
677 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
681 vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
683 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
686 # checks config file checksum
689 if 'filelist' not in node_result:
691 feedback_fn(" - ERROR: node hasn't returned file checksum data")
693 remote_cksum = node_result['filelist']
694 for file_name in file_list:
695 if file_name not in remote_cksum:
697 feedback_fn(" - ERROR: file '%s' missing" % file_name)
698 elif remote_cksum[file_name] != local_cksum[file_name]:
700 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
702 if 'nodelist' not in node_result:
704 feedback_fn(" - ERROR: node hasn't returned node connectivity data")
706 if node_result['nodelist']:
708 for node in node_result['nodelist']:
709 feedback_fn(" - ERROR: communication with node '%s': %s" %
710 (node, node_result['nodelist'][node]))
711 hyp_result = node_result.get('hypervisor', None)
712 if hyp_result is not None:
713 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
716 def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
717 """Verify an instance.
719 This function checks to see if the required block devices are
720 available on the instance's node.
725 instancelist = self.cfg.GetInstanceList()
726 if not instance in instancelist:
727 feedback_fn(" - ERROR: instance %s not in instance list %s" %
728 (instance, instancelist))
731 instanceconfig = self.cfg.GetInstanceInfo(instance)
732 node_current = instanceconfig.primary_node
# build node -> expected-volumes mapping for this instance
735 instanceconfig.MapLVsByNode(node_vol_should)
737 for node in node_vol_should:
738 for volume in node_vol_should[node]:
739 if node not in node_vol_is or volume not in node_vol_is[node]:
740 feedback_fn(" - ERROR: volume %s missing on node %s" %
# a non-'down' instance must actually be running on its primary node
744 if not instanceconfig.status == 'down':
745 if not instance in node_instance[node_current]:
746 feedback_fn(" - ERROR: instance %s not running on node %s" %
747 (instance, node_current))
# and must not be running anywhere else
750 for node in node_instance:
751 if (not node == node_current):
752 if instance in node_instance[node]:
753 feedback_fn(" - ERROR: instance %s should not run on node %s" %
759 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
760 """Verify if there are any unknown volumes in the cluster.
762 The .os, .swap and backup volumes are ignored. All other volumes are
768 for node in node_vol_is:
769 for volume in node_vol_is[node]:
770 if node not in node_vol_should or volume not in node_vol_should[node]:
771 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
776 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
777 """Verify the list of running instances.
779 This checks what instances are running but unknown to the cluster.
783 for node in node_instance:
784 for runninginstance in node_instance[node]:
785 if runninginstance not in instancelist:
786 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
787 (runninginstance, node))
791 def CheckPrereq(self):
792 """Check prerequisites.
794 This has no prerequisites.
799 def Exec(self, feedback_fn):
800 """Verify integrity of cluster, performing various test on nodes.
804 feedback_fn("* Verifying global settings")
805 for msg in self.cfg.VerifyConfig():
806 feedback_fn(" - ERROR: %s" % msg)
808 vg_name = self.cfg.GetVGName()
809 nodelist = utils.NiceSort(self.cfg.GetNodeList())
810 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
814 # FIXME: verify OS list
# checksum the cluster config files locally for comparison per node
816 file_names = list(self.sstore.GetFileList())
817 file_names.append(constants.SSL_CERT_FILE)
818 file_names.append(constants.CLUSTER_CONF_FILE)
819 local_checksums = utils.FingerprintFiles(file_names)
821 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
822 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
823 all_instanceinfo = rpc.call_instance_list(nodelist)
824 all_vglist = rpc.call_vg_list(nodelist)
825 node_verify_param = {
826 'filelist': file_names,
827 'nodelist': nodelist,
830 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
831 all_rversion = rpc.call_version(nodelist)
833 for node in nodelist:
834 feedback_fn("* Verifying node %s" % node)
835 result = self._VerifyNode(node, file_names, local_checksums,
836 all_vglist[node], all_nvinfo[node],
837 all_rversion[node], feedback_fn)
841 volumeinfo = all_volumeinfo[node]
# a string result from volume_list is an LVM error message
843 if isinstance(volumeinfo, basestring):
844 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
845 (node, volumeinfo[-400:].encode('string_escape')))
847 node_volume[node] = {}
848 elif not isinstance(volumeinfo, dict):
849 feedback_fn(" - ERROR: connection to %s failed" % (node,))
853 node_volume[node] = volumeinfo
856 nodeinstance = all_instanceinfo[node]
857 if type(nodeinstance) != list:
858 feedback_fn(" - ERROR: connection to %s failed" % (node,))
862 node_instance[node] = nodeinstance
866 for instance in instancelist:
867 feedback_fn("* Verifying instance %s" % instance)
868 result = self._VerifyInstance(instance, node_volume, node_instance,
872 inst_config = self.cfg.GetInstanceInfo(instance)
874 inst_config.MapLVsByNode(node_vol_should)
876 feedback_fn("* Verifying orphan volumes")
877 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
881 feedback_fn("* Verifying remaining instances")
882 result = self._VerifyOrphanInstances(instancelist, node_instance,
# NOTE(review): line-numbered extraction with missing lines (the nv_dict
# initialisation, the per-node result handling around lines 930-944, and
# the final return).  Not runnable as-is.
889 class LUVerifyDisks(NoHooksLU):
890 """Verifies the cluster disks status.
895 def CheckPrereq(self):
896 """Check prerequisites.
898 This has no prerequisites.
903 def Exec(self, feedback_fn):
904 """Verify integrity of cluster disks.
# result aggregates: unreachable nodes, per-node LVM errors, instances
# with inactive disks, and missing LVs per instance
907 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
909 vg_name = self.cfg.GetVGName()
910 nodes = utils.NiceSort(self.cfg.GetNodeList())
911 instances = [self.cfg.GetInstanceInfo(name)
912 for name in self.cfg.GetInstanceList()]
915 for inst in instances:
# only up, network-mirrored instances are relevant for this check
917 if (inst.status != "up" or
918 inst.disk_template not in constants.DTS_NET_MIRROR):
920 inst.MapLVsByNode(inst_lvs)
921 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
922 for node, vol_list in inst_lvs.iteritems():
924 nv_dict[(node, vol)] = inst
929 node_lvs = rpc.call_volume_list(nodes, vg_name)
# a string result means an LVM-level error on that node
936 if isinstance(lvs, basestring):
937 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
939 elif not isinstance(lvs, dict):
940 logger.Info("connection to node %s failed or invalid data returned" %
942 res_nodes.append(node)
945 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
946 inst = nv_dict.pop((node, lv_name), None)
947 if (not lv_online and inst is not None
948 and inst.name not in res_instances):
949 res_instances.append(inst.name)
951 # any leftover items in nv_dict are missing LVs, let's arrange the
953 for key, inst in nv_dict.iteritems():
954 if inst.name not in res_missing:
955 res_missing[inst.name] = []
956 res_missing[inst.name].append(key)
# NOTE(review): line-numbered extraction with missing lines (_OP_REQP,
# parts of Exec around lines 1004-1016, and the tail after line 1037).
# Note also line 974 reads "self.op.sstore" where line 977 uses
# "self.sstore" -- possibly inconsistent; TODO confirm against upstream.
961 class LURenameCluster(LogicalUnit):
962 """Rename the cluster.
965 HPATH = "cluster-rename"
966 HTYPE = constants.HTYPE_CLUSTER
969 def BuildHooksEnv(self):
974 "OP_TARGET": self.op.sstore.GetClusterName(),
975 "NEW_NAME": self.op.name,
977 mn = self.sstore.GetMasterNode()
978 return env, [mn], [mn]
980 def CheckPrereq(self):
981 """Verify that the passed name is a valid one.
984 hostname = utils.HostInfo(self.op.name)
986 new_name = hostname.name
987 self.ip = new_ip = hostname.ip
988 old_name = self.sstore.GetClusterName()
989 old_ip = self.sstore.GetMasterIP()
990 if new_name == old_name and new_ip == old_ip:
991 raise errors.OpPrereqError("Neither the name nor the IP address of the"
992 " cluster has changed")
# refuse a new IP that is already live on the network
994 result = utils.RunCmd(["fping", "-q", new_ip])
995 if not result.failed:
996 raise errors.OpPrereqError("The given cluster IP address (%s) is"
997 " reachable on the network. Aborting." %
1000 self.op.name = new_name
1002 def Exec(self, feedback_fn):
1003 """Rename the cluster.
1006 clustername = self.op.name
1010 # shutdown the master IP
1011 master = ss.GetMasterNode()
1012 if not rpc.call_node_stop_master(master):
1013 raise errors.OpExecError("Could not disable the master role")
1017 ss.SetKey(ss.SS_MASTER_IP, ip)
1018 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1020 # Distribute updated ss config to all nodes
1021 myself = self.cfg.GetNodeInfo(master)
1022 dist_nodes = self.cfg.GetNodeList()
1023 if myself.name in dist_nodes:
1024 dist_nodes.remove(myself.name)
1026 logger.Debug("Copying updated ssconf data to all nodes")
1027 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1028 fname = ss.KeyToFilename(keyname)
1029 result = rpc.call_upload_file(dist_nodes, fname)
1030 for to_node in dist_nodes:
1031 if not result[to_node]:
1032 logger.Error("copy of file %s to node %s failed" %
# best-effort restart of the master role; failure is only logged
1035 if not rpc.call_node_start_master(master):
1036 logger.Error("Could not re-enable the master role on the master,"
1037 " please restart manually.")
# NOTE(review): line-numbered extraction with missing lines (the retries/
# while loop structure, max_time bookkeeping, unlock handling, and the
# done-condition around lines 1088-1099).  Not runnable as-is.
1040 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1041 """Sleep and poll for an instance's disk to sync.
1044 if not instance.disks:
# nothing to wait for -- the missing line presumably returns True here
1048 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1050 node = instance.primary_node
1052 for dev in instance.disks:
1053 cfgw.SetDiskID(dev, node)
1059 cumul_degraded = False
1060 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1062 proc.LogWarning("Can't get any data from node %s" % node)
# too many consecutive failures -> give up (retry countdown lines missing)
1065 raise errors.RemoteError("Can't contact node %s for mirror data,"
1066 " aborting." % node)
1070 for i in range(len(rstats)):
1073 proc.LogWarning("Can't compute data for node %s/%s" %
1074 (node, instance.disks[i].iv_name))
1076 # we ignore the ldisk parameter
1077 perc_done, est_time, is_degraded, _ = mstat
1078 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1079 if perc_done is not None:
1081 if est_time is not None:
1082 rem_time = "%d estimated seconds remaining" % est_time
1085 rem_time = "no time estimate"
1086 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1087 (instance.disks[i].iv_name, perc_done, rem_time))
1094 time.sleep(min(60, max_time))
1100 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1101 return not cumul_degraded
# NOTE(review): line-numbered extraction with missing lines (the 'result'
# initialisation, the ldisk index selection around lines 1113-1118, and
# the children guard / return).  Not runnable as-is.
1104 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1105 """Check that mirrors are not degraded.
1107 The ldisk parameter, if True, will change the test from the
1108 is_degraded attribute (which represents overall non-ok status for
1109 the device(s)) to the ldisk (representing the local storage status).
1112 cfgw.SetDiskID(dev, node)
# only query the node when the device is assembled there
1119 if on_primary or dev.AssembleOnSecondary():
1120 rstats = rpc.call_blockdev_find(node, dev)
1122 logger.ToStderr("Can't get any data from node %s" % node)
# 'idx' (set in missing lines) selects is_degraded vs ldisk in the result
1125 result = result and (not rstats[idx])
# recurse into child devices (e.g. the components of a mirror)
1127 for child in dev.children:
1128 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
# NOTE(review): line-numbered extraction; the _OP_REQP definition and the
# tail of Exec (after the error check) are among the missing lines.
1133 class LUDiagnoseOS(NoHooksLU):
1134 """Logical unit for OS diagnose/query.
1139 def CheckPrereq(self):
1140 """Check prerequisites.
1142 This always succeeds, since this is a pure query LU.
1147 def Exec(self, feedback_fn):
1148 """Compute the list of OSes.
1151 node_list = self.cfg.GetNodeList()
1152 node_data = rpc.call_os_diagnose(node_list)
1153 if node_data == False:
1154 raise errors.OpExecError("Can't gather the list of OSes")
# NOTE(review): line-numbered extraction with missing lines (the "if node
# is None" guard before line 1194, self.op.node_name bookkeeping, and parts
# of Exec).  Not runnable as-is.
1158 class LURemoveNode(LogicalUnit):
1159 """Logical unit for removing a node.
1162 HPATH = "node-remove"
1163 HTYPE = constants.HTYPE_NODE
1164 _OP_REQP = ["node_name"]
1166 def BuildHooksEnv(self):
1169 This doesn't run on the target node in the pre phase as a failed
1170 node would not allows itself to run.
1174 "OP_TARGET": self.op.node_name,
1175 "NODE_NAME": self.op.node_name,
1177 all_nodes = self.cfg.GetNodeList()
1178 all_nodes.remove(self.op.node_name)
1179 return env, all_nodes, all_nodes
1181 def CheckPrereq(self):
1182 """Check prerequisites.
1185 - the node exists in the configuration
1186 - it does not have primary or secondary instances
1187 - it's not the master
1189 Any errors are signalled by raising errors.OpPrereqError.
1192 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
# NOTE(review): old-style Python 2 "raise X, (...)" form below -- every
# other raise in this file uses the call form raise X(...); worth
# normalising when this file is restored to runnable state
1194 raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1196 instance_list = self.cfg.GetInstanceList()
1198 masternode = self.sstore.GetMasterNode()
1199 if node.name == masternode:
1200 raise errors.OpPrereqError("Node is the master node,"
1201 " you need to failover first.")
1203 for instance_name in instance_list:
1204 instance = self.cfg.GetInstanceInfo(instance_name)
1205 if node.name == instance.primary_node:
1206 raise errors.OpPrereqError("Instance %s still running on the node,"
1207 " please remove first." % instance_name)
1208 if node.name in instance.secondary_nodes:
1209 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1210 " please remove first." % instance_name)
1211 self.op.node_name = node.name
1214 def Exec(self, feedback_fn):
1215 """Removes the node from the cluster.
1219 logger.Info("stopping the node daemon and removing configs from node %s" %
1222 rpc.call_node_leave_cluster(node.name)
1224 ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1226 logger.Info("Removing node %s from config" % node.name)
1228 self.cfg.RemoveNode(node.name)
1230 _RemoveHostFromEtcHosts(node.name)
# NOTE(review): line-numbered extraction with missing lines (parts of the
# dynamic/static field lists, the live_data guards around lines 1271-1284,
# the output list initialisation, and the "name" field branch).
1233 class LUQueryNodes(NoHooksLU):
1234 """Logical unit for querying nodes.
1237 _OP_REQP = ["output_fields", "names"]
1239 def CheckPrereq(self):
1240 """Check prerequisites.
1242 This checks that the fields required are valid output fields.
1245 self.dynamic_fields = frozenset(["dtotal", "dfree",
1246 "mtotal", "mnode", "mfree",
1249 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1250 "pinst_list", "sinst_list",
1252 dynamic=self.dynamic_fields,
1253 selected=self.op.output_fields)
1255 self.wanted = _GetWantedNodes(self, self.op.names)
1257 def Exec(self, feedback_fn):
1258 """Computes the list of nodes and their attributes.
1261 nodenames = self.wanted
1262 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1264 # begin data gathering
# live (dynamic) data is only fetched when such fields were requested
1266 if self.dynamic_fields.intersection(self.op.output_fields):
1268 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1269 for name in nodenames:
1270 nodeinfo = node_data.get(name, None)
1273 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1274 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1275 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1276 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1277 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1278 "bootid": nodeinfo['bootid'],
# fallback for nodes that returned no info
1281 live_data[name] = {}
1283 live_data = dict.fromkeys(nodenames, {})
1285 node_to_primary = dict([(name, set()) for name in nodenames])
1286 node_to_secondary = dict([(name, set()) for name in nodenames])
1288 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1289 "sinst_cnt", "sinst_list"))
# instance relations are only computed when instance fields are wanted
1290 if inst_fields & frozenset(self.op.output_fields):
1291 instancelist = self.cfg.GetInstanceList()
1293 for instance_name in instancelist:
1294 inst = self.cfg.GetInstanceInfo(instance_name)
1295 if inst.primary_node in node_to_primary:
1296 node_to_primary[inst.primary_node].add(inst.name)
1297 for secnode in inst.secondary_nodes:
1298 if secnode in node_to_secondary:
1299 node_to_secondary[secnode].add(inst.name)
1301 # end data gathering
1304 for node in nodelist:
1306 for field in self.op.output_fields:
1309 elif field == "pinst_list":
1310 val = list(node_to_primary[node.name])
1311 elif field == "sinst_list":
1312 val = list(node_to_secondary[node.name])
1313 elif field == "pinst_cnt":
1314 val = len(node_to_primary[node.name])
1315 elif field == "sinst_cnt":
1316 val = len(node_to_secondary[node.name])
1317 elif field == "pip":
1318 val = node.primary_ip
1319 elif field == "sip":
1320 val = node.secondary_ip
1321 elif field in self.dynamic_fields:
1322 val = live_data[node.name].get(field, None)
# unknown field names are a programming error at this point
1324 raise errors.ParameterError(field)
1325 node_output.append(val)
1326 output.append(node_output)
# NOTE(review): excerpt is line-sampled; several branches of the field
# dispatch below (e.g. the "node" and "vg" cases, and the `val` assignments
# for "phys"/"name"/"instance") are missing from this view.
1331 class LUQueryNodeVolumes(NoHooksLU):
1332 """Logical unit for getting volumes on node(s).
1335 _OP_REQP = ["nodes", "output_fields"]
1337 def CheckPrereq(self):
1338 """Check prerequisites.
1340 This checks that the fields required are valid output fields.
1343 self.nodes = _GetWantedNodes(self, self.op.nodes)
1345 _CheckOutputFields(static=["node"],
1346 dynamic=["phys", "vg", "name", "size", "instance"],
1347 selected=self.op.output_fields)
1350 def Exec(self, feedback_fn):
1351 """Computes the list of nodes and their attributes.
1354 nodenames = self.nodes
1355 volumes = rpc.call_node_volumes(nodenames)
1357 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1358 in self.cfg.GetInstanceList()]
# Map each instance to its LVs per node, so volumes can be matched back
# to the instance that owns them.
1360 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1363 for node in nodenames:
# Skip nodes that did not answer the RPC or have no volumes.
1364 if node not in volumes or not volumes[node]:
1367 node_vols = volumes[node][:]
1368 node_vols.sort(key=lambda vol: vol['dev'])
1370 for vol in node_vols:
1372 for field in self.op.output_fields:
1375 elif field == "phys":
1379 elif field == "name":
1381 elif field == "size":
1382 val = int(float(vol['size']))
1383 elif field == "instance":
1385 if node not in lv_by_node[inst]:
1387 if vol['name'] in lv_by_node[inst][node]:
1393 raise errors.ParameterError(field)
# All values are stringified for uniform tabular output.
1394 node_output.append(str(val))
1396 output.append(node_output)
# NOTE(review): excerpt is line-sampled (embedded line numbers jump).  The
# locals `cfg` and `ss` are used below without a visible binding -- presumably
# `cfg = self.cfg` and `ss = self.sstore` on dropped lines; TODO confirm
# against the full file.
1401 class LUAddNode(LogicalUnit):
1402 """Logical unit for adding node to the cluster.
1406 HTYPE = constants.HTYPE_NODE
1407 _OP_REQP = ["node_name"]
1409 def BuildHooksEnv(self):
1412 This will run on all nodes before, and on all nodes + the new node after.
1416 "OP_TARGET": self.op.node_name,
1417 "NODE_NAME": self.op.node_name,
1418 "NODE_PIP": self.op.primary_ip,
1419 "NODE_SIP": self.op.secondary_ip,
1421 nodes_0 = self.cfg.GetNodeList()
1422 nodes_1 = nodes_0 + [self.op.node_name, ]
1423 return env, nodes_0, nodes_1
1425 def CheckPrereq(self):
1426 """Check prerequisites.
1429 - the new node is not already in the config
1431 - its parameters (single/dual homed) matches the cluster
1433 Any errors are signalled by raising errors.OpPrereqError.
1436 node_name = self.op.node_name
# Resolve the node name via DNS; the resolved name/IP become canonical.
1439 dns_data = utils.HostInfo(node_name)
1441 node = dns_data.name
1442 primary_ip = self.op.primary_ip = dns_data.ip
# Single-homed nodes use the primary IP as secondary as well.
1443 secondary_ip = getattr(self.op, "secondary_ip", None)
1444 if secondary_ip is None:
1445 secondary_ip = primary_ip
1446 if not utils.IsValidIP(secondary_ip):
1447 raise errors.OpPrereqError("Invalid secondary IP given")
1448 self.op.secondary_ip = secondary_ip
1449 node_list = cfg.GetNodeList()
1450 if node in node_list:
1451 raise errors.OpPrereqError("Node %s is already in the configuration"
# Reject any IP already in use by an existing node (either role).
1454 for existing_node_name in node_list:
1455 existing_node = cfg.GetNodeInfo(existing_node_name)
1456 if (existing_node.primary_ip == primary_ip or
1457 existing_node.secondary_ip == primary_ip or
1458 existing_node.primary_ip == secondary_ip or
1459 existing_node.secondary_ip == secondary_ip):
1460 raise errors.OpPrereqError("New node ip address(es) conflict with"
1461 " existing node %s" % existing_node.name)
1463 # check that the type of the node (single versus dual homed) is the
1464 # same as for the master
1465 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1466 master_singlehomed = myself.secondary_ip == myself.primary_ip
1467 newbie_singlehomed = secondary_ip == primary_ip
1468 if master_singlehomed != newbie_singlehomed:
1469 if master_singlehomed:
1470 raise errors.OpPrereqError("The master has no private ip but the"
1471 " new node has one")
1473 raise errors.OpPrereqError("The master has a private ip but the"
1474 " new node doesn't have one")
1476 # checks reachablity
1477 if not utils.TcpPing(utils.HostInfo().name,
1479 constants.DEFAULT_NODED_PORT):
1480 raise errors.OpPrereqError("Node not reachable by ping")
1482 if not newbie_singlehomed:
1483 # check reachability from my secondary ip to newbie's secondary ip
1484 if not utils.TcpPing(myself.secondary_ip,
1486 constants.DEFAULT_NODED_PORT):
1487 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1488 " based ping to noded port")
1490 self.new_node = objects.Node(name=node,
1491 primary_ip=primary_ip,
1492 secondary_ip=secondary_ip)
# HVM clusters distribute a shared VNC password file; it must exist before
# we can copy it to the new node.
1494 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1495 if not os.path.exists(constants.VNC_PASSWORD_FILE):
1496 raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1497 constants.VNC_PASSWORD_FILE)
1499 def Exec(self, feedback_fn):
1500 """Adds the new node to the cluster.
1503 new_node = self.new_node
1504 node = new_node.name
1506 # set up inter-node password and certificate and restarts the node daemon
# The password is interpolated into a shell command below, so restrict it
# to a safe character set first.
1507 gntpass = self.sstore.GetNodeDaemonPassword()
1508 if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1509 raise errors.OpExecError("ganeti password corruption detected")
1510 f = open(constants.SSL_CERT_FILE)
1512 gntpem = f.read(8192)
1515 # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1516 # so we use this to detect an invalid certificate; as long as the
1517 # cert doesn't contain this, the here-document will be correctly
1518 # parsed by the shell sequence below
# NOTE(review): pattern is a non-raw string; '\.' happens to pass through
# unchanged, but a raw string r'^!EOF\.' would be the conventional form.
1519 if re.search('^!EOF\.', gntpem, re.MULTILINE):
1520 raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1521 if not gntpem.endswith("\n"):
1522 raise errors.OpExecError("PEM must end with newline")
1523 logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1525 # and then connect with ssh to set password and start ganeti-noded
1526 # note that all the below variables are sanitized at this point,
1527 # either by being constants or by the checks above
1529 mycommand = ("umask 077 && "
1530 "echo '%s' > '%s' && "
1531 "cat > '%s' << '!EOF.' && \n"
1532 "%s!EOF.\n%s restart" %
1533 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1534 constants.SSL_CERT_FILE, gntpem,
1535 constants.NODE_INITD_SCRIPT))
# batch=False/ask_key=True: this is the first ssh contact with the node,
# so accepting its host key interactively is expected.
1537 result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1539 raise errors.OpExecError("Remote command on node %s, error: %s,"
1541 (node, result.fail_reason, result.output))
1543 # check connectivity
1546 result = rpc.call_version([node])[node]
1548 if constants.PROTOCOL_VERSION == result:
1549 logger.Info("communication to node %s fine, sw version %s match" %
1552 raise errors.OpExecError("Version mismatch master version %s,"
1553 " node version %s" %
1554 (constants.PROTOCOL_VERSION, result))
1556 raise errors.OpExecError("Cannot get version from the new node")
1559 logger.Info("copy ssh key to node %s" % node)
1560 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1562 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1563 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1569 keyarray.append(f.read())
# Push the host/user ssh keys so the new node is ssh-trusted cluster-wide.
1573 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1574 keyarray[3], keyarray[4], keyarray[5])
1577 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1579 # Add node to our /etc/hosts, and add key to known_hosts
1580 _AddHostToEtcHosts(new_node.name)
1582 _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1583 self.cfg.GetHostKey())
# Verify the node can actually bind/reach its claimed secondary IP.
1585 if new_node.secondary_ip != new_node.primary_ip:
1586 if not rpc.call_node_tcp_ping(new_node.name,
1587 constants.LOCALHOST_IP_ADDRESS,
1588 new_node.secondary_ip,
1589 constants.DEFAULT_NODED_PORT,
1591 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1592 " you gave (%s). Please fix and re-run this"
1593 " command." % new_node.secondary_ip)
1595 success, msg = ssh.VerifyNodeHostname(node)
1597 raise errors.OpExecError("Node '%s' claims it has a different hostname"
1598 " than the one the resolver gives: %s."
1599 " Please fix and re-run this command." %
1602 # Distribute updated /etc/hosts and known_hosts to all nodes,
1603 # including the node just added
1604 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1605 dist_nodes = self.cfg.GetNodeList() + [node]
1606 if myself.name in dist_nodes:
1607 dist_nodes.remove(myself.name)
1609 logger.Debug("Copying hosts and known_hosts to all nodes")
1610 for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1611 result = rpc.call_upload_file(dist_nodes, fname)
1612 for to_node in dist_nodes:
1613 if not result[to_node]:
# Distribution failures are logged but not fatal: the node is still added.
1614 logger.Error("copy of file %s to node %s failed" %
1617 to_copy = ss.GetFileList()
1618 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1619 to_copy.append(constants.VNC_PASSWORD_FILE)
1620 for fname in to_copy:
1621 if not ssh.CopyFileToNode(node, fname):
1622 logger.Error("could not copy file %s to node %s" % (fname, node))
# Only after all setup succeeded is the node recorded in the configuration.
1624 logger.Info("adding node %s to cluster.conf" % node)
1625 self.cfg.AddNode(new_node)
# NOTE(review): excerpt is line-sampled; the local `ss` used in Exec has no
# visible binding here -- presumably `ss = self.sstore` on a dropped line.
1628 class LUMasterFailover(LogicalUnit):
1629 """Failover the master node to the current node.
1631 This is a special LU in that it must run on a non-master node.
1634 HPATH = "master-failover"
1635 HTYPE = constants.HTYPE_CLUSTER
1639 def BuildHooksEnv(self):
1642 This will run on the new master only in the pre phase, and on all
1643 the nodes in the post phase.
1647 "OP_TARGET": self.new_master,
1648 "NEW_MASTER": self.new_master,
1649 "OLD_MASTER": self.old_master,
1651 return env, [self.new_master], self.cfg.GetNodeList()
1653 def CheckPrereq(self):
1654 """Check prerequisites.
1656 This checks that we are not already the master.
# The invoking host becomes the new master; the current master is read
# from the simple store.
1659 self.new_master = utils.HostInfo().name
1660 self.old_master = self.sstore.GetMasterNode()
1662 if self.old_master == self.new_master:
1663 raise errors.OpPrereqError("This commands must be run on the node"
1664 " where you want the new master to be."
1665 " %s is already the master" %
1668 def Exec(self, feedback_fn):
1669 """Failover the master node.
1671 This command, when run on a non-master node, will cause the current
1672 master to cease being master, and the non-master to become new
1676 #TODO: do not rely on gethostname returning the FQDN
1677 logger.Info("setting master to %s, old master: %s" %
1678 (self.new_master, self.old_master))
# Failures below are logged but deliberately non-fatal: a half-failed
# failover is left for the operator to repair manually.
1680 if not rpc.call_node_stop_master(self.old_master):
1681 logger.Error("could disable the master role on the old master"
1682 " %s, please disable manually" % self.old_master)
1685 ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1686 if not rpc.call_upload_file(self.cfg.GetNodeList(),
1687 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1688 logger.Error("could not distribute the new simple store master file"
1689 " to the other nodes, please check.")
1691 if not rpc.call_node_start_master(self.new_master):
1692 logger.Error("could not start the master role on the new master"
1693 " %s, please check" % self.new_master)
1694 feedback_fn("Error in activating the master IP on the new master,"
1695 " please fix manually.")
# Read-only LU: returns a dict of static cluster facts (name, versions,
# master node, architecture).  NOTE(review): excerpt is line-sampled; the
# dict literal's opening/return lines are missing from this view.
1699 class LUQueryClusterInfo(NoHooksLU):
1700 """Query cluster configuration.
1706 def CheckPrereq(self):
1707 """No prerequsites needed for this LU.
1712 def Exec(self, feedback_fn):
1713 """Return cluster config.
1717 "name": self.sstore.GetClusterName(),
1718 "software_version": constants.RELEASE_VERSION,
1719 "protocol_version": constants.PROTOCOL_VERSION,
1720 "config_version": constants.CONFIG_VERSION,
1721 "os_api_version": constants.OS_API_VERSION,
1722 "export_version": constants.EXPORT_VERSION,
1723 "master": self.sstore.GetMasterNode(),
1724 "architecture": (platform.architecture()[0], platform.machine()),
1730 class LUClusterCopyFile(NoHooksLU):
1731 """Copy file to cluster.
1734 _OP_REQP = ["nodes", "filename"]
1736 def CheckPrereq(self):
1737 """Check prerequisites.
1739 It should check that the named file exists and that the given list
# The file must exist locally on the master before attempting the copy.
1743 if not os.path.exists(self.op.filename):
1744 raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1746 self.nodes = _GetWantedNodes(self, self.op.nodes)
1748 def Exec(self, feedback_fn):
1749 """Copy a file from master to some nodes.
1752 opts - class with options as members
1753 args - list containing a single element, the file name
1755 nodes - list containing the name of target nodes; if empty, all nodes
1758 filename = self.op.filename
# NOTE(review): `myname` suggests the master skips copying to itself;
# the comparison line is missing from this excerpt -- confirm in full file.
1760 myname = utils.HostInfo().name
1762 for node in self.nodes:
# Per-node failures are logged and the loop continues (best-effort copy).
1765 if not ssh.CopyFileToNode(node, filename):
1766 logger.Error("Copy of file %s to node %s failed" % (filename, node))
# Read-only LU: simply serializes the in-memory cluster configuration.
1769 class LUDumpClusterConfig(NoHooksLU):
1770 """Return a text-representation of the cluster-config.
1775 def CheckPrereq(self):
1776 """No prerequisites.
1781 def Exec(self, feedback_fn):
1782 """Dump a representation of the cluster config to the standard output.
1785 return self.cfg.DumpConfig()
# Runs an arbitrary shell command (as root, over ssh) on the selected nodes
# and collects (node, output, exit_code) tuples.
1788 class LURunClusterCommand(NoHooksLU):
1789 """Run a command on some nodes.
1792 _OP_REQP = ["command", "nodes"]
1794 def CheckPrereq(self):
1795 """Check prerequisites.
1797 It checks that the given list of nodes is valid.
1800 self.nodes = _GetWantedNodes(self, self.op.nodes)
1802 def Exec(self, feedback_fn):
1803 """Run a command on some nodes.
# NOTE(review): the initialization of `data` and the final return are on
# lines dropped from this excerpt.
1807 for node in self.nodes:
1808 result = ssh.SSHCall(node, "root", self.op.command)
1809 data.append((node, result.output, result.exit_code))
1814 class LUActivateInstanceDisks(NoHooksLU):
1815 """Bring up an instance's disks.
1818 _OP_REQP = ["instance_name"]
1820 def CheckPrereq(self):
1821 """Check prerequisites.
1823 This checks that the instance is in the cluster.
# Expand a possibly-abbreviated instance name and fetch its config entry.
1826 instance = self.cfg.GetInstanceInfo(
1827 self.cfg.ExpandInstanceName(self.op.instance_name))
1828 if instance is None:
1829 raise errors.OpPrereqError("Instance '%s' not known" %
1830 self.op.instance_name)
1831 self.instance = instance
1834 def Exec(self, feedback_fn):
1835 """Activate the disks.
# Delegates the real work to the shared helper; NOTE(review) the line
# returning/using `disks_info` is missing from this excerpt.
1838 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1840 raise errors.OpExecError("Cannot activate block devices")
# NOTE(review): excerpt is line-sampled; the initialization of `disks_ok`
# and `device_info`, and the failure branch setting `disks_ok = False`,
# are on dropped lines.
1845 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1846 """Prepare the block devices for an instance.
1848 This sets up the block devices on all nodes.
1851 instance: a ganeti.objects.Instance object
1852 ignore_secondaries: if true, errors on secondary nodes won't result
1853 in an error return from the function
1856 false if the operation failed
1857 list of (host, instance_visible_name, node_visible_name) if the operation
1858 suceeded with the mapping from node devices to instance devices
1862 for inst_disk in instance.disks:
1863 master_result = None
# Assemble every component of the disk tree, on each node that hosts one.
1864 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1865 cfg.SetDiskID(node_disk, node)
1866 is_primary = node == instance.primary_node
1867 result = rpc.call_blockdev_assemble(node, node_disk,
1868 instance.name, is_primary)
1870 logger.Error("could not prepare block device %s on node %s"
1871 " (is_primary=%s)" %
1872 (inst_disk.iv_name, node, is_primary))
# Secondary-node failures are tolerated only when the caller asked for it.
1873 if is_primary or not ignore_secondaries:
1876 master_result = result
1877 device_info.append((instance.primary_node, inst_disk.iv_name,
1880 # leave the disks configured for the primary node
1881 # this is a workaround that would be fixed better by
1882 # improving the logical/physical id handling
1883 for disk in instance.disks:
1884 cfg.SetDiskID(disk, instance.primary_node)
1886 return disks_ok, device_info
# Wrapper around _AssembleInstanceDisks that rolls back (shuts disks down)
# and raises on failure.  NOTE(review): the `if not disks_ok:` guard around
# the rollback is on a line dropped from this excerpt.
1889 def _StartInstanceDisks(cfg, instance, force):
1890 """Start the disks of an instance.
1893 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1894 ignore_secondaries=force)
1896 _ShutdownInstanceDisks(instance, cfg)
# `force is not None` distinguishes a real boolean from callers that pass
# None (e.g. reinstall/rename), for whom the --force hint is meaningless.
1897 if force is not None and not force:
1898 logger.Error("If the message above refers to a secondary node,"
1899 " you can retry the operation using '--force'.")
1900 raise errors.OpExecError("Disk consistency error")
1903 class LUDeactivateInstanceDisks(NoHooksLU):
1904 """Shutdown an instance's disks.
1907 _OP_REQP = ["instance_name"]
1909 def CheckPrereq(self):
1910 """Check prerequisites.
1912 This checks that the instance is in the cluster.
1915 instance = self.cfg.GetInstanceInfo(
1916 self.cfg.ExpandInstanceName(self.op.instance_name))
1917 if instance is None:
1918 raise errors.OpPrereqError("Instance '%s' not known" %
1919 self.op.instance_name)
1920 self.instance = instance
1922 def Exec(self, feedback_fn):
1923 """Deactivate the disks
1926 instance = self.instance
# Ask the primary node which instances are running; a non-list answer
# means the node itself could not be contacted.
1927 ins_l = rpc.call_instance_list([instance.primary_node])
1928 ins_l = ins_l[instance.primary_node]
1929 if not type(ins_l) is list:
1930 raise errors.OpExecError("Can't contact node '%s'" %
1931 instance.primary_node)
# Refuse to tear down disks under a live instance.
1933 if self.instance.name in ins_l:
1934 raise errors.OpExecError("Instance is running, can't shutdown"
1937 _ShutdownInstanceDisks(instance, self.cfg)
# NOTE(review): excerpt is line-sampled; the `result` accumulator and the
# final return are on dropped lines.
1940 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1941 """Shutdown block devices of an instance.
1943 This does the shutdown on all nodes of the instance.
1945 If the ignore_primary is false, errors on the primary node are
1950 for disk in instance.disks:
# Walk every component of the disk tree on every hosting node.
1951 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1952 cfg.SetDiskID(top_disk, node)
1953 if not rpc.call_blockdev_shutdown(node, top_disk):
1954 logger.Error("could not shutdown block device %s on node %s" %
1955 (disk.iv_name, node))
# Primary-node failures count unless the caller opted to ignore them.
1956 if not ignore_primary or node != instance.primary_node:
1961 class LUStartupInstance(LogicalUnit):
1962 """Starts an instance.
1965 HPATH = "instance-start"
1966 HTYPE = constants.HTYPE_INSTANCE
1967 _OP_REQP = ["instance_name", "force"]
1969 def BuildHooksEnv(self):
1972 This runs on master, primary and secondary nodes of the instance.
1976 "FORCE": self.op.force,
1978 env.update(_BuildInstanceHookEnvByObject(self.instance))
1979 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1980 list(self.instance.secondary_nodes))
1983 def CheckPrereq(self):
1984 """Check prerequisites.
1986 This checks that the instance is in the cluster.
1989 instance = self.cfg.GetInstanceInfo(
1990 self.cfg.ExpandInstanceName(self.op.instance_name))
1991 if instance is None:
1992 raise errors.OpPrereqError("Instance '%s' not known" %
1993 self.op.instance_name)
1995 # check bridges existance
1996 _CheckInstanceBridgesExist(instance)
1998 self.instance = instance
1999 self.op.instance_name = instance.name
2001 def Exec(self, feedback_fn):
2002 """Start the instance.
2005 instance = self.instance
2006 force = self.op.force
# extra_args is optional on the opcode; default to empty string.
2007 extra_args = getattr(self.op, "extra_args", "")
2009 node_current = instance.primary_node
# Check free memory on the primary node before touching the disks.
2011 nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
2013 raise errors.OpExecError("Could not contact node %s for infos" %
2016 freememory = nodeinfo[node_current]['memory_free']
2017 memory = instance.memory
2018 if memory > freememory:
2019 raise errors.OpExecError("Not enough memory to start instance"
2021 " needed %s MiB, available %s MiB" %
2022 (instance.name, node_current, memory,
2025 _StartInstanceDisks(self.cfg, instance, force)
# Roll disks back down if the hypervisor start fails.
2027 if not rpc.call_instance_start(node_current, instance, extra_args):
2028 _ShutdownInstanceDisks(instance, self.cfg)
2029 raise errors.OpExecError("Could not start instance")
2031 self.cfg.MarkInstanceUp(instance.name)
2034 class LURebootInstance(LogicalUnit):
2035 """Reboot an instance.
2038 HPATH = "instance-reboot"
2039 HTYPE = constants.HTYPE_INSTANCE
2040 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2042 def BuildHooksEnv(self):
2045 This runs on master, primary and secondary nodes of the instance.
2049 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2051 env.update(_BuildInstanceHookEnvByObject(self.instance))
2052 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2053 list(self.instance.secondary_nodes))
2056 def CheckPrereq(self):
2057 """Check prerequisites.
2059 This checks that the instance is in the cluster.
2062 instance = self.cfg.GetInstanceInfo(
2063 self.cfg.ExpandInstanceName(self.op.instance_name))
2064 if instance is None:
2065 raise errors.OpPrereqError("Instance '%s' not known" %
2066 self.op.instance_name)
2068 # check bridges existance
2069 _CheckInstanceBridgesExist(instance)
2071 self.instance = instance
2072 self.op.instance_name = instance.name
2074 def Exec(self, feedback_fn):
2075 """Reboot the instance.
2078 instance = self.instance
2079 ignore_secondaries = self.op.ignore_secondaries
2080 reboot_type = self.op.reboot_type
2081 extra_args = getattr(self.op, "extra_args", "")
2083 node_current = instance.primary_node
# Validate the reboot type up-front; anything else is a caller bug.
2085 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2086 constants.INSTANCE_REBOOT_HARD,
2087 constants.INSTANCE_REBOOT_FULL]:
2088 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2089 (constants.INSTANCE_REBOOT_SOFT,
2090 constants.INSTANCE_REBOOT_HARD,
2091 constants.INSTANCE_REBOOT_FULL))
# SOFT/HARD reboots are delegated to the node daemon in one RPC ...
2093 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2094 constants.INSTANCE_REBOOT_HARD]:
2095 if not rpc.call_instance_reboot(node_current, instance,
2096 reboot_type, extra_args):
2097 raise errors.OpExecError("Could not reboot instance")
# ... while a FULL reboot is a stop/disk-cycle/start sequence driven here.
2099 if not rpc.call_instance_shutdown(node_current, instance):
2100 raise errors.OpExecError("could not shutdown instance for full reboot")
2101 _ShutdownInstanceDisks(instance, self.cfg)
2102 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2103 if not rpc.call_instance_start(node_current, instance, extra_args):
2104 _ShutdownInstanceDisks(instance, self.cfg)
2105 raise errors.OpExecError("Could not start instance for full reboot")
2107 self.cfg.MarkInstanceUp(instance.name)
2110 class LUShutdownInstance(LogicalUnit):
2111 """Shutdown an instance.
2114 HPATH = "instance-stop"
2115 HTYPE = constants.HTYPE_INSTANCE
2116 _OP_REQP = ["instance_name"]
2118 def BuildHooksEnv(self):
2121 This runs on master, primary and secondary nodes of the instance.
2124 env = _BuildInstanceHookEnvByObject(self.instance)
2125 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2126 list(self.instance.secondary_nodes))
2129 def CheckPrereq(self):
2130 """Check prerequisites.
2132 This checks that the instance is in the cluster.
2135 instance = self.cfg.GetInstanceInfo(
2136 self.cfg.ExpandInstanceName(self.op.instance_name))
2137 if instance is None:
2138 raise errors.OpPrereqError("Instance '%s' not known" %
2139 self.op.instance_name)
2140 self.instance = instance
2142 def Exec(self, feedback_fn):
2143 """Shutdown the instance.
2146 instance = self.instance
2147 node_current = instance.primary_node
# A failed hypervisor shutdown is only logged; the instance is still
# marked down and its disks are deactivated below.
2148 if not rpc.call_instance_shutdown(node_current, instance):
2149 logger.Error("could not shutdown instance")
2151 self.cfg.MarkInstanceDown(instance.name)
2152 _ShutdownInstanceDisks(instance, self.cfg)
2155 class LUReinstallInstance(LogicalUnit):
2156 """Reinstall an instance.
2159 HPATH = "instance-reinstall"
2160 HTYPE = constants.HTYPE_INSTANCE
2161 _OP_REQP = ["instance_name"]
2163 def BuildHooksEnv(self):
2166 This runs on master, primary and secondary nodes of the instance.
2169 env = _BuildInstanceHookEnvByObject(self.instance)
2170 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2171 list(self.instance.secondary_nodes))
2174 def CheckPrereq(self):
2175 """Check prerequisites.
2177 This checks that the instance is in the cluster and is not running.
2180 instance = self.cfg.GetInstanceInfo(
2181 self.cfg.ExpandInstanceName(self.op.instance_name))
2182 if instance is None:
2183 raise errors.OpPrereqError("Instance '%s' not known" %
2184 self.op.instance_name)
# Reinstall needs disks to write to, and the instance must be fully down:
# both per the config ("down" status) and per the live node query below.
2185 if instance.disk_template == constants.DT_DISKLESS:
2186 raise errors.OpPrereqError("Instance '%s' has no disks" %
2187 self.op.instance_name)
2188 if instance.status != "down":
2189 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2190 self.op.instance_name)
2191 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2193 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2194 (self.op.instance_name,
2195 instance.primary_node))
# An OS change is optional; when requested, verify the target OS exists
# on the primary node.
2197 self.op.os_type = getattr(self.op, "os_type", None)
2198 if self.op.os_type is not None:
2200 pnode = self.cfg.GetNodeInfo(
2201 self.cfg.ExpandNodeName(instance.primary_node))
2203 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2205 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2207 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2208 " primary node" % self.op.os_type)
2210 self.instance = instance
2212 def Exec(self, feedback_fn):
2213 """Reinstall the instance.
2216 inst = self.instance
2218 if self.op.os_type is not None:
2219 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2220 inst.os = self.op.os_type
# AddInstance also updates an existing entry, persisting the OS change.
2221 self.cfg.AddInstance(inst)
# force=None: disk-assembly failures here should not suggest --force.
2223 _StartInstanceDisks(self.cfg, inst, None)
2225 feedback_fn("Running the instance OS create scripts...")
2226 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2227 raise errors.OpExecError("Could not install OS for instance %s"
2229 (inst.name, inst.primary_node))
2231 _ShutdownInstanceDisks(inst, self.cfg)
2234 class LURenameInstance(LogicalUnit):
2235 """Rename an instance.
2238 HPATH = "instance-rename"
2239 HTYPE = constants.HTYPE_INSTANCE
2240 _OP_REQP = ["instance_name", "new_name"]
2242 def BuildHooksEnv(self):
2245 This runs on master, primary and secondary nodes of the instance.
2248 env = _BuildInstanceHookEnvByObject(self.instance)
2249 env["INSTANCE_NEW_NAME"] = self.op.new_name
2250 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2251 list(self.instance.secondary_nodes))
2254 def CheckPrereq(self):
2255 """Check prerequisites.
2257 This checks that the instance is in the cluster and is not running.
2260 instance = self.cfg.GetInstanceInfo(
2261 self.cfg.ExpandInstanceName(self.op.instance_name))
2262 if instance is None:
2263 raise errors.OpPrereqError("Instance '%s' not known" %
2264 self.op.instance_name)
2265 if instance.status != "down":
2266 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2267 self.op.instance_name)
2268 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2270 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2271 (self.op.instance_name,
2272 instance.primary_node))
2273 self.instance = instance
2275 # new name verification
2276 name_info = utils.HostInfo(self.op.new_name)
2278 self.op.new_name = new_name = name_info.name
# Unless ignore_ip is set, refuse the rename if the new name's IP answers
# a ping (fping exits 0 on reachable targets, so "not failed" == in use).
2279 if not getattr(self.op, "ignore_ip", False):
2280 command = ["fping", "-q", name_info.ip]
2281 result = utils.RunCmd(command)
2282 if not result.failed:
2283 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2284 (name_info.ip, new_name))
# NOTE(review): this docstring looks copy-pasted from LUReinstallInstance;
# the method renames, it does not reinstall.
2287 def Exec(self, feedback_fn):
2288 """Reinstall the instance.
2291 inst = self.instance
2292 old_name = inst.name
2294 self.cfg.RenameInstance(inst.name, self.op.new_name)
2296 # re-read the instance from the configuration after rename
2297 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2299 _StartInstanceDisks(self.cfg, inst, None)
# The config rename has already happened; a failed OS-level rename script
# is reported but not rolled back.
2301 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2303 msg = ("Could run OS rename script for instance %s on node %s (but the"
2304 " instance has been renamed in Ganeti)" %
2305 (inst.name, inst.primary_node))
2308 _ShutdownInstanceDisks(inst, self.cfg)
2311 class LURemoveInstance(LogicalUnit):
2312 """Remove an instance.
2315 HPATH = "instance-remove"
2316 HTYPE = constants.HTYPE_INSTANCE
2317 _OP_REQP = ["instance_name"]
2319 def BuildHooksEnv(self):
2322 This runs on master, primary and secondary nodes of the instance.
2325 env = _BuildInstanceHookEnvByObject(self.instance)
# Post-removal, only the master runs hooks (the instance's nodes may no
# longer reference it).
2326 nl = [self.sstore.GetMasterNode()]
2329 def CheckPrereq(self):
2330 """Check prerequisites.
2332 This checks that the instance is in the cluster.
2335 instance = self.cfg.GetInstanceInfo(
2336 self.cfg.ExpandInstanceName(self.op.instance_name))
2337 if instance is None:
2338 raise errors.OpPrereqError("Instance '%s' not known" %
2339 self.op.instance_name)
2340 self.instance = instance
2342 def Exec(self, feedback_fn):
2343 """Remove the instance.
2346 instance = self.instance
2347 logger.Info("shutting down instance %s on node %s" %
2348 (instance.name, instance.primary_node))
# With ignore_failures set, shutdown/disk-removal errors degrade to
# warnings so the config entry can still be purged.
2350 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2351 if self.op.ignore_failures:
2352 feedback_fn("Warning: can't shutdown instance")
2354 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2355 (instance.name, instance.primary_node))
2357 logger.Info("removing block devices for instance %s" % instance.name)
2359 if not _RemoveDisks(instance, self.cfg):
2360 if self.op.ignore_failures:
2361 feedback_fn("Warning: can't remove instance's disks")
2363 raise errors.OpExecError("Can't remove instance's disks")
2365 logger.Info("removing instance %s out of cluster config" % instance.name)
2367 self.cfg.RemoveInstance(instance.name)
# NOTE(review): excerpt is line-sampled; the initialization of `live_data`,
# `bad_nodes`, `output` and `iout`, the first field branch ("name"/"os"),
# and several `val` assignments are on dropped lines.
2370 class LUQueryInstances(NoHooksLU):
2371 """Logical unit for querying instances.
2374 _OP_REQP = ["output_fields", "names"]
2376 def CheckPrereq(self):
2377 """Check prerequisites.
2379 This checks that the fields required are valid output fields.
# oper_* fields need a live query of the nodes; all other fields come
# from the stored configuration.
2382 self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2383 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2384 "admin_state", "admin_ram",
2385 "disk_template", "ip", "mac", "bridge",
2386 "sda_size", "sdb_size"],
2387 dynamic=self.dynamic_fields,
2388 selected=self.op.output_fields)
2390 self.wanted = _GetWantedInstances(self, self.op.names)
2392 def Exec(self, feedback_fn):
2393 """Computes the list of nodes and their attributes.
2396 instance_names = self.wanted
2397 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2400 # begin data gathering
2402 nodes = frozenset([inst.primary_node for inst in instance_list])
# Only contact the nodes when a dynamic field was requested.
2405 if self.dynamic_fields.intersection(self.op.output_fields):
2407 node_data = rpc.call_all_instances_info(nodes)
2409 result = node_data[name]
2411 live_data.update(result)
# NOTE(review): `== False` (rather than `is False`) appears deliberate
# here to distinguish a False RPC answer from other falsy values.
2412 elif result == False:
2413 bad_nodes.append(name)
2414 # else no instance is alive
2416 live_data = dict([(name, {}) for name in instance_names])
2418 # end data gathering
2421 for instance in instance_list:
2423 for field in self.op.output_fields:
2428 elif field == "pnode":
2429 val = instance.primary_node
2430 elif field == "snodes":
2431 val = list(instance.secondary_nodes)
2432 elif field == "admin_state":
2433 val = (instance.status != "down")
2434 elif field == "oper_state":
# Instances on unreachable nodes get an indeterminate oper_state.
2435 if instance.primary_node in bad_nodes:
2438 val = bool(live_data.get(instance.name))
2439 elif field == "admin_ram":
2440 val = instance.memory
2441 elif field == "oper_ram":
2442 if instance.primary_node in bad_nodes:
2444 elif instance.name in live_data:
2445 val = live_data[instance.name].get("memory", "?")
2448 elif field == "disk_template":
2449 val = instance.disk_template
2451 val = instance.nics[0].ip
2452 elif field == "bridge":
2453 val = instance.nics[0].bridge
2454 elif field == "mac":
2455 val = instance.nics[0].mac
# sda/sdb sizes share one branch; the disk is looked up by field prefix.
2456 elif field == "sda_size" or field == "sdb_size":
2457 disk = instance.FindDisk(field[:3])
2463 raise errors.ParameterError(field)
# LU that fails over a network-mirrored instance from its primary to
# its (single) secondary node: verify prerequisites, shut the instance
# down on the source, swap primary_node in the cluster config, then
# reassemble disks and start it on the target.
# NOTE(review): elided listing with stray line-number prefixes — some
# statements (e.g. parts of BuildHooksEnv, several error branches) are
# not visible here; confirm against the full source.
2470 class LUFailoverInstance(LogicalUnit):
2471 """Failover an instance.
2474 HPATH = "instance-failover"
2475 HTYPE = constants.HTYPE_INSTANCE
2476 _OP_REQP = ["instance_name", "ignore_consistency"]
# Hook environment: instance data plus the IGNORE_CONSISTENCY flag;
# hooks run on master plus the instance's secondary nodes.
2478 def BuildHooksEnv(self):
2481 This runs on master, primary and secondary nodes of the instance.
2485 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2487 env.update(_BuildInstanceHookEnvByObject(self.instance))
2488 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2491 def CheckPrereq(self):
2492 """Check prerequisites.
2494 This checks that the instance is in the cluster.
2497 instance = self.cfg.GetInstanceInfo(
2498 self.cfg.ExpandInstanceName(self.op.instance_name))
2499 if instance is None:
2500 raise errors.OpPrereqError("Instance '%s' not known" %
2501 self.op.instance_name)
# failover only makes sense for net-mirrored templates (remote_raid1,
# drbd) — the data must already exist on the target node
2503 if instance.disk_template not in constants.DTS_NET_MIRROR:
2504 raise errors.OpPrereqError("Instance's disk layout is not"
2505 " network mirrored, cannot failover.")
2507 secondary_nodes = instance.secondary_nodes
2508 if not secondary_nodes:
2509 raise errors.ProgrammerError("no secondary node but using "
2510 "DT_REMOTE_RAID1 template")
# the target node must have enough free memory to host the instance
2512 # check memory requirements on the secondary node
2513 target_node = secondary_nodes[0]
2514 nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2515 info = nodeinfo.get(target_node, None)
2517 raise errors.OpPrereqError("Cannot get current information"
2518 " from node '%s'" % nodeinfo)
2519 if instance.memory > info['memory_free']:
2520 raise errors.OpPrereqError("Not enough memory on target node %s."
2521 " %d MB available, %d MB required" %
2522 (target_node, info['memory_free'],
# every bridge used by the instance's NICs must exist on the target
2525 # check bridge existence
2526 brlist = [nic.bridge for nic in instance.nics]
2527 if not rpc.call_bridges_exist(target_node, brlist):
2528 raise errors.OpPrereqError("One or more target bridges %s does not"
2529 " exist on destination node '%s'" %
2530 (brlist, target_node))
2532 self.instance = instance
2534 def Exec(self, feedback_fn):
2535 """Failover an instance.
2537 The failover is done by shutting it down on its present node and
2538 starting it on the secondary.
2541 instance = self.instance
2543 source_node = instance.primary_node
2544 target_node = instance.secondary_nodes[0]
# a degraded target disk aborts the failover unless the caller asked
# to ignore consistency
2546 feedback_fn("* checking disk consistency between source and target")
2547 for dev in instance.disks:
2548 # for remote_raid1, these are md over drbd
2549 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2550 if not self.op.ignore_consistency:
2551 raise errors.OpExecError("Disk %s is degraded on target node,"
2552 " aborting failover." % dev.iv_name)
# memory is re-checked at exec time — it may have changed since
# CheckPrereq ran
2554 feedback_fn("* checking target node resource availability")
2555 nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2558 raise errors.OpExecError("Could not contact target node %s." %
2561 free_memory = int(nodeinfo[target_node]['memory_free'])
2562 memory = instance.memory
2563 if memory > free_memory:
2564 raise errors.OpExecError("Not enough memory to create instance %s on"
2565 " node %s. needed %s MiB, available %s MiB" %
2566 (instance.name, target_node, memory,
2569 feedback_fn("* shutting down instance on source node")
2570 logger.Info("Shutting down instance %s on node %s" %
2571 (instance.name, source_node))
# a failed shutdown is tolerated only with ignore_consistency (e.g.
# the source node is already dead); otherwise abort
2573 if not rpc.call_instance_shutdown(source_node, instance):
2574 if self.op.ignore_consistency:
2575 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2576 " anyway. Please make sure node %s is down" %
2577 (instance.name, source_node, source_node))
2579 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2580 (instance.name, source_node))
2582 feedback_fn("* deactivating the instance's disks on source node")
2583 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2584 raise errors.OpExecError("Can't shut down the instance's disks.")
# swap the primary node and persist the new config cluster-wide
2586 instance.primary_node = target_node
2587 # distribute new instance config to the other nodes
2588 self.cfg.AddInstance(instance)
2590 feedback_fn("* activating the instance's disks on target node")
2591 logger.Info("Starting instance %s on node %s" %
2592 (instance.name, target_node))
2594 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2595 ignore_secondaries=True)
# on failure, roll back the partial disk activation before aborting
2597 _ShutdownInstanceDisks(instance, self.cfg)
2598 raise errors.OpExecError("Can't activate the instance's disks")
2600 feedback_fn("* starting the instance on the target node")
2601 if not rpc.call_instance_start(target_node, instance, None):
2602 _ShutdownInstanceDisks(instance, self.cfg)
2603 raise errors.OpExecError("Could not start instance %s on node %s." %
2604 (instance.name, target_node))
# Recursively create a block-device tree on the *primary* node:
# children first, then the device itself, so a parent can assemble on
# top of already-existing children.
# NOTE(review): elided listing with stray line-number prefixes — the
# return paths and error handling are not visible here; confirm
# against the full source.
2607 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2608 """Create a tree of block devices on the primary node.
2610 This always creates all devices.
2614 for child in device.children:
2615 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
# register the device on the target node, then create it there; the
# on_primary argument to blockdev_create is True in this helper
2618 cfg.SetDiskID(device, node)
2619 new_id = rpc.call_blockdev_create(node, device, device.size,
2620 instance.name, True, info)
# remember the physical id returned by the node if not already known
2623 if device.physical_id is None:
2624 device.physical_id = new_id
# Recursively create a block-device tree on a *secondary* node.
# Devices that must exist on secondaries are created (their children
# are forced too); other device types only recurse, passing the
# caller's 'force' value through unchanged.
# NOTE(review): elided listing — early returns and the use of 'force'
# between the two halves are not fully visible; confirm against the
# full source.
2628 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2629 """Create a tree of block devices on a secondary node.
2631 If this device type has to be created on secondaries, create it and
2634 If not, just recurse to children keeping the same 'force' value.
2637 if device.CreateOnSecondary():
2640 for child in device.children:
2641 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2642 child, force, info):
# on_primary is False for the blockdev_create call on a secondary
2647 cfg.SetDiskID(device, node)
2648 new_id = rpc.call_blockdev_create(node, device, device.size,
2649 instance.name, False, info)
2652 if device.physical_id is None:
2653 device.physical_id = new_id
# Build unique LV names: one freshly generated unique id per requested
# extension, concatenated as "<unique-id><ext>".
# NOTE(review): the loop header over 'exts' (binding 'val') and the
# return of 'results' are elided in this listing — confirm against the
# full source.
2657 def _GenerateUniqueNames(cfg, exts):
2658 """Generate a suitable LV name.
2660 This will generate a logical volume name for the given instance.
2665 new_id = cfg.GenerateUniqueID()
2666 results.append("%s%s" % (new_id, val))
# Build a drbd7 (LD_DRBD7) device backed by two fresh LVs: a data LV
# of the requested size plus a fixed 128 MB metadata LV.  The drbd
# logical id is the (primary, secondary, allocated-port) triple.
# NOTE(review): the return of 'drbd_dev' is elided in this listing.
2670 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2671 """Generate a drbd device complete with its children.
2674 port = cfg.AllocatePort()
2675 vgname = cfg.GetVGName()
2676 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2677 logical_id=(vgname, names[0]))
2678 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2679 logical_id=(vgname, names[1]))
2680 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2681 logical_id = (primary, secondary, port),
2682 children = [dev_data, dev_meta])
# Same shape as _GenerateMDDRBDBranch but for drbd8 (LD_DRBD8), which
# additionally carries an iv_name (drbd8 devices are used directly as
# sda/sdb, without an md layer on top).
# NOTE(review): the iv_name keyword and the return of 'drbd_dev' are
# elided in this listing — confirm against the full source.
2686 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2687 """Generate a drbd8 device complete with its children.
2690 port = cfg.AllocatePort()
2691 vgname = cfg.GetVGName()
2692 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2693 logical_id=(vgname, names[0]))
2694 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2695 logical_id=(vgname, names[1]))
2696 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2697 logical_id = (primary, secondary, port),
2698 children = [dev_data, dev_meta],
# Build the full disk layout (sda = data, sdb = swap) for an instance,
# dispatching on the disk template: diskless, plain LVs, local raid1
# (md over two local LVs), remote_raid1 (md over drbd7) or drbd8.
# NOTE(review): elided listing — the diskless branch body, some Disk
# keyword arguments and the final 'return disks' are not visible here;
# confirm against the full source.
2702 def _GenerateDiskTemplate(cfg, template_name,
2703 instance_name, primary_node,
2704 secondary_nodes, disk_sz, swap_sz):
2705 """Generate the entire disk layout for a given template type.
2708 #TODO: compute space requirements
2710 vgname = cfg.GetVGName()
2711 if template_name == "diskless":
# plain: one LV per disk on the primary node, no mirroring
2713 elif template_name == "plain":
2714 if len(secondary_nodes) != 0:
2715 raise errors.ProgrammerError("Wrong template configuration")
2717 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2718 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2719 logical_id=(vgname, names[0]),
2721 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2722 logical_id=(vgname, names[1]),
2724 disks = [sda_dev, sdb_dev]
# local_raid1: each disk is an md raid1 over two LVs on the same node
2725 elif template_name == "local_raid1":
2726 if len(secondary_nodes) != 0:
2727 raise errors.ProgrammerError("Wrong template configuration")
2730 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2731 ".sdb_m1", ".sdb_m2"])
2732 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2733 logical_id=(vgname, names[0]))
2734 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2735 logical_id=(vgname, names[1]))
2736 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2738 children = [sda_dev_m1, sda_dev_m2])
2739 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2740 logical_id=(vgname, names[2]))
2741 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2742 logical_id=(vgname, names[3]))
2743 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2745 children = [sdb_dev_m1, sdb_dev_m2])
2746 disks = [md_sda_dev, md_sdb_dev]
# remote_raid1: md on the primary over a drbd7 device mirrored to the
# single secondary node
2747 elif template_name == constants.DT_REMOTE_RAID1:
2748 if len(secondary_nodes) != 1:
2749 raise errors.ProgrammerError("Wrong template configuration")
2750 remote_node = secondary_nodes[0]
2751 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2752 ".sdb_data", ".sdb_meta"])
2753 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2754 disk_sz, names[0:2])
2755 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2756 children = [drbd_sda_dev], size=disk_sz)
2757 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2758 swap_sz, names[2:4])
2759 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2760 children = [drbd_sdb_dev], size=swap_sz)
2761 disks = [md_sda_dev, md_sdb_dev]
# drbd8: the drbd devices themselves are sda/sdb, no md layer
2762 elif template_name == constants.DT_DRBD8:
2763 if len(secondary_nodes) != 1:
2764 raise errors.ProgrammerError("Wrong template configuration")
2765 remote_node = secondary_nodes[0]
2766 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2767 ".sdb_data", ".sdb_meta"])
2768 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2769 disk_sz, names[0:2], "sda")
2770 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2771 swap_sz, names[2:4], "sdb")
2772 disks = [drbd_sda_dev, drbd_sdb_dev]
2774 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  Args:
    instance: the instance object; only its ``name`` attribute is read

  Returns:
    the string ``"originstname+<instance name>"``, used to tag block
    devices with the instance that owns them

  """
  return "originstname+%s" % instance.name
# Create all block devices of an instance: each device is first
# created on every secondary node (non-forced), then on the primary.
# NOTE(review): elided listing — the False/True return statements on
# the failure/success paths are not visible here; confirm against the
# full source.
2785 def _CreateDisks(cfg, instance):
2786 """Create all disks for an instance.
2788 This abstracts away some work from AddInstance.
2791 instance: the instance object
2794 True or False showing the success of the creation process
# the info text tags every created device with the owning instance
2797 info = _GetInstanceInfoText(instance)
2799 for device in instance.disks:
2800 logger.Info("creating volume %s for instance %s" %
2801 (device.iv_name, instance.name))
# secondaries first, so mirrored devices have their remote halves in
# place before the primary assembles
2803 for secondary_node in instance.secondary_nodes:
2804 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2805 device, False, info):
2806 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2807 (device.iv_name, device, secondary_node))
2810 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2811 instance, device, info):
2812 logger.Error("failed to create volume %s on primary!" %
# Remove every block device of an instance on every node that holds a
# piece of it; removal is best-effort — failures are logged and the
# other devices are still attempted.
# NOTE(review): elided listing — the success-flag bookkeeping and the
# final return are not visible here; confirm against the full source.
2818 def _RemoveDisks(instance, cfg):
2819 """Remove all disks for an instance.
2821 This abstracts away some work from `AddInstance()` and
2822 `RemoveInstance()`. Note that in case some of the devices couldn't
2823 be removed, the removal will continue with the other ones (compare
2824 with `_CreateDisks()`).
2827 instance: the instance object
2830 True or False showing the success of the removal process
2833 logger.Info("removing block devices for instance %s" % instance.name)
# ComputeNodeTree yields every (node, sub-device) pair of the tree, so
# mirrored components on secondaries are removed too
2836 for device in instance.disks:
2837 for node, disk in device.ComputeNodeTree(instance.primary_node):
2838 cfg.SetDiskID(disk, node)
2839 if not rpc.call_blockdev_remove(node, disk):
2840 logger.Error("could not remove block device %s on node %s,"
2841 " continuing anyway" %
2842 (device.iv_name, node))
# LU that creates a new instance: validates the request (mode, nodes,
# disk template, free space, OS, name/IP/MAC/bridge), generates the
# disk layout, registers the instance in the config, optionally waits
# for disk sync, installs or imports the OS, and starts the instance.
# NOTE(review): elided listing with stray line-number prefixes — many
# statements (else branches, some dict/tuple openers, return paths)
# are not visible here; confirm against the full source.
2847 class LUCreateInstance(LogicalUnit):
2848 """Create an instance.
2851 HPATH = "instance-add"
2852 HTYPE = constants.HTYPE_INSTANCE
2853 _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2854 "disk_template", "swap_size", "mode", "start", "vcpus",
2855 "wait_for_sync", "ip_check", "mac"]
# Hook environment: creation parameters plus, for imports, the source
# node/path/image of the export being restored.
2857 def BuildHooksEnv(self):
2860 This runs on master, primary and secondary nodes of the instance.
2864 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2865 "INSTANCE_DISK_SIZE": self.op.disk_size,
2866 "INSTANCE_SWAP_SIZE": self.op.swap_size,
2867 "INSTANCE_ADD_MODE": self.op.mode,
2869 if self.op.mode == constants.INSTANCE_IMPORT:
2870 env["INSTANCE_SRC_NODE"] = self.op.src_node
2871 env["INSTANCE_SRC_PATH"] = self.op.src_path
2872 env["INSTANCE_SRC_IMAGE"] = self.src_image
2874 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2875 primary_node=self.op.pnode,
2876 secondary_nodes=self.secondaries,
2877 status=self.instance_status,
2878 os_type=self.op.os_type,
2879 memory=self.op.mem_size,
2880 vcpus=self.op.vcpus,
2881 nics=[(self.inst_ip, self.op.bridge)],
2884 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2889 def CheckPrereq(self):
2890 """Check prerequisites.
# optional opcode attributes default to None when absent
2893 for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2894 if not hasattr(self.op, attr):
2895 setattr(self.op, attr, None)
2897 if self.op.mode not in (constants.INSTANCE_CREATE,
2898 constants.INSTANCE_IMPORT):
2899 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
# import mode: validate the source node/path and the export found
# there (version, single-disk restriction), and record the disk image
2902 if self.op.mode == constants.INSTANCE_IMPORT:
2903 src_node = getattr(self.op, "src_node", None)
2904 src_path = getattr(self.op, "src_path", None)
2905 if src_node is None or src_path is None:
2906 raise errors.OpPrereqError("Importing an instance requires source"
2907 " node and path options")
2908 src_node_full = self.cfg.ExpandNodeName(src_node)
2909 if src_node_full is None:
2910 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2911 self.op.src_node = src_node = src_node_full
2913 if not os.path.isabs(src_path):
2914 raise errors.OpPrereqError("The source path must be absolute")
2916 export_info = rpc.call_export_info(src_node, src_path)
2919 raise errors.OpPrereqError("No export found in dir %s" % src_path)
2921 if not export_info.has_section(constants.INISECT_EXP):
2922 raise errors.ProgrammerError("Corrupted export config")
2924 ei_version = export_info.get(constants.INISECT_EXP, 'version')
2925 if (int(ei_version) != constants.EXPORT_VERSION):
2926 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2927 (ei_version, constants.EXPORT_VERSION))
2929 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2930 raise errors.OpPrereqError("Can't import instance with more than"
2933 # FIXME: are the old os-es, disk sizes, etc. useful?
2934 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2935 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2937 self.src_image = diskimage
2938 else: # INSTANCE_CREATE
2939 if getattr(self.op, "os_type", None) is None:
2940 raise errors.OpPrereqError("No guest OS specified")
2942 # check primary node
2943 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2945 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2947 self.op.pnode = pnode.name
2949 self.secondaries = []
# net-mirrored templates need exactly one secondary, different from
# the primary
2950 # disk template and mirror node verification
2951 if self.op.disk_template not in constants.DISK_TEMPLATES:
2952 raise errors.OpPrereqError("Invalid disk template name")
2954 if self.op.disk_template in constants.DTS_NET_MIRROR:
2955 if getattr(self.op, "snode", None) is None:
2956 raise errors.OpPrereqError("The networked disk templates need"
2959 snode_name = self.cfg.ExpandNodeName(self.op.snode)
2960 if snode_name is None:
2961 raise errors.OpPrereqError("Unknown secondary node '%s'" %
2963 elif snode_name == pnode.name:
2964 raise errors.OpPrereqError("The secondary node cannot be"
2965 " the primary node.")
2966 self.secondaries.append(snode_name)
# every involved node must have enough free VG space for the template
2968 # Check lv size requirements
2969 nodenames = [pnode.name] + self.secondaries
2970 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2972 # Required free disk space as a function of disk and swap space
2974 constants.DT_DISKLESS: 0,
2975 constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2976 constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2977 # 256 MB are added for drbd metadata, 128MB for each drbd device
2978 constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2979 constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2982 if self.op.disk_template not in req_size_dict:
2983 raise errors.ProgrammerError("Disk template '%s' size requirement"
2984 " is unknown" % self.op.disk_template)
2986 req_size = req_size_dict[self.op.disk_template]
2988 for node in nodenames:
2989 info = nodeinfo.get(node, None)
2991 raise errors.OpPrereqError("Cannot get current information"
2992 " from node '%s'" % nodeinfo)
2993 if req_size > info['vg_free']:
2994 raise errors.OpPrereqError("Not enough disk space on target node %s."
2995 " %d MB available, %d MB required" %
2996 (node, info['vg_free'], req_size))
# the requested guest OS must exist on the primary node
2999 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3001 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3002 " primary node" % self.op.os_type)
3004 if self.op.kernel_path == constants.VALUE_NONE:
3005 raise errors.OpPrereqError("Can't set instance kernel to none")
# resolve and uniqueness-check the instance name, then the IP
# (none/auto/explicit), MAC and bridge settings
3007 # instance verification
3008 hostname1 = utils.HostInfo(self.op.instance_name)
3010 self.op.instance_name = instance_name = hostname1.name
3011 instance_list = self.cfg.GetInstanceList()
3012 if instance_name in instance_list:
3013 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3016 ip = getattr(self.op, "ip", None)
3017 if ip is None or ip.lower() == "none":
3019 elif ip.lower() == "auto":
3020 inst_ip = hostname1.ip
3022 if not utils.IsValidIP(ip):
3023 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3024 " like a valid IP" % ip)
3026 self.inst_ip = inst_ip
3028 if self.op.start and not self.op.ip_check:
3029 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3030 " adding an instance in start mode")
# a pingable IP means it is already in use somewhere on the network
3032 if self.op.ip_check:
3033 if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
3034 constants.DEFAULT_NODED_PORT):
3035 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3036 (hostname1.ip, instance_name))
3038 # MAC address verification
3039 if self.op.mac != "auto":
3040 if not utils.IsValidMac(self.op.mac.lower()):
3041 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3044 # bridge verification
3045 bridge = getattr(self.op, "bridge", None)
3047 self.op.bridge = self.cfg.GetDefBridge()
3049 self.op.bridge = bridge
3051 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3052 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3053 " destination node '%s'" %
3054 (self.op.bridge, pnode.name))
# HVM boot order may only contain the characters a, c, d, n
3056 # boot order verification
3057 if self.op.hvm_boot_order is not None:
3058 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3059 raise errors.OpPrereqError("invalid boot order specified,"
3060 " must be one or more of [acdn]")
3063 self.instance_status = 'up'
3065 self.instance_status = 'down'
3067 def Exec(self, feedback_fn):
3068 """Create and add the instance to the cluster.
3071 instance = self.op.instance_name
3072 pnode_name = self.pnode.name
3074 if self.op.mac == "auto":
3075 mac_address = self.cfg.GenerateMAC()
3077 mac_address = self.op.mac
3079 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3080 if self.inst_ip is not None:
3081 nic.ip = self.inst_ip
# some hypervisors need a network port (e.g. for VNC consoles)
3083 ht_kind = self.sstore.GetHypervisorType()
3084 if ht_kind in constants.HTS_REQ_PORT:
3085 network_port = self.cfg.AllocatePort()
3089 disks = _GenerateDiskTemplate(self.cfg,
3090 self.op.disk_template,
3091 instance, pnode_name,
3092 self.secondaries, self.op.disk_size,
3095 iobj = objects.Instance(name=instance, os=self.op.os_type,
3096 primary_node=pnode_name,
3097 memory=self.op.mem_size,
3098 vcpus=self.op.vcpus,
3099 nics=[nic], disks=disks,
3100 disk_template=self.op.disk_template,
3101 status=self.instance_status,
3102 network_port=network_port,
3103 kernel_path=self.op.kernel_path,
3104 initrd_path=self.op.initrd_path,
3105 hvm_boot_order=self.op.hvm_boot_order,
# roll back any partially created disks on failure
3108 feedback_fn("* creating instance disks...")
3109 if not _CreateDisks(self.cfg, iobj):
3110 _RemoveDisks(iobj, self.cfg)
3111 raise errors.OpExecError("Device creation failed, reverting...")
3113 feedback_fn("adding instance %s to cluster config" % instance)
3115 self.cfg.AddInstance(iobj)
# wait for full sync when requested; otherwise, for net-mirrored
# templates, only check the mirrors are not degraded (sync-ing is ok)
3117 if self.op.wait_for_sync:
3118 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3119 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3120 # make sure the disks are not degraded (still sync-ing is ok)
3122 feedback_fn("* checking mirrors status")
3123 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
# degraded disks mean full rollback: remove disks and config entry
3128 _RemoveDisks(iobj, self.cfg)
3129 self.cfg.RemoveInstance(iobj.name)
3130 raise errors.OpExecError("There are some degraded disks for"
3133 feedback_fn("creating os for instance %s on node %s" %
3134 (instance, pnode_name))
3136 if iobj.disk_template != constants.DT_DISKLESS:
3137 if self.op.mode == constants.INSTANCE_CREATE:
3138 feedback_fn("* running the instance OS create scripts...")
3139 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3140 raise errors.OpExecError("could not add os for instance %s"
3142 (instance, pnode_name))
3144 elif self.op.mode == constants.INSTANCE_IMPORT:
3145 feedback_fn("* running the instance OS import scripts...")
3146 src_node = self.op.src_node
3147 src_image = self.src_image
3148 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3149 src_node, src_image):
3150 raise errors.OpExecError("Could not import os for instance"
3152 (instance, pnode_name))
3154 # also checked in the prereq part
3155 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3159 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3160 feedback_fn("* starting instance...")
3161 if not rpc.call_instance_start(pnode_name, iobj, None):
3162 raise errors.OpExecError("Could not start instance")
# No-hooks LU that does not open the console itself: it verifies the
# instance is running and returns the ssh command line the master
# should execute to reach the hypervisor console.
# NOTE(review): elided listing — the final return of the assembled
# argv is not visible here; confirm against the full source.
3165 class LUConnectConsole(NoHooksLU):
3166 """Connect to an instance's console.
3168 This is somewhat special in that it returns the command line that
3169 you need to run on the master node in order to connect to the
3173 _OP_REQP = ["instance_name"]
3175 def CheckPrereq(self):
3176 """Check prerequisites.
3178 This checks that the instance is in the cluster.
3181 instance = self.cfg.GetInstanceInfo(
3182 self.cfg.ExpandInstanceName(self.op.instance_name))
3183 if instance is None:
3184 raise errors.OpPrereqError("Instance '%s' not known" %
3185 self.op.instance_name)
3186 self.instance = instance
3188 def Exec(self, feedback_fn):
3189 """Connect to the console of an instance
3192 instance = self.instance
3193 node = instance.primary_node
# confirm the instance is actually running on its primary node before
# handing out a console command
3195 node_insts = rpc.call_instance_list([node])[node]
3196 if node_insts is False:
3197 raise errors.OpExecError("Can't connect to node %s." % node)
3199 if instance.name not in node_insts:
3200 raise errors.OpExecError("Instance %s is not running." % instance.name)
3202 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
# the hypervisor supplies the node-local console command; it is
# wrapped in an ssh invocation with the cluster's standard options
3204 hyper = hypervisor.GetHypervisor()
3205 console_cmd = hyper.GetShellCommandForConsole(instance)
3207 argv = ["ssh", "-q", "-t"]
3208 argv.extend(ssh.KNOWN_HOSTS_OPTS)
3209 argv.extend(ssh.BATCH_MODE_OPTS)
3211 argv.append(console_cmd)
# LU that adds a new drbd mirror component (on a new secondary node)
# to one disk of a remote_raid1 instance: create the drbd branch on
# the secondary and the primary, attach it to the md array, persist
# the config and wait for sync.
# NOTE(review): elided listing with stray line-number prefixes — some
# statements (loop breaks, a few call arguments) are not visible here;
# confirm against the full source.
3215 class LUAddMDDRBDComponent(LogicalUnit):
3216 """Add a new mirror member to an instance's disk.
3219 HPATH = "mirror-add"
3220 HTYPE = constants.HTYPE_INSTANCE
3221 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3223 def BuildHooksEnv(self):
3226 This runs on the master, the primary and all the secondaries.
3230 "NEW_SECONDARY": self.op.remote_node,
3231 "DISK_NAME": self.op.disk_name,
3233 env.update(_BuildInstanceHookEnvByObject(self.instance))
3234 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3235 self.op.remote_node,] + list(self.instance.secondary_nodes)
3238 def CheckPrereq(self):
3239 """Check prerequisites.
3241 This checks that the instance is in the cluster.
3244 instance = self.cfg.GetInstanceInfo(
3245 self.cfg.ExpandInstanceName(self.op.instance_name))
3246 if instance is None:
3247 raise errors.OpPrereqError("Instance '%s' not known" %
3248 self.op.instance_name)
3249 self.instance = instance
3251 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3252 if remote_node is None:
3253 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3254 self.remote_node = remote_node
# the new mirror target cannot be the node the instance runs on
3256 if remote_node == instance.primary_node:
3257 raise errors.OpPrereqError("The specified node is the primary node of"
3260 if instance.disk_template != constants.DT_REMOTE_RAID1:
3261 raise errors.OpPrereqError("Instance's disk layout is not"
# find the named disk; refuse a third mirror half (raid1 is two-way)
3263 for disk in instance.disks:
3264 if disk.iv_name == self.op.disk_name:
3267 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3268 " instance." % self.op.disk_name)
3269 if len(disk.children) > 1:
3270 raise errors.OpPrereqError("The device already has two slave devices."
3271 " This would create a 3-disk raid1 which we"
3275 def Exec(self, feedback_fn):
3276 """Add the mirror component
3280 instance = self.instance
3282 remote_node = self.remote_node
3283 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3284 names = _GenerateUniqueNames(self.cfg, lv_names)
3285 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3286 remote_node, disk.size, names)
# create secondary half first, then primary; secondary failure aborts
# cleanly, primary failure rolls back the secondary device
3288 logger.Info("adding new mirror component on secondary")
3290 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3292 _GetInstanceInfoText(instance)):
3293 raise errors.OpExecError("Failed to create new component on secondary"
3294 " node %s" % remote_node)
3296 logger.Info("adding new mirror component on primary")
3298 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3300 _GetInstanceInfoText(instance)):
3301 # remove secondary dev
3302 self.cfg.SetDiskID(new_drbd, remote_node)
3303 rpc.call_blockdev_remove(remote_node, new_drbd)
3304 raise errors.OpExecError("Failed to create volume on primary")
3306 # the device exists now
3307 # call the primary node to add the mirror to md
3308 logger.Info("adding new mirror component to md")
# if attaching to md fails, roll back the new drbd on both nodes
3309 if not rpc.call_blockdev_addchildren(instance.primary_node,
3311 logger.Error("Can't add mirror compoment to md!")
3312 self.cfg.SetDiskID(new_drbd, remote_node)
3313 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3314 logger.Error("Can't rollback on secondary")
3315 self.cfg.SetDiskID(new_drbd, instance.primary_node)
3316 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3317 logger.Error("Can't rollback on primary")
3318 raise errors.OpExecError("Can't add mirror component to md array")
# record the new child in the config and wait for the md resync
3320 disk.children.append(new_drbd)
3322 self.cfg.AddInstance(instance)
3324 _WaitForSync(self.cfg, instance, self.proc)
# LU that removes one drbd mirror component (identified by its drbd
# port, op.disk_id) from a disk of a remote_raid1 instance: detach it
# from the md array, remove the device on both nodes, and persist the
# updated config.
# NOTE(review): elided listing — loop breaks, the oid computation and
# some call arguments are not visible here; confirm against the full
# source.
3329 class LURemoveMDDRBDComponent(LogicalUnit):
3330 """Remove a component from a remote_raid1 disk.
3333 HPATH = "mirror-remove"
3334 HTYPE = constants.HTYPE_INSTANCE
3335 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3337 def BuildHooksEnv(self):
3340 This runs on the master, the primary and all the secondaries.
3344 "DISK_NAME": self.op.disk_name,
3345 "DISK_ID": self.op.disk_id,
3346 "OLD_SECONDARY": self.old_secondary,
3348 env.update(_BuildInstanceHookEnvByObject(self.instance))
3349 nl = [self.sstore.GetMasterNode(),
3350 self.instance.primary_node] + list(self.instance.secondary_nodes)
3353 def CheckPrereq(self):
3354 """Check prerequisites.
3356 This checks that the instance is in the cluster.
3359 instance = self.cfg.GetInstanceInfo(
3360 self.cfg.ExpandInstanceName(self.op.instance_name))
3361 if instance is None:
3362 raise errors.OpPrereqError("Instance '%s' not known" %
3363 self.op.instance_name)
3364 self.instance = instance
3366 if instance.disk_template != constants.DT_REMOTE_RAID1:
3367 raise errors.OpPrereqError("Instance's disk layout is not"
3369 for disk in instance.disks:
3370 if disk.iv_name == self.op.disk_name:
3373 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3374 " instance." % self.op.disk_name)
# locate the drbd7 child whose allocated port matches op.disk_id
3375 for child in disk.children:
3376 if (child.dev_type == constants.LD_DRBD7 and
3377 child.logical_id[2] == self.op.disk_id):
3380 raise errors.OpPrereqError("Can't find the device with this port.")
# never remove the last mirror half — that would lose the data path
3382 if len(disk.children) < 2:
3383 raise errors.OpPrereqError("Cannot remove the last component from"
# logical_id is (nodeA, nodeB, port); pick the end that is not the
# primary as the old secondary for the hooks environment
3387 if self.child.logical_id[0] == instance.primary_node:
3391 self.old_secondary = self.child.logical_id[oid]
3393 def Exec(self, feedback_fn):
3394 """Remove the mirror component
3397 instance = self.instance
3400 logger.Info("remove mirror component")
3401 self.cfg.SetDiskID(disk, instance.primary_node)
3402 if not rpc.call_blockdev_removechildren(instance.primary_node,
3404 raise errors.OpExecError("Can't remove child from mirror.")
# best-effort removal of the detached drbd on both of its nodes
3406 for node in child.logical_id[:2]:
3407 self.cfg.SetDiskID(child, node)
3408 if not rpc.call_blockdev_remove(node, child):
3409 logger.Error("Warning: failed to remove device from node %s,"
3410 " continuing operation." % node)
3412 disk.children.remove(child)
3413 self.cfg.AddInstance(instance)
3416 class LUReplaceDisks(LogicalUnit):
3417 """Replace the disks of an instance.
3420 HPATH = "mirrors-replace"
3421 HTYPE = constants.HTYPE_INSTANCE
3422 _OP_REQP = ["instance_name", "mode", "disks"]
# Hook environment for disk replacement: the replace mode plus the old
# and (optional) new secondary node.
# NOTE(review): elided listing — the env dict opener and the return
# are not visible here; confirm against the full source.
3424 def BuildHooksEnv(self):
3427 This runs on the master, the primary and all the secondaries.
3431 "MODE": self.op.mode,
3432 "NEW_SECONDARY": self.op.remote_node,
3433 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3435 env.update(_BuildInstanceHookEnvByObject(self.instance))
3437 self.sstore.GetMasterNode(),
3438 self.instance.primary_node,
# the new secondary only participates in the hooks when one was given
3440 if self.op.remote_node is not None:
3441 nl.append(self.op.remote_node)
# Prerequisites for disk replacement: validate the instance and its
# (single) secondary, normalize the requested mode per disk template
# (remote_raid1 only supports replace-all; drbd8 supports primary or
# secondary replacement), and resolve the target/other nodes.
# NOTE(review): elided listing — several assignments and else branches
# are not visible here; confirm against the full source.
3444 def CheckPrereq(self):
3445 """Check prerequisites.
3447 This checks that the instance is in the cluster.
3450 instance = self.cfg.GetInstanceInfo(
3451 self.cfg.ExpandInstanceName(self.op.instance_name))
3452 if instance is None:
3453 raise errors.OpPrereqError("Instance '%s' not known" %
3454 self.op.instance_name)
3455 self.instance = instance
3456 self.op.instance_name = instance.name
3458 if instance.disk_template not in constants.DTS_NET_MIRROR:
3459 raise errors.OpPrereqError("Instance's disk layout is not"
3460 " network mirrored.")
3462 if len(instance.secondary_nodes) != 1:
3463 raise errors.OpPrereqError("The instance has a strange layout,"
3464 " expected one secondary but found %d" %
3465 len(instance.secondary_nodes))
3467 self.sec_node = instance.secondary_nodes[0]
3469 remote_node = getattr(self.op, "remote_node", None)
3470 if remote_node is not None:
3471 remote_node = self.cfg.ExpandNodeName(remote_node)
3472 if remote_node is None:
3473 raise errors.OpPrereqError("Node '%s' not known" %
3474 self.op.remote_node)
3475 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3477 self.remote_node_info = None
3478 if remote_node == instance.primary_node:
3479 raise errors.OpPrereqError("The specified node is the primary node of"
3481 elif remote_node == self.sec_node:
3482 if self.op.mode == constants.REPLACE_DISK_SEC:
3483 # this is for DRBD8, where we can't execute the same mode of
3484 # replacement as for drbd7 (no different port allocated)
3485 raise errors.OpPrereqError("Same secondary given, cannot execute"
3487 # the user gave the current secondary, switch to
3488 # 'no-replace-secondary' mode for drbd7
# remote_raid1 can only replace all disks at once; drbd8 maps
# replace-all + new node to secondary replacement
3490 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3491 self.op.mode != constants.REPLACE_DISK_ALL):
3492 raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3493 " disks replacement, not individual ones")
3494 if instance.disk_template == constants.DT_DRBD8:
3495 if (self.op.mode == constants.REPLACE_DISK_ALL and
3496 remote_node is not None):
3497 # switch to replace secondary mode
3498 self.op.mode = constants.REPLACE_DISK_SEC
3500 if self.op.mode == constants.REPLACE_DISK_ALL:
3501 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3502 " secondary disk replacement, not"
# tgt_node is where devices are replaced; oth_node holds the peer
3504 elif self.op.mode == constants.REPLACE_DISK_PRI:
3505 if remote_node is not None:
3506 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3507 " the secondary while doing a primary"
3508 " node disk replacement")
3509 self.tgt_node = instance.primary_node
3510 self.oth_node = instance.secondary_nodes[0]
3511 elif self.op.mode == constants.REPLACE_DISK_SEC:
3512 self.new_node = remote_node # this can be None, in which case
3513 # we don't change the secondary
3514 self.tgt_node = instance.secondary_nodes[0]
3515 self.oth_node = instance.primary_node
3517 raise errors.ProgrammerError("Unhandled disk replace mode")
# every requested disk name must exist on the instance
3519 for name in self.op.disks:
3520 if instance.FindDisk(name) is None:
3521 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3522 (name, instance.name))
3523 self.op.remote_node = remote_node
3525 def _ExecRR1(self, feedback_fn):
3526 """Replace the disks of an instance.
3529 instance = self.instance
3532 if self.op.remote_node is None:
3533 remote_node = self.sec_node
3535 remote_node = self.op.remote_node
3537 for dev in instance.disks:
3539 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3540 names = _GenerateUniqueNames(cfg, lv_names)
3541 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3542 remote_node, size, names)
3543 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3544 logger.Info("adding new mirror component on secondary for %s" %
3547 if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3549 _GetInstanceInfoText(instance)):
3550 raise errors.OpExecError("Failed to create new component on secondary"
3551 " node %s. Full abort, cleanup manually!" %
3554 logger.Info("adding new mirror component on primary")
3556 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3558 _GetInstanceInfoText(instance)):
3559 # remove secondary dev
3560 cfg.SetDiskID(new_drbd, remote_node)
3561 rpc.call_blockdev_remove(remote_node, new_drbd)
3562 raise errors.OpExecError("Failed to create volume on primary!"
3563 " Full abort, cleanup manually!!")
3565 # the device exists now
3566 # call the primary node to add the mirror to md
3567 logger.Info("adding new mirror component to md")
3568 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3570 logger.Error("Can't add mirror compoment to md!")
3571 cfg.SetDiskID(new_drbd, remote_node)
3572 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3573 logger.Error("Can't rollback on secondary")
3574 cfg.SetDiskID(new_drbd, instance.primary_node)
3575 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3576 logger.Error("Can't rollback on primary")
3577 raise errors.OpExecError("Full abort, cleanup manually!!")
3579 dev.children.append(new_drbd)
3580 cfg.AddInstance(instance)
3582 # this can fail as the old devices are degraded and _WaitForSync
3583 # does a combined result over all disks, so we don't check its
3585 _WaitForSync(cfg, instance, self.proc, unlock=True)
3587 # so check manually all the devices
3588 for name in iv_names:
3589 dev, child, new_drbd = iv_names[name]
3590 cfg.SetDiskID(dev, instance.primary_node)
3591 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3593 raise errors.OpExecError("MD device %s is degraded!" % name)
3594 cfg.SetDiskID(new_drbd, instance.primary_node)
3595 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3597 raise errors.OpExecError("New drbd device %s is degraded!" % name)
3599 for name in iv_names:
3600 dev, child, new_drbd = iv_names[name]
3601 logger.Info("remove mirror %s component" % name)
3602 cfg.SetDiskID(dev, instance.primary_node)
3603 if not rpc.call_blockdev_removechildren(instance.primary_node,
3605 logger.Error("Can't remove child from mirror, aborting"
3606 " *this device cleanup*.\nYou need to cleanup manually!!")
3609 for node in child.logical_id[:2]:
3610 logger.Info("remove child device on %s" % node)
3611 cfg.SetDiskID(child, node)
3612 if not rpc.call_blockdev_remove(node, child):
3613 logger.Error("Warning: failed to remove device from node %s,"
3614 " continuing operation." % node)
3616 dev.children.remove(child)
3618 cfg.AddInstance(instance)
3620 def _ExecD8DiskOnly(self, feedback_fn):
3621 """Replace a disk on the primary or secondary for dbrd8.
3623 The algorithm for replace is quite complicated:
3624 - for each disk to be replaced:
3625 - create new LVs on the target node with unique names
3626 - detach old LVs from the drbd device
3627 - rename old LVs to name_replaced.<time_t>
3628 - rename new LVs to old LVs
3629 - attach the new LVs (with the old names now) to the drbd device
3630 - wait for sync across all devices
3631 - for each modified disk:
3632 - remove old LVs (which have the name name_replaces.<time_t>)
3634 Failures are not very well handled.
3638 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3639 instance = self.instance
3641 vgname = self.cfg.GetVGName()
3644 tgt_node = self.tgt_node
3645 oth_node = self.oth_node
3647 # Step: check device activation
3648 self.proc.LogStep(1, steps_total, "check device existence")
3649 info("checking volume groups")
3650 my_vg = cfg.GetVGName()
3651 results = rpc.call_vg_list([oth_node, tgt_node])
3653 raise errors.OpExecError("Can't list volume groups on the nodes")
3654 for node in oth_node, tgt_node:
3655 res = results.get(node, False)
3656 if not res or my_vg not in res:
3657 raise errors.OpExecError("Volume group '%s' not found on %s" %
3659 for dev in instance.disks:
3660 if not dev.iv_name in self.op.disks:
3662 for node in tgt_node, oth_node:
3663 info("checking %s on %s" % (dev.iv_name, node))
3664 cfg.SetDiskID(dev, node)
3665 if not rpc.call_blockdev_find(node, dev):
3666 raise errors.OpExecError("Can't find device %s on node %s" %
3667 (dev.iv_name, node))
3669 # Step: check other node consistency
3670 self.proc.LogStep(2, steps_total, "check peer consistency")
3671 for dev in instance.disks:
3672 if not dev.iv_name in self.op.disks:
3674 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3675 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3676 oth_node==instance.primary_node):
3677 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3678 " to replace disks on this node (%s)" %
3679 (oth_node, tgt_node))
3681 # Step: create new storage
3682 self.proc.LogStep(3, steps_total, "allocate new storage")
3683 for dev in instance.disks:
3684 if not dev.iv_name in self.op.disks:
3687 cfg.SetDiskID(dev, tgt_node)
3688 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3689 names = _GenerateUniqueNames(cfg, lv_names)
3690 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3691 logical_id=(vgname, names[0]))
3692 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3693 logical_id=(vgname, names[1]))
3694 new_lvs = [lv_data, lv_meta]
3695 old_lvs = dev.children
3696 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3697 info("creating new local storage on %s for %s" %
3698 (tgt_node, dev.iv_name))
3699 # since we *always* want to create this LV, we use the
3700 # _Create...OnPrimary (which forces the creation), even if we
3701 # are talking about the secondary node
3702 for new_lv in new_lvs:
3703 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3704 _GetInstanceInfoText(instance)):
3705 raise errors.OpExecError("Failed to create new LV named '%s' on"
3707 (new_lv.logical_id[1], tgt_node))
3709 # Step: for each lv, detach+rename*2+attach
3710 self.proc.LogStep(4, steps_total, "change drbd configuration")
3711 for dev, old_lvs, new_lvs in iv_names.itervalues():
3712 info("detaching %s drbd from local storage" % dev.iv_name)
3713 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3714 raise errors.OpExecError("Can't detach drbd from local storage on node"
3715 " %s for device %s" % (tgt_node, dev.iv_name))
3717 #cfg.Update(instance)
3719 # ok, we created the new LVs, so now we know we have the needed
3720 # storage; as such, we proceed on the target node to rename
3721 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3722 # using the assumption than logical_id == physical_id (which in
3723 # turn is the unique_id on that node)
3725 # FIXME(iustin): use a better name for the replaced LVs
3726 temp_suffix = int(time.time())
3727 ren_fn = lambda d, suff: (d.physical_id[0],
3728 d.physical_id[1] + "_replaced-%s" % suff)
3729 # build the rename list based on what LVs exist on the node
3731 for to_ren in old_lvs:
3732 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3733 if find_res is not None: # device exists
3734 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3736 info("renaming the old LVs on the target node")
3737 if not rpc.call_blockdev_rename(tgt_node, rlist):
3738 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3739 # now we rename the new LVs to the old LVs
3740 info("renaming the new LVs on the target node")
3741 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3742 if not rpc.call_blockdev_rename(tgt_node, rlist):
3743 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3745 for old, new in zip(old_lvs, new_lvs):
3746 new.logical_id = old.logical_id
3747 cfg.SetDiskID(new, tgt_node)
3749 for disk in old_lvs:
3750 disk.logical_id = ren_fn(disk, temp_suffix)
3751 cfg.SetDiskID(disk, tgt_node)
3753 # now that the new lvs have the old name, we can add them to the device
3754 info("adding new mirror component on %s" % tgt_node)
3755 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3756 for new_lv in new_lvs:
3757 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3758 warning("Can't rollback device %s", hint="manually cleanup unused"
3760 raise errors.OpExecError("Can't add local storage to drbd")
3762 dev.children = new_lvs
3763 cfg.Update(instance)
3765 # Step: wait for sync
3767 # this can fail as the old devices are degraded and _WaitForSync
3768 # does a combined result over all disks, so we don't check its
3770 self.proc.LogStep(5, steps_total, "sync devices")
3771 _WaitForSync(cfg, instance, self.proc, unlock=True)
3773 # so check manually all the devices
3774 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3775 cfg.SetDiskID(dev, instance.primary_node)
3776 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3778 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3780 # Step: remove old storage
3781 self.proc.LogStep(6, steps_total, "removing old storage")
3782 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3783 info("remove logical volumes for %s" % name)
3785 cfg.SetDiskID(lv, tgt_node)
3786 if not rpc.call_blockdev_remove(tgt_node, lv):
3787 warning("Can't remove old LV", hint="manually remove unused LVs")
3790 def _ExecD8Secondary(self, feedback_fn):
3791 """Replace the secondary node for drbd8.
3793 The algorithm for replace is quite complicated:
3794 - for all disks of the instance:
3795 - create new LVs on the new node with same names
3796 - shutdown the drbd device on the old secondary
3797 - disconnect the drbd network on the primary
3798 - create the drbd device on the new secondary
3799 - network attach the drbd on the primary, using an artifice:
3800 the drbd code for Attach() will connect to the network if it
3801 finds a device which is connected to the good local disks but
3803 - wait for sync across all devices
3804 - remove all disks from the old secondary
3806 Failures are not very well handled.
3810 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3811 instance = self.instance
3813 vgname = self.cfg.GetVGName()
3816 old_node = self.tgt_node
3817 new_node = self.new_node
3818 pri_node = instance.primary_node
3820 # Step: check device activation
3821 self.proc.LogStep(1, steps_total, "check device existence")
3822 info("checking volume groups")
3823 my_vg = cfg.GetVGName()
3824 results = rpc.call_vg_list([pri_node, new_node])
3826 raise errors.OpExecError("Can't list volume groups on the nodes")
3827 for node in pri_node, new_node:
3828 res = results.get(node, False)
3829 if not res or my_vg not in res:
3830 raise errors.OpExecError("Volume group '%s' not found on %s" %
3832 for dev in instance.disks:
3833 if not dev.iv_name in self.op.disks:
3835 info("checking %s on %s" % (dev.iv_name, pri_node))
3836 cfg.SetDiskID(dev, pri_node)
3837 if not rpc.call_blockdev_find(pri_node, dev):
3838 raise errors.OpExecError("Can't find device %s on node %s" %
3839 (dev.iv_name, pri_node))
3841 # Step: check other node consistency
3842 self.proc.LogStep(2, steps_total, "check peer consistency")
3843 for dev in instance.disks:
3844 if not dev.iv_name in self.op.disks:
3846 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3847 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3848 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3849 " unsafe to replace the secondary" %
3852 # Step: create new storage
3853 self.proc.LogStep(3, steps_total, "allocate new storage")
3854 for dev in instance.disks:
3856 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3857 # since we *always* want to create this LV, we use the
3858 # _Create...OnPrimary (which forces the creation), even if we
3859 # are talking about the secondary node
3860 for new_lv in dev.children:
3861 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3862 _GetInstanceInfoText(instance)):
3863 raise errors.OpExecError("Failed to create new LV named '%s' on"
3865 (new_lv.logical_id[1], new_node))
3867 iv_names[dev.iv_name] = (dev, dev.children)
3869 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3870 for dev in instance.disks:
3872 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3873 # create new devices on new_node
3874 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3875 logical_id=(pri_node, new_node,
3877 children=dev.children)
3878 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3880 _GetInstanceInfoText(instance)):
3881 raise errors.OpExecError("Failed to create new DRBD on"
3882 " node '%s'" % new_node)
3884 for dev in instance.disks:
3885 # we have new devices, shutdown the drbd on the old secondary
3886 info("shutting down drbd for %s on old node" % dev.iv_name)
3887 cfg.SetDiskID(dev, old_node)
3888 if not rpc.call_blockdev_shutdown(old_node, dev):
3889 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3890 hint="Please cleanup this device manually as soon as possible")
3892 info("detaching primary drbds from the network (=> standalone)")
3894 for dev in instance.disks:
3895 cfg.SetDiskID(dev, pri_node)
3896 # set the physical (unique in bdev terms) id to None, meaning
3897 # detach from network
3898 dev.physical_id = (None,) * len(dev.physical_id)
3899 # and 'find' the device, which will 'fix' it to match the
3901 if rpc.call_blockdev_find(pri_node, dev):
3904 warning("Failed to detach drbd %s from network, unusual case" %
3908 # no detaches succeeded (very unlikely)
3909 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3911 # if we managed to detach at least one, we update all the disks of
3912 # the instance to point to the new secondary
3913 info("updating instance configuration")
3914 for dev in instance.disks:
3915 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3916 cfg.SetDiskID(dev, pri_node)
3917 cfg.Update(instance)
3919 # and now perform the drbd attach
3920 info("attaching primary drbds to new secondary (standalone => connected)")
3922 for dev in instance.disks:
3923 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3924 # since the attach is smart, it's enough to 'find' the device,
3925 # it will automatically activate the network, if the physical_id
3927 cfg.SetDiskID(dev, pri_node)
3928 if not rpc.call_blockdev_find(pri_node, dev):
3929 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3930 "please do a gnt-instance info to see the status of disks")
3932 # this can fail as the old devices are degraded and _WaitForSync
3933 # does a combined result over all disks, so we don't check its
3935 self.proc.LogStep(5, steps_total, "sync devices")
3936 _WaitForSync(cfg, instance, self.proc, unlock=True)
3938 # so check manually all the devices
3939 for name, (dev, old_lvs) in iv_names.iteritems():
3940 cfg.SetDiskID(dev, pri_node)
3941 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3943 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3945 self.proc.LogStep(6, steps_total, "removing old storage")
3946 for name, (dev, old_lvs) in iv_names.iteritems():
3947 info("remove logical volumes for %s" % name)
3949 cfg.SetDiskID(lv, old_node)
3950 if not rpc.call_blockdev_remove(old_node, lv):
3951 warning("Can't remove LV on old secondary",
3952 hint="Cleanup stale volumes by hand")
3954 def Exec(self, feedback_fn):
3955 """Execute disk replacement.
3957 This dispatches the disk replacement to the appropriate handler.
3960 instance = self.instance
3961 if instance.disk_template == constants.DT_REMOTE_RAID1:
3963 elif instance.disk_template == constants.DT_DRBD8:
3964 if self.op.remote_node is None:
3965 fn = self._ExecD8DiskOnly
3967 fn = self._ExecD8Secondary
3969 raise errors.ProgrammerError("Unhandled disk replacement case")
3970 return fn(feedback_fn)
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      # empty list means all instances
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    Recurses into the device's children; for DRBD devices the secondary
    node is derived from the device's logical_id instead of the one
    passed in.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "network_port": instance.network_port,
        "vcpus": instance.vcpus,
        "kernel_path": instance.kernel_path,
        "initrd_path": instance.initrd_path,
        "hvm_boot_order": instance.hvm_boot_order,
        }

      result[instance.name] = idict
    return result
4078 class LUSetInstanceParms(LogicalUnit):
4079 """Modifies an instances's parameters.
4082 HPATH = "instance-modify"
4083 HTYPE = constants.HTYPE_INSTANCE
4084 _OP_REQP = ["instance_name"]
4086 def BuildHooksEnv(self):
4089 This runs on the master, primary and secondaries.
4094 args['memory'] = self.mem
4096 args['vcpus'] = self.vcpus
4097 if self.do_ip or self.do_bridge:
4101 ip = self.instance.nics[0].ip
4103 bridge = self.bridge
4105 bridge = self.instance.nics[0].bridge
4106 args['nics'] = [(ip, bridge)]
4107 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4108 nl = [self.sstore.GetMasterNode(),
4109 self.instance.primary_node] + list(self.instance.secondary_nodes)
4112 def CheckPrereq(self):
4113 """Check prerequisites.
4115 This only checks the instance list against the existing names.
4118 self.mem = getattr(self.op, "mem", None)
4119 self.vcpus = getattr(self.op, "vcpus", None)
4120 self.ip = getattr(self.op, "ip", None)
4121 self.mac = getattr(self.op, "mac", None)
4122 self.bridge = getattr(self.op, "bridge", None)
4123 self.kernel_path = getattr(self.op, "kernel_path", None)
4124 self.initrd_path = getattr(self.op, "initrd_path", None)
4125 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4126 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4127 self.kernel_path, self.initrd_path, self.hvm_boot_order]
4128 if all_parms.count(None) == len(all_parms):
4129 raise errors.OpPrereqError("No changes submitted")
4130 if self.mem is not None:
4132 self.mem = int(self.mem)
4133 except ValueError, err:
4134 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4135 if self.vcpus is not None:
4137 self.vcpus = int(self.vcpus)
4138 except ValueError, err:
4139 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4140 if self.ip is not None:
4142 if self.ip.lower() == "none":
4145 if not utils.IsValidIP(self.ip):
4146 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4149 self.do_bridge = (self.bridge is not None)
4150 if self.mac is not None:
4151 if self.cfg.IsMacInUse(self.mac):
4152 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4154 if not utils.IsValidMac(self.mac):
4155 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4157 if self.kernel_path is not None:
4158 self.do_kernel_path = True
4159 if self.kernel_path == constants.VALUE_NONE:
4160 raise errors.OpPrereqError("Can't set instance to no kernel")
4162 if self.kernel_path != constants.VALUE_DEFAULT:
4163 if not os.path.isabs(self.kernel_path):
4164 raise errors.OpPrereqError("The kernel path must be an absolute"
4167 self.do_kernel_path = False
4169 if self.initrd_path is not None:
4170 self.do_initrd_path = True
4171 if self.initrd_path not in (constants.VALUE_NONE,
4172 constants.VALUE_DEFAULT):
4173 if not os.path.isabs(self.kernel_path):
4174 raise errors.OpPrereqError("The initrd path must be an absolute"
4177 self.do_initrd_path = False
4179 # boot order verification
4180 if self.hvm_boot_order is not None:
4181 if self.hvm_boot_order != constants.VALUE_DEFAULT:
4182 if len(self.hvm_boot_order.strip("acdn")) != 0:
4183 raise errors.OpPrereqError("invalid boot order specified,"
4184 " must be one or more of [acdn]"
4187 instance = self.cfg.GetInstanceInfo(
4188 self.cfg.ExpandInstanceName(self.op.instance_name))
4189 if instance is None:
4190 raise errors.OpPrereqError("No such instance name '%s'" %
4191 self.op.instance_name)
4192 self.op.instance_name = instance.name
4193 self.instance = instance
4196 def Exec(self, feedback_fn):
4197 """Modifies an instance.
4199 All parameters take effect only at the next restart of the instance.
4202 instance = self.instance
4204 instance.memory = self.mem
4205 result.append(("mem", self.mem))
4207 instance.vcpus = self.vcpus
4208 result.append(("vcpus", self.vcpus))
4210 instance.nics[0].ip = self.ip
4211 result.append(("ip", self.ip))
4213 instance.nics[0].bridge = self.bridge
4214 result.append(("bridge", self.bridge))
4216 instance.nics[0].mac = self.mac
4217 result.append(("mac", self.mac))
4218 if self.do_kernel_path:
4219 instance.kernel_path = self.kernel_path
4220 result.append(("kernel_path", self.kernel_path))
4221 if self.do_initrd_path:
4222 instance.initrd_path = self.initrd_path
4223 result.append(("initrd_path", self.initrd_path))
4224 if self.hvm_boot_order:
4225 if self.hvm_boot_order == constants.VALUE_DEFAULT:
4226 instance.hvm_boot_order = None
4228 instance.hvm_boot_order = self.hvm_boot_order
4229 result.append(("hvm_boot_order", self.hvm_boot_order))
4231 self.cfg.AddInstance(instance)
class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  # NOTE(review): "nodes" is optional (fetched via getattr below), so
  # no required opcode fields
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      # restart the instance even if the snapshotting failed
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.proc.ChainOpCode(op)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    Resolves self.op.name to a target object (cluster, node or
    instance) according to self.op.kind and stores it in self.target.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))
class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    # self.target is resolved by TagsLU.CheckPrereq
    return self.target.GetTags()
4418 class LUSearchTags(NoHooksLU):
4419 """Searches the tags for a given pattern.
4422 _OP_REQP = ["pattern"]
4424 def CheckPrereq(self):
4425 """Check prerequisites.
4427 This checks the pattern passed for validity by compiling it.
4431 self.re = re.compile(self.op.pattern)
4432 except re.error, err:
4433 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4434 (self.op.pattern, err))
4436 def Exec(self, feedback_fn):
4437 """Returns the tag list.
4441 tgts = [("/cluster", cfg.GetClusterInfo())]
4442 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4443 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4444 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4445 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4447 for path, target in tgts:
4448 for tag in target.GetTags():
4449 if self.re.search(tag):
4450 results.append((path, tag))
4454 class LUAddTags(TagsLU):
4455 """Sets a tag on a given object.
4458 _OP_REQP = ["kind", "name", "tags"]
4460 def CheckPrereq(self):
4461 """Check prerequisites.
4463 This checks the type and length of the tag name and value.
4466 TagsLU.CheckPrereq(self)
4467 for tag in self.op.tags:
4468 objects.TaggableObject.ValidateTag(tag)
4470 def Exec(self, feedback_fn):
4475 for tag in self.op.tags:
4476 self.target.AddTag(tag)
4477 except errors.TagError, err:
4478 raise errors.OpExecError("Error while setting tag: %s" % str(err))
4480 self.cfg.Update(self.target)
4481 except errors.ConfigurationError:
4482 raise errors.OpRetryError("There has been a modification to the"
4483 " config file and the operation has been"
4484 " aborted. Please retry.")
class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")