4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
46 class LogicalUnit(object):
47 """Logical Unit base class.
49 Subclasses must follow these rules:
50 - implement CheckPrereq which also fills in the opcode instance
51 with all the fields (even if as None)
53 - implement BuildHooksEnv
54 - redefine HPATH and HTYPE
55 - optionally redefine their run requirements (REQ_CLUSTER,
56 REQ_MASTER); note that all commands require root permissions
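  A minimal, purely illustrative subclass following these rules (a
  hypothetical LU, not part of this module) might look like::

    class LUExampleNoop(LogicalUnit):
      HPATH = "example-noop"
      HTYPE = constants.HTYPE_CLUSTER
      _OP_REQP = ["node_name"]

      def BuildHooksEnv(self):
        env = {"NODE_NAME": self.op.node_name}
        return env, [], [self.op.node_name]

      def CheckPrereq(self):
        # canonicalise the opcode parameters, as required above
        self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)

      def Exec(self, feedback_fn):
        feedback_fn("nothing to do on %s" % self.op.node_name)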
65 def __init__(self, processor, op, cfg, sstore):
66 """Constructor for LogicalUnit.
68 This needs to be overridden in derived classes in order to check op
76 for attr_name in self._OP_REQP:
77 attr_val = getattr(op, attr_name, None)
79 raise errors.OpPrereqError("Required parameter '%s' missing" %
82 if not cfg.IsCluster():
83 raise errors.OpPrereqError("Cluster not initialized yet,"
84 " use 'gnt-cluster init' first.")
86 master = sstore.GetMasterNode()
87 if master != utils.HostInfo().name:
88 raise errors.OpPrereqError("Commands must be run on the master"
91 def CheckPrereq(self):
92 """Check prerequisites for this LU.
94 This method should check that the prerequisites for the execution
95 of this LU are fulfilled. It can do internode communication, but
96 it should be idempotent - no cluster or system changes are
99 The method should raise errors.OpPrereqError in case something is
100 not fulfilled. Its return value is ignored.
102 This method should also update all the parameters of the opcode to
103 their canonical form; e.g. a short node name must be fully
104 expanded after this method has successfully completed (so that
105 hooks, logging, etc. work correctly).
108 raise NotImplementedError
110 def Exec(self, feedback_fn):
113 This method should implement the actual work. It should raise
114 errors.OpExecError for failures that are somewhat dealt with in
118 raise NotImplementedError
120 def BuildHooksEnv(self):
121 """Build hooks environment for this LU.
123 This method should return a three-element tuple consisting of: a dict
124 containing the environment that will be used for running the
125 specific hook for this LU, a list of node names on which the hook
126 should run before the execution, and a list of node names on which
127 the hook should run after the execution.
129 The keys of the dict must not have 'GANETI_' prefixed as this will
130 be handled in the hooks runner. Also note additional keys will be
131 added by the hooks runner. If the LU doesn't define any
132 environment, an empty dict (and not None) should be returned.
134 As for the node lists, the master should not be included in
135 them, as it will be added by the hooks runner in case this LU
136 requires a cluster to run on (otherwise we don't have a node
137 list). If there are no nodes to run on, an empty list should be returned (and not
140 Note that if the HPATH for a LU class is None, this function will
144 raise NotImplementedError
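# Editor's illustration (hedged sketch, not original code): a typical
# BuildHooksEnv for an instance-related LU returns something along the
# lines of
#
#   env = {"INSTANCE_NAME": self.op.instance_name}
#   nl = [self.sstore.GetMasterNode(), self.instance.primary_node]
#   return env, nl, nl
#
# i.e. a plain dict (keys without the 'GANETI_' prefix) plus the pre- and
# post-execution node lists.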
147 class NoHooksLU(LogicalUnit):
148 """Simple LU which runs no hooks.
150 This LU is intended as a parent for other LogicalUnits which will
151 run no hooks, in order to reduce duplicate code.
157 def BuildHooksEnv(self):
160 This is a no-op, since we don't run hooks.
166 def _AddHostToEtcHosts(hostname):
167 """Wrapper around utils.SetEtcHostsEntry.
170 hi = utils.HostInfo(name=hostname)
171 utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
174 def _RemoveHostFromEtcHosts(hostname):
175 """Wrapper around utils.RemoveEtcHostsEntry.
178 hi = utils.HostInfo(name=hostname)
179 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
183 def _GetWantedNodes(lu, nodes):
184 """Returns list of checked and expanded node names.
187 nodes: List of nodes (strings) or None for all
190 if not isinstance(nodes, list):
191 raise errors.OpPrereqError("Invalid argument type 'nodes'")
197 node = lu.cfg.ExpandNodeName(name)
199 raise errors.OpPrereqError("No such node name '%s'" % name)
203 wanted = lu.cfg.GetNodeList()
204 return utils.NiceSort(wanted)
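# Illustrative use (editor's sketch), as done by the query LUs below:
#
#   self.nodes = _GetWantedNodes(self, self.op.nodes)
#
# which expands and validates self.op.nodes, or returns all configured
# nodes when the passed list is empty.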
207 def _GetWantedInstances(lu, instances):
208 """Returns list of checked and expanded instance names.
211 instances: List of instances (strings) or None for all
214 if not isinstance(instances, list):
215 raise errors.OpPrereqError("Invalid argument type 'instances'")
220 for name in instances:
221 instance = lu.cfg.ExpandInstanceName(name)
223 raise errors.OpPrereqError("No such instance name '%s'" % name)
224 wanted.append(instance)
227 wanted = lu.cfg.GetInstanceList()
228 return utils.NiceSort(wanted)
231 def _CheckOutputFields(static, dynamic, selected):
232 """Checks whether all selected fields are valid.
235 static: Static fields
236 dynamic: Dynamic fields
239 static_fields = frozenset(static)
240 dynamic_fields = frozenset(dynamic)
242 all_fields = static_fields | dynamic_fields
244 if not all_fields.issuperset(selected):
245 raise errors.OpPrereqError("Unknown output fields selected: %s"
246 % ",".join(frozenset(selected).
247 difference(all_fields)))
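# Illustrative use (editor's sketch), mirroring the query LUs below:
#
#   _CheckOutputFields(static=["name", "pinst_cnt"],
#                      dynamic=["mfree", "dfree"],
#                      selected=self.op.output_fields)
#
# which raises OpPrereqError if output_fields names an unknown field.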
250 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251 memory, vcpus, nics):
252 """Builds instance related env variables for hooks from single variables.
255 secondary_nodes: List of secondary nodes as strings
259 "INSTANCE_NAME": name,
260 "INSTANCE_PRIMARY": primary_node,
261 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262 "INSTANCE_OS_TYPE": os_type,
263 "INSTANCE_STATUS": status,
264 "INSTANCE_MEMORY": memory,
265 "INSTANCE_VCPUS": vcpus,
269 nic_count = len(nics)
270 for idx, (ip, bridge) in enumerate(nics):
273 env["INSTANCE_NIC%d_IP" % idx] = ip
274 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
278 env["INSTANCE_NIC_COUNT"] = nic_count
283 def _BuildInstanceHookEnvByObject(instance, override=None):
284 """Builds instance related env variables for hooks from an object.
287 instance: objects.Instance object of instance
288 override: dict of values to override
291 'name': instance.name,
292 'primary_node': instance.primary_node,
293 'secondary_nodes': instance.secondary_nodes,
294 'os_type': instance.os,
295 'status': instance.status,
296 'memory': instance.memory,
297 'vcpus': instance.vcpus,
298 'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
301 args.update(override)
302 return _BuildInstanceHookEnv(**args)
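# Illustrative use (editor's sketch): the instance LUs below call this as
#
#   env = _BuildInstanceHookEnvByObject(self.instance)
#
# optionally passing override={"status": "up"} to replace selected values
# before the environment is assembled.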
305 def _UpdateKnownHosts(fullnode, ip, pubkey):
306 """Ensure a node has a correct known_hosts entry.
309 fullnode - Fully qualified domain name of host. (str)
310 ip - IPv4 address of host (str)
311 pubkey - the public key of the cluster
314 if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
315 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
317 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
326 logger.Debug('read %s' % (repr(rawline),))
328 parts = rawline.rstrip('\r\n').split()
330 # Ignore unwanted lines
331 if len(parts) >= 3 and rawline.lstrip()[0] != '#':
332 fields = parts[0].split(',')
337 for spec in [ ip, fullnode ]:
338 if spec not in fields:
343 logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
344 if haveall and key == pubkey:
346 save_lines.append(rawline)
347 logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
350 if havesome and (not haveall or key != pubkey):
352 logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
355 save_lines.append(rawline)
358 add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
359 logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
362 save_lines = save_lines + add_lines
364 # Write a new file and replace old.
365 fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
367 newfile = os.fdopen(fd, 'w')
369 newfile.write(''.join(save_lines))
372 logger.Debug("Wrote new known_hosts.")
373 os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
376 # Simply appending a new line will do the trick.
378 for add in add_lines:
384 def _HasValidVG(vglist, vgname):
385 """Checks if the volume group list is valid.
387 A non-None return value means there's an error, and the return value
388 is the error message.
391 vgsize = vglist.get(vgname, None)
393 return "volume group '%s' missing" % vgname
395 return ("volume group '%s' too small (20480MiB required, %dMib found)" %
400 def _InitSSHSetup(node):
401 """Setup the SSH configuration for the cluster.
404 This generates a dsa keypair for root, adds the pub key to the
405 permitted hosts and adds the hostkey to its own known hosts.
408 node: the name of this host as a fqdn
411 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
413 for name in priv_key, pub_key:
414 if os.path.exists(name):
415 utils.CreateBackup(name)
416 utils.RemoveFile(name)
418 result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
422 raise errors.OpExecError("Could not generate ssh keypair, error %s" %
425 f = open(pub_key, 'r')
427 utils.AddAuthorizedKey(auth_keys, f.read(8192))
432 def _InitGanetiServerSetup(ss):
433 """Setup the necessary configuration for the initial node daemon.
435 This creates the nodepass file containing the shared password for
436 the cluster and also generates the SSL certificate.
439 # Create pseudo random password
440 randpass = sha.new(os.urandom(64)).hexdigest()
441 # and write it into sstore
442 ss.SetKey(ss.SS_NODED_PASS, randpass)
444 result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
445 "-days", str(365*5), "-nodes", "-x509",
446 "-keyout", constants.SSL_CERT_FILE,
447 "-out", constants.SSL_CERT_FILE, "-batch"])
449 raise errors.OpExecError("could not generate server ssl cert, command"
450 " %s had exitcode %s and error message %s" %
451 (result.cmd, result.exit_code, result.output))
453 os.chmod(constants.SSL_CERT_FILE, 0400)
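# Editor's note: the RunCmd call above amounts to roughly the following
# shell invocation (paths symbolic), producing a self-signed certificate
# valid for five years with key and cert in the same file:
#
#   openssl req -new -newkey rsa:1024 -days 1825 -nodes -x509 \
#     -keyout $SSL_CERT_FILE -out $SSL_CERT_FILE -batch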
455 result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
458 raise errors.OpExecError("Could not start the node daemon, command %s"
459 " had exitcode %s and error %s" %
460 (result.cmd, result.exit_code, result.output))
463 def _CheckInstanceBridgesExist(instance):
464 """Check that the brigdes needed by an instance exist.
467 # check bridges existance
468 brlist = [nic.bridge for nic in instance.nics]
469 if not rpc.call_bridges_exist(instance.primary_node, brlist):
470 raise errors.OpPrereqError("one or more target bridges %s does not"
471 " exist on destination node '%s'" %
472 (brlist, instance.primary_node))
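# Illustrative use (editor's sketch), as done by the startup and reboot LUs
# below:
#
#   _CheckInstanceBridgesExist(instance)
#
# which raises OpPrereqError if a bridge used by the instance's NICs is
# missing on its primary node.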
475 class LUInitCluster(LogicalUnit):
476 """Initialise the cluster.
479 HPATH = "cluster-init"
480 HTYPE = constants.HTYPE_CLUSTER
481 _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
482 "def_bridge", "master_netdev"]
485 def BuildHooksEnv(self):
488 Notes: Since we don't require a cluster, we must manually add
489 ourselves to the post-run node list.
492 env = {"OP_TARGET": self.op.cluster_name}
493 return env, [], [self.hostname.name]
495 def CheckPrereq(self):
496 """Verify that the passed name is a valid one.
499 if config.ConfigWriter.IsCluster():
500 raise errors.OpPrereqError("Cluster is already initialised")
502 self.hostname = hostname = utils.HostInfo()
504 if hostname.ip.startswith("127."):
505 raise errors.OpPrereqError("This host's IP resolves to the private"
506 " range (%s). Please fix DNS or /etc/hosts." %
509 self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
511 if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
512 constants.DEFAULT_NODED_PORT):
513 raise errors.OpPrereqError("Inconsistency: this host's name resolves"
514 " to %s,\nbut this ip address does not"
515 " belong to this host."
516 " Aborting." % hostname.ip)
518 secondary_ip = getattr(self.op, "secondary_ip", None)
519 if secondary_ip and not utils.IsValidIP(secondary_ip):
520 raise errors.OpPrereqError("Invalid secondary ip given")
522 secondary_ip != hostname.ip and
523 (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
524 constants.DEFAULT_NODED_PORT))):
525 raise errors.OpPrereqError("You gave %s as secondary IP,"
526 " but it does not belong to this host." %
528 self.secondary_ip = secondary_ip
530 # checks presence of the volume group given
531 vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
534 raise errors.OpPrereqError("Error: %s" % vgstatus)
536 if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
538 raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
541 if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
542 raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
543 self.op.hypervisor_type)
545 result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
547 raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
548 (self.op.master_netdev,
549 result.output.strip()))
551 if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
552 os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
553 raise errors.OpPrereqError("Init.d script '%s' missing or not"
554 " executable." % constants.NODE_INITD_SCRIPT)
556 def Exec(self, feedback_fn):
557 """Initialize the cluster.
560 clustername = self.clustername
561 hostname = self.hostname
563 # set up the simple store
564 self.sstore = ss = ssconf.SimpleStore()
565 ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
566 ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
567 ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
568 ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
569 ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
571 # set up the inter-node password and certificate
572 _InitGanetiServerSetup(ss)
574 # start the master ip
575 rpc.call_node_start_master(hostname.name)
577 # set up ssh config and /etc/hosts
578 f = open(constants.SSH_HOST_RSA_PUB, 'r')
583 sshkey = sshline.split(" ")[1]
585 _AddHostToEtcHosts(hostname.name)
587 _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
589 _InitSSHSetup(hostname.name)
591 # init of cluster config file
592 self.cfg = cfgw = config.ConfigWriter()
593 cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
594 sshkey, self.op.mac_prefix,
595 self.op.vg_name, self.op.def_bridge)
598 class LUDestroyCluster(NoHooksLU):
599 """Logical unit for destroying the cluster.
604 def CheckPrereq(self):
605 """Check prerequisites.
607 This checks whether the cluster is empty.
609 Any errors are signalled by raising errors.OpPrereqError.
612 master = self.sstore.GetMasterNode()
614 nodelist = self.cfg.GetNodeList()
615 if len(nodelist) != 1 or nodelist[0] != master:
616 raise errors.OpPrereqError("There are still %d node(s) in"
617 " this cluster." % (len(nodelist) - 1))
618 instancelist = self.cfg.GetInstanceList()
620 raise errors.OpPrereqError("There are still %d instance(s) in"
621 " this cluster." % len(instancelist))
623 def Exec(self, feedback_fn):
624 """Destroys the cluster.
627 master = self.sstore.GetMasterNode()
628 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
629 utils.CreateBackup(priv_key)
630 utils.CreateBackup(pub_key)
631 rpc.call_node_leave_cluster(master)
634 class LUVerifyCluster(NoHooksLU):
635 """Verifies the cluster status.
640 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
641 remote_version, feedback_fn):
642 """Run multiple tests against a node.
645 - compares ganeti version
646 - checks vg existence and size > 20G
647 - checks config file checksum
648 - checks ssh to other nodes
651 node: name of the node to check
652 file_list: required list of files
653 local_cksum: dictionary of local files and their checksums
656 # compares ganeti version
657 local_version = constants.PROTOCOL_VERSION
658 if not remote_version:
659 feedback_fn(" - ERROR: connection to %s failed" % (node))
662 if local_version != remote_version:
663 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
664 (local_version, node, remote_version))
667 # checks vg existence and size > 20G
671 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
675 vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
677 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
680 # checks config file checksum
683 if 'filelist' not in node_result:
685 feedback_fn(" - ERROR: node hasn't returned file checksum data")
687 remote_cksum = node_result['filelist']
688 for file_name in file_list:
689 if file_name not in remote_cksum:
691 feedback_fn(" - ERROR: file '%s' missing" % file_name)
692 elif remote_cksum[file_name] != local_cksum[file_name]:
694 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
696 if 'nodelist' not in node_result:
698 feedback_fn(" - ERROR: node hasn't returned node connectivity data")
700 if node_result['nodelist']:
702 for node in node_result['nodelist']:
703 feedback_fn(" - ERROR: communication with node '%s': %s" %
704 (node, node_result['nodelist'][node]))
705 hyp_result = node_result.get('hypervisor', None)
706 if hyp_result is not None:
707 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
710 def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
711 """Verify an instance.
713 This function checks to see if the required block devices are
714 available on the instance's node.
719 instancelist = self.cfg.GetInstanceList()
720 if not instance in instancelist:
721 feedback_fn(" - ERROR: instance %s not in instance list %s" %
722 (instance, instancelist))
725 instanceconfig = self.cfg.GetInstanceInfo(instance)
726 node_current = instanceconfig.primary_node
729 instanceconfig.MapLVsByNode(node_vol_should)
731 for node in node_vol_should:
732 for volume in node_vol_should[node]:
733 if node not in node_vol_is or volume not in node_vol_is[node]:
734 feedback_fn(" - ERROR: volume %s missing on node %s" %
738 if instanceconfig.status != 'down':
739 if not instance in node_instance[node_current]:
740 feedback_fn(" - ERROR: instance %s not running on node %s" %
741 (instance, node_current))
744 for node in node_instance:
745 if node != node_current:
746 if instance in node_instance[node]:
747 feedback_fn(" - ERROR: instance %s should not run on node %s" %
753 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
754 """Verify if there are any unknown volumes in the cluster.
756 The .os, .swap and backup volumes are ignored. All other volumes are
762 for node in node_vol_is:
763 for volume in node_vol_is[node]:
764 if node not in node_vol_should or volume not in node_vol_should[node]:
765 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
770 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
771 """Verify the list of running instances.
773 This checks what instances are running but unknown to the cluster.
777 for node in node_instance:
778 for runninginstance in node_instance[node]:
779 if runninginstance not in instancelist:
780 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
781 (runninginstance, node))
785 def CheckPrereq(self):
786 """Check prerequisites.
788 This has no prerequisites.
793 def Exec(self, feedback_fn):
794 """Verify integrity of cluster, performing various test on nodes.
798 feedback_fn("* Verifying global settings")
799 for msg in self.cfg.VerifyConfig():
800 feedback_fn(" - ERROR: %s" % msg)
802 vg_name = self.cfg.GetVGName()
803 nodelist = utils.NiceSort(self.cfg.GetNodeList())
804 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
808 # FIXME: verify OS list
810 file_names = list(self.sstore.GetFileList())
811 file_names.append(constants.SSL_CERT_FILE)
812 file_names.append(constants.CLUSTER_CONF_FILE)
813 local_checksums = utils.FingerprintFiles(file_names)
815 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
816 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
817 all_instanceinfo = rpc.call_instance_list(nodelist)
818 all_vglist = rpc.call_vg_list(nodelist)
819 node_verify_param = {
820 'filelist': file_names,
821 'nodelist': nodelist,
824 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
825 all_rversion = rpc.call_version(nodelist)
827 for node in nodelist:
828 feedback_fn("* Verifying node %s" % node)
829 result = self._VerifyNode(node, file_names, local_checksums,
830 all_vglist[node], all_nvinfo[node],
831 all_rversion[node], feedback_fn)
835 volumeinfo = all_volumeinfo[node]
837 if type(volumeinfo) != dict:
838 feedback_fn(" - ERROR: connection to %s failed" % (node,))
842 node_volume[node] = volumeinfo
845 nodeinstance = all_instanceinfo[node]
846 if type(nodeinstance) != list:
847 feedback_fn(" - ERROR: connection to %s failed" % (node,))
851 node_instance[node] = nodeinstance
855 for instance in instancelist:
856 feedback_fn("* Verifying instance %s" % instance)
857 result = self._VerifyInstance(instance, node_volume, node_instance,
861 inst_config = self.cfg.GetInstanceInfo(instance)
863 inst_config.MapLVsByNode(node_vol_should)
865 feedback_fn("* Verifying orphan volumes")
866 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
870 feedback_fn("* Verifying remaining instances")
871 result = self._VerifyOrphanInstances(instancelist, node_instance,
878 class LUVerifyDisks(NoHooksLU):
879 """Verifies the cluster disks status.
884 def CheckPrereq(self):
885 """Check prerequisites.
887 This has no prerequisites.
892 def Exec(self, feedback_fn):
893 """Verify integrity of cluster disks.
896 result = res_nodes, res_instances = [], []
898 vg_name = self.cfg.GetVGName()
899 nodes = utils.NiceSort(self.cfg.GetNodeList())
900 instances = [self.cfg.GetInstanceInfo(name)
901 for name in self.cfg.GetInstanceList()]
904 for inst in instances:
906 if (inst.status != "up" or
907 inst.disk_template not in constants.DTS_NET_MIRROR):
909 inst.MapLVsByNode(inst_lvs)
910 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
911 for node, vol_list in inst_lvs.iteritems():
913 nv_dict[(node, vol)] = inst
918 node_lvs = rpc.call_volume_list(nodes, vg_name)
925 if not isinstance(lvs, dict):
926 logger.Info("connection to node %s failed or invalid data returned" %
928 res_nodes.append(node)
931 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
933 inst = nv_dict.get((node, lv_name), None)
934 if inst is not None and inst.name not in res_instances:
935 res_instances.append(inst.name)
940 class LURenameCluster(LogicalUnit):
941 """Rename the cluster.
944 HPATH = "cluster-rename"
945 HTYPE = constants.HTYPE_CLUSTER
948 def BuildHooksEnv(self):
953 "OP_TARGET": self.op.sstore.GetClusterName(),
954 "NEW_NAME": self.op.name,
956 mn = self.sstore.GetMasterNode()
957 return env, [mn], [mn]
959 def CheckPrereq(self):
960 """Verify that the passed name is a valid one.
963 hostname = utils.HostInfo(self.op.name)
965 new_name = hostname.name
966 self.ip = new_ip = hostname.ip
967 old_name = self.sstore.GetClusterName()
968 old_ip = self.sstore.GetMasterIP()
969 if new_name == old_name and new_ip == old_ip:
970 raise errors.OpPrereqError("Neither the name nor the IP address of the"
971 " cluster has changed")
973 result = utils.RunCmd(["fping", "-q", new_ip])
974 if not result.failed:
975 raise errors.OpPrereqError("The given cluster IP address (%s) is"
976 " reachable on the network. Aborting." %
979 self.op.name = new_name
981 def Exec(self, feedback_fn):
982 """Rename the cluster.
985 clustername = self.op.name
989 # shutdown the master IP
990 master = ss.GetMasterNode()
991 if not rpc.call_node_stop_master(master):
992 raise errors.OpExecError("Could not disable the master role")
996 ss.SetKey(ss.SS_MASTER_IP, ip)
997 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
999 # Distribute updated ss config to all nodes
1000 myself = self.cfg.GetNodeInfo(master)
1001 dist_nodes = self.cfg.GetNodeList()
1002 if myself.name in dist_nodes:
1003 dist_nodes.remove(myself.name)
1005 logger.Debug("Copying updated ssconf data to all nodes")
1006 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1007 fname = ss.KeyToFilename(keyname)
1008 result = rpc.call_upload_file(dist_nodes, fname)
1009 for to_node in dist_nodes:
1010 if not result[to_node]:
1011 logger.Error("copy of file %s to node %s failed" %
1014 if not rpc.call_node_start_master(master):
1015 logger.Error("Could not re-enable the master role on the master,"
1016 " please restart manually.")
1019 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1020 """Sleep and poll for an instance's disk to sync.
1023 if not instance.disks:
1027 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1029 node = instance.primary_node
1031 for dev in instance.disks:
1032 cfgw.SetDiskID(dev, node)
1038 cumul_degraded = False
1039 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1041 proc.LogWarning("Can't get any data from node %s" % node)
1044 raise errors.RemoteError("Can't contact node %s for mirror data,"
1045 " aborting." % node)
1049 for i in range(len(rstats)):
1052 proc.LogWarning("Can't compute data for node %s/%s" %
1053 (node, instance.disks[i].iv_name))
1055 # we ignore the ldisk parameter
1056 perc_done, est_time, is_degraded, _ = mstat
1057 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1058 if perc_done is not None:
1060 if est_time is not None:
1061 rem_time = "%d estimated seconds remaining" % est_time
1064 rem_time = "no time estimate"
1065 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1066 (instance.disks[i].iv_name, perc_done, rem_time))
1073 time.sleep(min(60, max_time))
1079 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1080 return not cumul_degraded
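# Illustrative use (editor's sketch, assuming the LU keeps its processor as
# self.proc):
#
#   if not _WaitForSync(self.cfg, instance, self.proc):
#     raise errors.OpExecError("disks never finished syncing")
#
# i.e. block until the mirrored devices report a clean state.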
1083 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1084 """Check that mirrors are not degraded.
1086 The ldisk parameter, if True, will change the test from the
1087 is_degraded attribute (which represents overall non-ok status for
1088 the device(s)) to the ldisk (representing the local storage status).
1091 cfgw.SetDiskID(dev, node)
1098 if on_primary or dev.AssembleOnSecondary():
1099 rstats = rpc.call_blockdev_find(node, dev)
1101 logger.ToStderr("Can't get any data from node %s" % node)
1104 result = result and (not rstats[idx])
1106 for child in dev.children:
1107 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1112 class LUDiagnoseOS(NoHooksLU):
1113 """Logical unit for OS diagnose/query.
1118 def CheckPrereq(self):
1119 """Check prerequisites.
1121 This always succeeds, since this is a pure query LU.
1126 def Exec(self, feedback_fn):
1127 """Compute the list of OSes.
1130 node_list = self.cfg.GetNodeList()
1131 node_data = rpc.call_os_diagnose(node_list)
1132 if node_data == False:
1133 raise errors.OpExecError("Can't gather the list of OSes")
1137 class LURemoveNode(LogicalUnit):
1138 """Logical unit for removing a node.
1141 HPATH = "node-remove"
1142 HTYPE = constants.HTYPE_NODE
1143 _OP_REQP = ["node_name"]
1145 def BuildHooksEnv(self):
1148 This doesn't run on the target node in the pre phase as a failed
1149 node would not allow itself to run.
1153 "OP_TARGET": self.op.node_name,
1154 "NODE_NAME": self.op.node_name,
1156 all_nodes = self.cfg.GetNodeList()
1157 all_nodes.remove(self.op.node_name)
1158 return env, all_nodes, all_nodes
1160 def CheckPrereq(self):
1161 """Check prerequisites.
1164 - the node exists in the configuration
1165 - it does not have primary or secondary instances
1166 - it's not the master
1168 Any errors are signalled by raising errors.OpPrereqError.
1171 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1173 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1175 instance_list = self.cfg.GetInstanceList()
1177 masternode = self.sstore.GetMasterNode()
1178 if node.name == masternode:
1179 raise errors.OpPrereqError("Node is the master node,"
1180 " you need to failover first.")
1182 for instance_name in instance_list:
1183 instance = self.cfg.GetInstanceInfo(instance_name)
1184 if node.name == instance.primary_node:
1185 raise errors.OpPrereqError("Instance %s still running on the node,"
1186 " please remove first." % instance_name)
1187 if node.name in instance.secondary_nodes:
1188 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1189 " please remove first." % instance_name)
1190 self.op.node_name = node.name
1193 def Exec(self, feedback_fn):
1194 """Removes the node from the cluster.
1198 logger.Info("stopping the node daemon and removing configs from node %s" %
1201 rpc.call_node_leave_cluster(node.name)
1203 ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1205 logger.Info("Removing node %s from config" % node.name)
1207 self.cfg.RemoveNode(node.name)
1209 _RemoveHostFromEtcHosts(node.name)
1212 class LUQueryNodes(NoHooksLU):
1213 """Logical unit for querying nodes.
1216 _OP_REQP = ["output_fields", "names"]
1218 def CheckPrereq(self):
1219 """Check prerequisites.
1221 This checks that the fields required are valid output fields.
1224 self.dynamic_fields = frozenset(["dtotal", "dfree",
1225 "mtotal", "mnode", "mfree",
1228 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1229 "pinst_list", "sinst_list",
1231 dynamic=self.dynamic_fields,
1232 selected=self.op.output_fields)
1234 self.wanted = _GetWantedNodes(self, self.op.names)
1236 def Exec(self, feedback_fn):
1237 """Computes the list of nodes and their attributes.
1240 nodenames = self.wanted
1241 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1243 # begin data gathering
1245 if self.dynamic_fields.intersection(self.op.output_fields):
1247 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1248 for name in nodenames:
1249 nodeinfo = node_data.get(name, None)
1252 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1253 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1254 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1255 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1256 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1257 "bootid": nodeinfo['bootid'],
1260 live_data[name] = {}
1262 live_data = dict.fromkeys(nodenames, {})
1264 node_to_primary = dict([(name, set()) for name in nodenames])
1265 node_to_secondary = dict([(name, set()) for name in nodenames])
1267 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1268 "sinst_cnt", "sinst_list"))
1269 if inst_fields & frozenset(self.op.output_fields):
1270 instancelist = self.cfg.GetInstanceList()
1272 for instance_name in instancelist:
1273 inst = self.cfg.GetInstanceInfo(instance_name)
1274 if inst.primary_node in node_to_primary:
1275 node_to_primary[inst.primary_node].add(inst.name)
1276 for secnode in inst.secondary_nodes:
1277 if secnode in node_to_secondary:
1278 node_to_secondary[secnode].add(inst.name)
1280 # end data gathering
1283 for node in nodelist:
1285 for field in self.op.output_fields:
1288 elif field == "pinst_list":
1289 val = list(node_to_primary[node.name])
1290 elif field == "sinst_list":
1291 val = list(node_to_secondary[node.name])
1292 elif field == "pinst_cnt":
1293 val = len(node_to_primary[node.name])
1294 elif field == "sinst_cnt":
1295 val = len(node_to_secondary[node.name])
1296 elif field == "pip":
1297 val = node.primary_ip
1298 elif field == "sip":
1299 val = node.secondary_ip
1300 elif field in self.dynamic_fields:
1301 val = live_data[node.name].get(field, None)
1303 raise errors.ParameterError(field)
1304 node_output.append(val)
1305 output.append(node_output)
1310 class LUQueryNodeVolumes(NoHooksLU):
1311 """Logical unit for getting volumes on node(s).
1314 _OP_REQP = ["nodes", "output_fields"]
1316 def CheckPrereq(self):
1317 """Check prerequisites.
1319 This checks that the fields required are valid output fields.
1322 self.nodes = _GetWantedNodes(self, self.op.nodes)
1324 _CheckOutputFields(static=["node"],
1325 dynamic=["phys", "vg", "name", "size", "instance"],
1326 selected=self.op.output_fields)
1329 def Exec(self, feedback_fn):
1330 """Computes the list of nodes and their attributes.
1333 nodenames = self.nodes
1334 volumes = rpc.call_node_volumes(nodenames)
1336 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1337 in self.cfg.GetInstanceList()]
1339 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1342 for node in nodenames:
1343 if node not in volumes or not volumes[node]:
1346 node_vols = volumes[node][:]
1347 node_vols.sort(key=lambda vol: vol['dev'])
1349 for vol in node_vols:
1351 for field in self.op.output_fields:
1354 elif field == "phys":
1358 elif field == "name":
1360 elif field == "size":
1361 val = int(float(vol['size']))
1362 elif field == "instance":
1364 if node not in lv_by_node[inst]:
1366 if vol['name'] in lv_by_node[inst][node]:
1372 raise errors.ParameterError(field)
1373 node_output.append(str(val))
1375 output.append(node_output)
1380 class LUAddNode(LogicalUnit):
1381 """Logical unit for adding node to the cluster.
1385 HTYPE = constants.HTYPE_NODE
1386 _OP_REQP = ["node_name"]
1388 def BuildHooksEnv(self):
1391 This will run on all nodes before, and on all nodes + the new node after.
1395 "OP_TARGET": self.op.node_name,
1396 "NODE_NAME": self.op.node_name,
1397 "NODE_PIP": self.op.primary_ip,
1398 "NODE_SIP": self.op.secondary_ip,
1400 nodes_0 = self.cfg.GetNodeList()
1401 nodes_1 = nodes_0 + [self.op.node_name, ]
1402 return env, nodes_0, nodes_1
1404 def CheckPrereq(self):
1405 """Check prerequisites.
1408 - the new node is not already in the config
1410 - its parameters (single/dual homed) matches the cluster
1412 Any errors are signalled by raising errors.OpPrereqError.
1415 node_name = self.op.node_name
1418 dns_data = utils.HostInfo(node_name)
1420 node = dns_data.name
1421 primary_ip = self.op.primary_ip = dns_data.ip
1422 secondary_ip = getattr(self.op, "secondary_ip", None)
1423 if secondary_ip is None:
1424 secondary_ip = primary_ip
1425 if not utils.IsValidIP(secondary_ip):
1426 raise errors.OpPrereqError("Invalid secondary IP given")
1427 self.op.secondary_ip = secondary_ip
1428 node_list = cfg.GetNodeList()
1429 if node in node_list:
1430 raise errors.OpPrereqError("Node %s is already in the configuration"
1433 for existing_node_name in node_list:
1434 existing_node = cfg.GetNodeInfo(existing_node_name)
1435 if (existing_node.primary_ip == primary_ip or
1436 existing_node.secondary_ip == primary_ip or
1437 existing_node.primary_ip == secondary_ip or
1438 existing_node.secondary_ip == secondary_ip):
1439 raise errors.OpPrereqError("New node ip address(es) conflict with"
1440 " existing node %s" % existing_node.name)
1442 # check that the type of the node (single versus dual homed) is the
1443 # same as for the master
1444 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1445 master_singlehomed = myself.secondary_ip == myself.primary_ip
1446 newbie_singlehomed = secondary_ip == primary_ip
1447 if master_singlehomed != newbie_singlehomed:
1448 if master_singlehomed:
1449 raise errors.OpPrereqError("The master has no private ip but the"
1450 " new node has one")
1452 raise errors.OpPrereqError("The master has a private ip but the"
1453 " new node doesn't have one")
1455 # checks reachability
1456 if not utils.TcpPing(utils.HostInfo().name,
1458 constants.DEFAULT_NODED_PORT):
1459 raise errors.OpPrereqError("Node not reachable by ping")
1461 if not newbie_singlehomed:
1462 # check reachability from my secondary ip to newbie's secondary ip
1463 if not utils.TcpPing(myself.secondary_ip,
1465 constants.DEFAULT_NODED_PORT):
1466 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1467 " based ping to noded port")
1469 self.new_node = objects.Node(name=node,
1470 primary_ip=primary_ip,
1471 secondary_ip=secondary_ip)
1473 def Exec(self, feedback_fn):
1474 """Adds the new node to the cluster.
1477 new_node = self.new_node
1478 node = new_node.name
1480 # set up inter-node password and certificate and restarts the node daemon
1481 gntpass = self.sstore.GetNodeDaemonPassword()
1482 if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1483 raise errors.OpExecError("ganeti password corruption detected")
1484 f = open(constants.SSL_CERT_FILE)
1486 gntpem = f.read(8192)
1489 # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1490 # so we use this to detect an invalid certificate; as long as the
1491 # cert doesn't contain this, the here-document will be correctly
1492 # parsed by the shell sequence below
1493 if re.search('^!EOF\.', gntpem, re.MULTILINE):
1494 raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1495 if not gntpem.endswith("\n"):
1496 raise errors.OpExecError("PEM must end with newline")
1497 logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1499 # and then connect with ssh to set password and start ganeti-noded
1500 # note that all the below variables are sanitized at this point,
1501 # either by being constants or by the checks above
1503 mycommand = ("umask 077 && "
1504 "echo '%s' > '%s' && "
1505 "cat > '%s' << '!EOF.' && \n"
1506 "%s!EOF.\n%s restart" %
1507 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1508 constants.SSL_CERT_FILE, gntpem,
1509 constants.NODE_INITD_SCRIPT))
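# For illustration (editor's sketch with symbolic values), the assembled
# command looks roughly like:
#
#   umask 077 && echo '<password>' > '<nodepass file>' && cat > '<ssl cert file>' << '!EOF.' &&
#   <PEM contents>!EOF.
#   <node init.d script> restart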
1511 result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1513 raise errors.OpExecError("Remote command on node %s, error: %s,"
1515 (node, result.fail_reason, result.output))
1517 # check connectivity
1520 result = rpc.call_version([node])[node]
1522 if constants.PROTOCOL_VERSION == result:
1523 logger.Info("communication to node %s fine, sw version %s match" %
1526 raise errors.OpExecError("Version mismatch master version %s,"
1527 " node version %s" %
1528 (constants.PROTOCOL_VERSION, result))
1530 raise errors.OpExecError("Cannot get version from the new node")
1533 logger.Info("copy ssh key to node %s" % node)
1534 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1536 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1537 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1543 keyarray.append(f.read())
1547 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1548 keyarray[3], keyarray[4], keyarray[5])
1551 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1553 # Add node to our /etc/hosts, and add key to known_hosts
1554 _AddHostToEtcHosts(new_node.name)
1556 _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1557 self.cfg.GetHostKey())
1559 if new_node.secondary_ip != new_node.primary_ip:
1560 if not rpc.call_node_tcp_ping(new_node.name,
1561 constants.LOCALHOST_IP_ADDRESS,
1562 new_node.secondary_ip,
1563 constants.DEFAULT_NODED_PORT,
1565 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1566 " you gave (%s). Please fix and re-run this"
1567 " command." % new_node.secondary_ip)
1569 success, msg = ssh.VerifyNodeHostname(node)
1571 raise errors.OpExecError("Node '%s' claims it has a different hostname"
1572 " than the one the resolver gives: %s."
1573 " Please fix and re-run this command." %
1576 # Distribute updated /etc/hosts and known_hosts to all nodes,
1577 # including the node just added
1578 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1579 dist_nodes = self.cfg.GetNodeList() + [node]
1580 if myself.name in dist_nodes:
1581 dist_nodes.remove(myself.name)
1583 logger.Debug("Copying hosts and known_hosts to all nodes")
1584 for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1585 result = rpc.call_upload_file(dist_nodes, fname)
1586 for to_node in dist_nodes:
1587 if not result[to_node]:
1588 logger.Error("copy of file %s to node %s failed" %
1591 to_copy = ss.GetFileList()
1592 for fname in to_copy:
1593 if not ssh.CopyFileToNode(node, fname):
1594 logger.Error("could not copy file %s to node %s" % (fname, node))
1596 logger.Info("adding node %s to cluster.conf" % node)
1597 self.cfg.AddNode(new_node)
1600 class LUMasterFailover(LogicalUnit):
1601 """Failover the master node to the current node.
1603 This is a special LU in that it must run on a non-master node.
1606 HPATH = "master-failover"
1607 HTYPE = constants.HTYPE_CLUSTER
1611 def BuildHooksEnv(self):
1614 This will run on the new master only in the pre phase, and on all
1615 the nodes in the post phase.
1619 "OP_TARGET": self.new_master,
1620 "NEW_MASTER": self.new_master,
1621 "OLD_MASTER": self.old_master,
1623 return env, [self.new_master], self.cfg.GetNodeList()
1625 def CheckPrereq(self):
1626 """Check prerequisites.
1628 This checks that we are not already the master.
1631 self.new_master = utils.HostInfo().name
1632 self.old_master = self.sstore.GetMasterNode()
1634 if self.old_master == self.new_master:
1635 raise errors.OpPrereqError("This commands must be run on the node"
1636 " where you want the new master to be."
1637 " %s is already the master" %
1640 def Exec(self, feedback_fn):
1641 """Failover the master node.
1643 This command, when run on a non-master node, will cause the current
1644 master to cease being master, and the non-master to become new
1648 #TODO: do not rely on gethostname returning the FQDN
1649 logger.Info("setting master to %s, old master: %s" %
1650 (self.new_master, self.old_master))
1652 if not rpc.call_node_stop_master(self.old_master):
1653 logger.Error("could disable the master role on the old master"
1654 " %s, please disable manually" % self.old_master)
1657 ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1658 if not rpc.call_upload_file(self.cfg.GetNodeList(),
1659 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1660 logger.Error("could not distribute the new simple store master file"
1661 " to the other nodes, please check.")
1663 if not rpc.call_node_start_master(self.new_master):
1664 logger.Error("could not start the master role on the new master"
1665 " %s, please check" % self.new_master)
1666 feedback_fn("Error in activating the master IP on the new master,"
1667 " please fix manually.")
1671 class LUQueryClusterInfo(NoHooksLU):
1672 """Query cluster configuration.
1678 def CheckPrereq(self):
1679 """No prerequsites needed for this LU.
1684 def Exec(self, feedback_fn):
1685 """Return cluster config.
1689 "name": self.sstore.GetClusterName(),
1690 "software_version": constants.RELEASE_VERSION,
1691 "protocol_version": constants.PROTOCOL_VERSION,
1692 "config_version": constants.CONFIG_VERSION,
1693 "os_api_version": constants.OS_API_VERSION,
1694 "export_version": constants.EXPORT_VERSION,
1695 "master": self.sstore.GetMasterNode(),
1696 "architecture": (platform.architecture()[0], platform.machine()),
1702 class LUClusterCopyFile(NoHooksLU):
1703 """Copy file to cluster.
1706 _OP_REQP = ["nodes", "filename"]
1708 def CheckPrereq(self):
1709 """Check prerequisites.
1711 It should check that the named file exists and that the given list
1715 if not os.path.exists(self.op.filename):
1716 raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1718 self.nodes = _GetWantedNodes(self, self.op.nodes)
1720 def Exec(self, feedback_fn):
1721 """Copy a file from master to some nodes.
1724 opts - class with options as members
1725 args - list containing a single element, the file name
1727 nodes - list containing the name of target nodes; if empty, all nodes
1730 filename = self.op.filename
1732 myname = utils.HostInfo().name
1734 for node in self.nodes:
1737 if not ssh.CopyFileToNode(node, filename):
1738 logger.Error("Copy of file %s to node %s failed" % (filename, node))
1741 class LUDumpClusterConfig(NoHooksLU):
1742 """Return a text-representation of the cluster-config.
1747 def CheckPrereq(self):
1748 """No prerequisites.
1753 def Exec(self, feedback_fn):
1754 """Dump a representation of the cluster config to the standard output.
1757 return self.cfg.DumpConfig()
1760 class LURunClusterCommand(NoHooksLU):
1761 """Run a command on some nodes.
1764 _OP_REQP = ["command", "nodes"]
1766 def CheckPrereq(self):
1767 """Check prerequisites.
1769 It checks that the given list of nodes is valid.
1772 self.nodes = _GetWantedNodes(self, self.op.nodes)
1774 def Exec(self, feedback_fn):
1775 """Run a command on some nodes.
1779 for node in self.nodes:
1780 result = ssh.SSHCall(node, "root", self.op.command)
1781 data.append((node, result.output, result.exit_code))
1786 class LUActivateInstanceDisks(NoHooksLU):
1787 """Bring up an instance's disks.
1790 _OP_REQP = ["instance_name"]
1792 def CheckPrereq(self):
1793 """Check prerequisites.
1795 This checks that the instance is in the cluster.
1798 instance = self.cfg.GetInstanceInfo(
1799 self.cfg.ExpandInstanceName(self.op.instance_name))
1800 if instance is None:
1801 raise errors.OpPrereqError("Instance '%s' not known" %
1802 self.op.instance_name)
1803 self.instance = instance
1806 def Exec(self, feedback_fn):
1807 """Activate the disks.
1810 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1812 raise errors.OpExecError("Cannot activate block devices")
1817 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1818 """Prepare the block devices for an instance.
1820 This sets up the block devices on all nodes.
1823 instance: a ganeti.objects.Instance object
1824 ignore_secondaries: if true, errors on secondary nodes won't result
1825 in an error return from the function
1828 false if the operation failed
1829 list of (host, instance_visible_name, node_visible_name) if the operation
1830 succeeded with the mapping from node devices to instance devices
1834 for inst_disk in instance.disks:
1835 master_result = None
1836 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1837 cfg.SetDiskID(node_disk, node)
1838 is_primary = node == instance.primary_node
1839 result = rpc.call_blockdev_assemble(node, node_disk,
1840 instance.name, is_primary)
1842 logger.Error("could not prepare block device %s on node %s"
1843 " (is_primary=%s)" %
1844 (inst_disk.iv_name, node, is_primary))
1845 if is_primary or not ignore_secondaries:
1848 master_result = result
1849 device_info.append((instance.primary_node, inst_disk.iv_name,
1852 # leave the disks configured for the primary node
1853 # this is a workaround that would be fixed better by
1854 # improving the logical/physical id handling
1855 for disk in instance.disks:
1856 cfg.SetDiskID(disk, instance.primary_node)
1858 return disks_ok, device_info
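# Illustrative use (editor's sketch), as in LUActivateInstanceDisks above:
#
#   disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")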
1861 def _StartInstanceDisks(cfg, instance, force):
1862 """Start the disks of an instance.
1865 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1866 ignore_secondaries=force)
1868 _ShutdownInstanceDisks(instance, cfg)
1869 if force is not None and not force:
1870 logger.Error("If the message above refers to a secondary node,"
1871 " you can retry the operation using '--force'.")
1872 raise errors.OpExecError("Disk consistency error")
1875 class LUDeactivateInstanceDisks(NoHooksLU):
1876 """Shutdown an instance's disks.
1879 _OP_REQP = ["instance_name"]
1881 def CheckPrereq(self):
1882 """Check prerequisites.
1884 This checks that the instance is in the cluster.
1887 instance = self.cfg.GetInstanceInfo(
1888 self.cfg.ExpandInstanceName(self.op.instance_name))
1889 if instance is None:
1890 raise errors.OpPrereqError("Instance '%s' not known" %
1891 self.op.instance_name)
1892 self.instance = instance
1894 def Exec(self, feedback_fn):
1895 """Deactivate the disks
1898 instance = self.instance
1899 ins_l = rpc.call_instance_list([instance.primary_node])
1900 ins_l = ins_l[instance.primary_node]
1901 if not type(ins_l) is list:
1902 raise errors.OpExecError("Can't contact node '%s'" %
1903 instance.primary_node)
1905 if self.instance.name in ins_l:
1906 raise errors.OpExecError("Instance is running, can't shutdown"
1909 _ShutdownInstanceDisks(instance, self.cfg)
1912 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1913 """Shutdown block devices of an instance.
1915 This does the shutdown on all nodes of the instance.
1917 If the ignore_primary is false, errors on the primary node are
1922 for disk in instance.disks:
1923 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1924 cfg.SetDiskID(top_disk, node)
1925 if not rpc.call_blockdev_shutdown(node, top_disk):
1926 logger.Error("could not shutdown block device %s on node %s" %
1927 (disk.iv_name, node))
1928 if not ignore_primary or node != instance.primary_node:
1933 class LUStartupInstance(LogicalUnit):
1934 """Starts an instance.
1937 HPATH = "instance-start"
1938 HTYPE = constants.HTYPE_INSTANCE
1939 _OP_REQP = ["instance_name", "force"]
1941 def BuildHooksEnv(self):
1944 This runs on master, primary and secondary nodes of the instance.
1948 "FORCE": self.op.force,
1950 env.update(_BuildInstanceHookEnvByObject(self.instance))
1951 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1952 list(self.instance.secondary_nodes))
1955 def CheckPrereq(self):
1956 """Check prerequisites.
1958 This checks that the instance is in the cluster.
1961 instance = self.cfg.GetInstanceInfo(
1962 self.cfg.ExpandInstanceName(self.op.instance_name))
1963 if instance is None:
1964 raise errors.OpPrereqError("Instance '%s' not known" %
1965 self.op.instance_name)
1967 # check bridges existence
1968 _CheckInstanceBridgesExist(instance)
1970 self.instance = instance
1971 self.op.instance_name = instance.name
1973 def Exec(self, feedback_fn):
1974 """Start the instance.
1977 instance = self.instance
1978 force = self.op.force
1979 extra_args = getattr(self.op, "extra_args", "")
1981 node_current = instance.primary_node
1983 nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1985 raise errors.OpExecError("Could not contact node %s for infos" %
1988 freememory = nodeinfo[node_current]['memory_free']
1989 memory = instance.memory
1990 if memory > freememory:
1991 raise errors.OpExecError("Not enough memory to start instance"
1993 " needed %s MiB, available %s MiB" %
1994 (instance.name, node_current, memory,
1997 _StartInstanceDisks(self.cfg, instance, force)
1999 if not rpc.call_instance_start(node_current, instance, extra_args):
2000 _ShutdownInstanceDisks(instance, self.cfg)
2001 raise errors.OpExecError("Could not start instance")
2003 self.cfg.MarkInstanceUp(instance.name)
2006 class LURebootInstance(LogicalUnit):
2007 """Reboot an instance.
2010 HPATH = "instance-reboot"
2011 HTYPE = constants.HTYPE_INSTANCE
2012 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2014 def BuildHooksEnv(self):
2017 This runs on master, primary and secondary nodes of the instance.
2021 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2023 env.update(_BuildInstanceHookEnvByObject(self.instance))
2024 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2025 list(self.instance.secondary_nodes))
2028 def CheckPrereq(self):
2029 """Check prerequisites.
2031 This checks that the instance is in the cluster.
2034 instance = self.cfg.GetInstanceInfo(
2035 self.cfg.ExpandInstanceName(self.op.instance_name))
2036 if instance is None:
2037 raise errors.OpPrereqError("Instance '%s' not known" %
2038 self.op.instance_name)
2040 # check bridges existence
2041 _CheckInstanceBridgesExist(instance)
2043 self.instance = instance
2044 self.op.instance_name = instance.name
2046 def Exec(self, feedback_fn):
2047 """Reboot the instance.
2050 instance = self.instance
2051 ignore_secondaries = self.op.ignore_secondaries
2052 reboot_type = self.op.reboot_type
2053 extra_args = getattr(self.op, "extra_args", "")
2055 node_current = instance.primary_node
2057 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2058 constants.INSTANCE_REBOOT_HARD,
2059 constants.INSTANCE_REBOOT_FULL]:
2060 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2061 (constants.INSTANCE_REBOOT_SOFT,
2062 constants.INSTANCE_REBOOT_HARD,
2063 constants.INSTANCE_REBOOT_FULL))
2065 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2066 constants.INSTANCE_REBOOT_HARD]:
2067 if not rpc.call_instance_reboot(node_current, instance,
2068 reboot_type, extra_args):
2069 raise errors.OpExecError("Could not reboot instance")
2071 if not rpc.call_instance_shutdown(node_current, instance):
2072 raise errors.OpExecError("could not shutdown instance for full reboot")
2073 _ShutdownInstanceDisks(instance, self.cfg)
2074 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2075 if not rpc.call_instance_start(node_current, instance, extra_args):
2076 _ShutdownInstanceDisks(instance, self.cfg)
2077 raise errors.OpExecError("Could not start instance for full reboot")
2079 self.cfg.MarkInstanceUp(instance.name)
2082 class LUShutdownInstance(LogicalUnit):
2083 """Shutdown an instance.
2086 HPATH = "instance-stop"
2087 HTYPE = constants.HTYPE_INSTANCE
2088 _OP_REQP = ["instance_name"]
2090 def BuildHooksEnv(self):
2093 This runs on master, primary and secondary nodes of the instance.
2096 env = _BuildInstanceHookEnvByObject(self.instance)
2097 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2098 list(self.instance.secondary_nodes))
2101 def CheckPrereq(self):
2102 """Check prerequisites.
2104 This checks that the instance is in the cluster.
2107 instance = self.cfg.GetInstanceInfo(
2108 self.cfg.ExpandInstanceName(self.op.instance_name))
2109 if instance is None:
2110 raise errors.OpPrereqError("Instance '%s' not known" %
2111 self.op.instance_name)
2112 self.instance = instance
2114 def Exec(self, feedback_fn):
2115 """Shutdown the instance.
2118 instance = self.instance
2119 node_current = instance.primary_node
2120 if not rpc.call_instance_shutdown(node_current, instance):
2121 logger.Error("could not shutdown instance")
2123 self.cfg.MarkInstanceDown(instance.name)
2124 _ShutdownInstanceDisks(instance, self.cfg)
2127 class LUReinstallInstance(LogicalUnit):
2128 """Reinstall an instance.
2131 HPATH = "instance-reinstall"
2132 HTYPE = constants.HTYPE_INSTANCE
2133 _OP_REQP = ["instance_name"]
2135 def BuildHooksEnv(self):
2138 This runs on master, primary and secondary nodes of the instance.
2141 env = _BuildInstanceHookEnvByObject(self.instance)
2142 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2143 list(self.instance.secondary_nodes))
2146 def CheckPrereq(self):
2147 """Check prerequisites.
2149 This checks that the instance is in the cluster and is not running.
2152 instance = self.cfg.GetInstanceInfo(
2153 self.cfg.ExpandInstanceName(self.op.instance_name))
2154 if instance is None:
2155 raise errors.OpPrereqError("Instance '%s' not known" %
2156 self.op.instance_name)
2157 if instance.disk_template == constants.DT_DISKLESS:
2158 raise errors.OpPrereqError("Instance '%s' has no disks" %
2159 self.op.instance_name)
2160 if instance.status != "down":
2161 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2162 self.op.instance_name)
2163 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2165 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2166 (self.op.instance_name,
2167 instance.primary_node))
2169 self.op.os_type = getattr(self.op, "os_type", None)
2170 if self.op.os_type is not None:
2172 pnode = self.cfg.GetNodeInfo(
2173 self.cfg.ExpandNodeName(instance.primary_node))
2175 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2177 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2179 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2180 " primary node" % self.op.os_type)
2182 self.instance = instance
2184 def Exec(self, feedback_fn):
2185 """Reinstall the instance.
2188 inst = self.instance
2190 if self.op.os_type is not None:
2191 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2192 inst.os = self.op.os_type
2193 self.cfg.AddInstance(inst)
2195 _StartInstanceDisks(self.cfg, inst, None)
2197 feedback_fn("Running the instance OS create scripts...")
2198 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2199 raise errors.OpExecError("Could not install OS for instance %s"
2201 (inst.name, inst.primary_node))
2203 _ShutdownInstanceDisks(inst, self.cfg)
2206 class LURenameInstance(LogicalUnit):
2207 """Rename an instance.
2210 HPATH = "instance-rename"
2211 HTYPE = constants.HTYPE_INSTANCE
2212 _OP_REQP = ["instance_name", "new_name"]
2214 def BuildHooksEnv(self):
2217 This runs on master, primary and secondary nodes of the instance.
2220 env = _BuildInstanceHookEnvByObject(self.instance)
2221 env["INSTANCE_NEW_NAME"] = self.op.new_name
2222 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2223 list(self.instance.secondary_nodes))
2226 def CheckPrereq(self):
2227 """Check prerequisites.
2229 This checks that the instance is in the cluster and is not running.
2232 instance = self.cfg.GetInstanceInfo(
2233 self.cfg.ExpandInstanceName(self.op.instance_name))
2234 if instance is None:
2235 raise errors.OpPrereqError("Instance '%s' not known" %
2236 self.op.instance_name)
2237 if instance.status != "down":
2238 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2239 self.op.instance_name)
2240 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2242 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2243 (self.op.instance_name,
2244 instance.primary_node))
2245 self.instance = instance
2247 # new name verification
2248 name_info = utils.HostInfo(self.op.new_name)
2250 self.op.new_name = new_name = name_info.name
2251 if not getattr(self.op, "ignore_ip", False):
2252 command = ["fping", "-q", name_info.ip]
2253 result = utils.RunCmd(command)
2254 if not result.failed:
2255 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2256 (name_info.ip, new_name))
2259 def Exec(self, feedback_fn):
2260 """Reinstall the instance.
2263 inst = self.instance
2264 old_name = inst.name
2266 self.cfg.RenameInstance(inst.name, self.op.new_name)
2268 # re-read the instance from the configuration after rename
2269 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2271 _StartInstanceDisks(self.cfg, inst, None)
2273 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2275 msg = ("Could run OS rename script for instance %s on node %s (but the"
2276 " instance has been renamed in Ganeti)" %
2277 (inst.name, inst.primary_node))
2280 _ShutdownInstanceDisks(inst, self.cfg)
2283 class LURemoveInstance(LogicalUnit):
2284 """Remove an instance.
2287 HPATH = "instance-remove"
2288 HTYPE = constants.HTYPE_INSTANCE
2289 _OP_REQP = ["instance_name"]
2291 def BuildHooksEnv(self):
2294 This runs on master, primary and secondary nodes of the instance.
2297 env = _BuildInstanceHookEnvByObject(self.instance)
2298 nl = [self.sstore.GetMasterNode()]
2301 def CheckPrereq(self):
2302 """Check prerequisites.
2304 This checks that the instance is in the cluster.
2307 instance = self.cfg.GetInstanceInfo(
2308 self.cfg.ExpandInstanceName(self.op.instance_name))
2309 if instance is None:
2310 raise errors.OpPrereqError("Instance '%s' not known" %
2311 self.op.instance_name)
2312 self.instance = instance
2314 def Exec(self, feedback_fn):
2315 """Remove the instance.
2318 instance = self.instance
2319 logger.Info("shutting down instance %s on node %s" %
2320 (instance.name, instance.primary_node))
2322 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2323 if self.op.ignore_failures:
2324 feedback_fn("Warning: can't shutdown instance")
2326 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2327 (instance.name, instance.primary_node))
2329 logger.Info("removing block devices for instance %s" % instance.name)
2331 if not _RemoveDisks(instance, self.cfg):
2332 if self.op.ignore_failures:
2333 feedback_fn("Warning: can't remove instance's disks")
2335 raise errors.OpExecError("Can't remove instance's disks")
2337 logger.Info("removing instance %s out of cluster config" % instance.name)
2339 self.cfg.RemoveInstance(instance.name)
2342 class LUQueryInstances(NoHooksLU):
2343 """Logical unit for querying instances.
2346 _OP_REQP = ["output_fields", "names"]
2348 def CheckPrereq(self):
2349 """Check prerequisites.
2351 This checks that the requested fields are valid output fields.
2354 self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2355 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2356 "admin_state", "admin_ram",
2357 "disk_template", "ip", "mac", "bridge",
2358 "sda_size", "sdb_size"],
2359 dynamic=self.dynamic_fields,
2360 selected=self.op.output_fields)
2362 self.wanted = _GetWantedInstances(self, self.op.names)
2364 def Exec(self, feedback_fn):
2365 """Computes the list of nodes and their attributes.
2368 instance_names = self.wanted
2369 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2372 # begin data gathering
2374 nodes = frozenset([inst.primary_node for inst in instance_list])
2377 if self.dynamic_fields.intersection(self.op.output_fields):
2379 node_data = rpc.call_all_instances_info(nodes)
2381 result = node_data[name]
2383 live_data.update(result)
2384 elif result == False:
2385 bad_nodes.append(name)
2386 # else no instance is alive
2388 live_data = dict([(name, {}) for name in instance_names])
2390 # end data gathering
2393 for instance in instance_list:
2395 for field in self.op.output_fields:
2400 elif field == "pnode":
2401 val = instance.primary_node
2402 elif field == "snodes":
2403 val = list(instance.secondary_nodes)
2404 elif field == "admin_state":
2405 val = (instance.status != "down")
2406 elif field == "oper_state":
2407 if instance.primary_node in bad_nodes:
2410 val = bool(live_data.get(instance.name))
2411 elif field == "admin_ram":
2412 val = instance.memory
2413 elif field == "oper_ram":
2414 if instance.primary_node in bad_nodes:
2416 elif instance.name in live_data:
2417 val = live_data[instance.name].get("memory", "?")
2420 elif field == "disk_template":
2421 val = instance.disk_template
2423 val = instance.nics[0].ip
2424 elif field == "bridge":
2425 val = instance.nics[0].bridge
2426 elif field == "mac":
2427 val = instance.nics[0].mac
2428 elif field == "sda_size" or field == "sdb_size":
2429 disk = instance.FindDisk(field[:3])
2435 raise errors.ParameterError(field)
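# The Exec method above selects each output value through a long if/elif
# chain over self.op.output_fields. A purely illustrative alternative (not
# used by the LU) is a dispatch table of accessor functions for the static
# fields accepted by CheckPrereq; everything below is a sketch only.
_EXAMPLE_FIELD_GETTERS = {
  "name": lambda inst: inst.name,
  "os": lambda inst: inst.os,
  "pnode": lambda inst: inst.primary_node,
  "snodes": lambda inst: list(inst.secondary_nodes),
  "admin_state": lambda inst: inst.status != "down",
  "admin_ram": lambda inst: inst.memory,
  "disk_template": lambda inst: inst.disk_template,
  "ip": lambda inst: inst.nics[0].ip,
  "bridge": lambda inst: inst.nics[0].bridge,
  "mac": lambda inst: inst.nics[0].mac,
}

def _ExampleStaticFieldValues(instance, fields):
  """Illustrative only: compute static field values via the table above."""
  return [_EXAMPLE_FIELD_GETTERS[field](instance) for field in fields]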
2442 class LUFailoverInstance(LogicalUnit):
2443 """Failover an instance.
2446 HPATH = "instance-failover"
2447 HTYPE = constants.HTYPE_INSTANCE
2448 _OP_REQP = ["instance_name", "ignore_consistency"]
2450 def BuildHooksEnv(self):
2453 This runs on master, primary and secondary nodes of the instance.
2457 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2459 env.update(_BuildInstanceHookEnvByObject(self.instance))
2460 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2463 def CheckPrereq(self):
2464 """Check prerequisites.
2466 This checks that the instance is in the cluster.
2469 instance = self.cfg.GetInstanceInfo(
2470 self.cfg.ExpandInstanceName(self.op.instance_name))
2471 if instance is None:
2472 raise errors.OpPrereqError("Instance '%s' not known" %
2473 self.op.instance_name)
2475 if instance.disk_template not in constants.DTS_NET_MIRROR:
2476 raise errors.OpPrereqError("Instance's disk layout is not"
2477 " network mirrored, cannot failover.")
2479 secondary_nodes = instance.secondary_nodes
2480 if not secondary_nodes:
2481 raise errors.ProgrammerError("no secondary node but using "
2482 "DT_REMOTE_RAID1 template")
2484 # check memory requirements on the secondary node
2485 target_node = secondary_nodes[0]
2486 nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2487 info = nodeinfo.get(target_node, None)
2489 raise errors.OpPrereqError("Cannot get current information"
2490 " from node '%s'" % nodeinfo)
2491 if instance.memory > info['memory_free']:
2492 raise errors.OpPrereqError("Not enough memory on target node %s."
2493 " %d MB available, %d MB required" %
2494 (target_node, info['memory_free'],
2497 # check bridge existence
2498 brlist = [nic.bridge for nic in instance.nics]
2499 if not rpc.call_bridges_exist(target_node, brlist):
2500 raise errors.OpPrereqError("One or more target bridges %s does not"
2501 " exist on destination node '%s'" %
2502 (brlist, target_node))
2504 self.instance = instance
2506 def Exec(self, feedback_fn):
2507 """Failover an instance.
2509 The failover is done by shutting it down on its present node and
2510 starting it on the secondary.
2513 instance = self.instance
2515 source_node = instance.primary_node
2516 target_node = instance.secondary_nodes[0]
2518 feedback_fn("* checking disk consistency between source and target")
2519 for dev in instance.disks:
2520 # for remote_raid1, these are md over drbd
2521 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2522 if not self.op.ignore_consistency:
2523 raise errors.OpExecError("Disk %s is degraded on target node,"
2524 " aborting failover." % dev.iv_name)
2526 feedback_fn("* checking target node resource availability")
2527 nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2530 raise errors.OpExecError("Could not contact target node %s." %
2533 free_memory = int(nodeinfo[target_node]['memory_free'])
2534 memory = instance.memory
2535 if memory > free_memory:
2536 raise errors.OpExecError("Not enough memory to create instance %s on"
2537 " node %s. needed %s MiB, available %s MiB" %
2538 (instance.name, target_node, memory,
2541 feedback_fn("* shutting down instance on source node")
2542 logger.Info("Shutting down instance %s on node %s" %
2543 (instance.name, source_node))
2545 if not rpc.call_instance_shutdown(source_node, instance):
2546 if self.op.ignore_consistency:
2547 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2548 " anyway. Please make sure node %s is down" %
2549 (instance.name, source_node, source_node))
2551 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2552 (instance.name, source_node))
2554 feedback_fn("* deactivating the instance's disks on source node")
2555 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2556 raise errors.OpExecError("Can't shut down the instance's disks.")
2558 instance.primary_node = target_node
2559 # distribute new instance config to the other nodes
2560 self.cfg.AddInstance(instance)
2562 feedback_fn("* activating the instance's disks on target node")
2563 logger.Info("Starting instance %s on node %s" %
2564 (instance.name, target_node))
2566 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2567 ignore_secondaries=True)
2569 _ShutdownInstanceDisks(instance, self.cfg)
2570 raise errors.OpExecError("Can't activate the instance's disks")
2572 feedback_fn("* starting the instance on the target node")
2573 if not rpc.call_instance_start(target_node, instance, None):
2574 _ShutdownInstanceDisks(instance, self.cfg)
2575 raise errors.OpExecError("Could not start instance %s on node %s." %
2576 (instance.name, target_node))
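# Both CheckPrereq and Exec above verify that the target node reports enough
# free memory before the failover proceeds. A minimal sketch of that check,
# assuming a node-info mapping as returned by rpc.call_node_info (with a
# 'memory_free' key per node); the helper name is hypothetical:
def _ExampleHasEnoughMemory(nodeinfo, node, required_mb):
  """Illustrative only: True if 'node' reports at least required_mb free."""
  info = nodeinfo.get(node, None)
  if not info:
    return False  # no data from the node; treat as insufficient
  return int(info['memory_free']) >= required_mb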
2579 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2580 """Create a tree of block devices on the primary node.
2582 This always creates all devices.
2586 for child in device.children:
2587 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2590 cfg.SetDiskID(device, node)
2591 new_id = rpc.call_blockdev_create(node, device, device.size,
2592 instance.name, True, info)
2595 if device.physical_id is None:
2596 device.physical_id = new_id
2600 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2601 """Create a tree of block devices on a secondary node.
2603 If this device type has to be created on secondaries, create it and
2606 If not, just recurse to children keeping the same 'force' value.
2609 if device.CreateOnSecondary():
2612 for child in device.children:
2613 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2614 child, force, info):
2619 cfg.SetDiskID(device, node)
2620 new_id = rpc.call_blockdev_create(node, device, device.size,
2621 instance.name, False, info)
2624 if device.physical_id is None:
2625 device.physical_id = new_id
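# Both block-device creation helpers walk the disk object tree recursively:
# children are handled before the device itself, and on secondaries the
# 'force' flag is switched on once a device that must exist there is reached.
# A schematic, RPC-free sketch of that post-order traversal (hypothetical
# helper, illustrative only):
def _ExampleVisitDeviceTree(device, visit_fn):
  """Illustrative only: visit children first, then the device itself."""
  for child in getattr(device, "children", None) or []:
    _ExampleVisitDeviceTree(child, visit_fn)
  visit_fn(device)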
2629 def _GenerateUniqueNames(cfg, exts):
2630 """Generate a suitable LV name.
2632 This will generate a logical volume name for the given instance.
2637 new_id = cfg.GenerateUniqueID()
2638 results.append("%s%s" % (new_id, val))
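# Usage sketch (illustrative): each requested suffix gets its own fresh ID
# from the configuration, and the resulting strings are used as LV names by
# the callers below, e.g.
#   _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta"])
#   # -> ["<unique-id>.sda_data", "<unique-id>.sda_meta"]
# (assuming the accumulated 'results' list is what the function returns).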
2642 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2643 """Generate a drbd device complete with its children.
2646 port = cfg.AllocatePort()
2647 vgname = cfg.GetVGName()
2648 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2649 logical_id=(vgname, names[0]))
2650 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2651 logical_id=(vgname, names[1]))
2652 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2653 logical_id = (primary, secondary, port),
2654 children = [dev_data, dev_meta])
2658 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2659 """Generate a drbd8 device complete with its children.
2662 port = cfg.AllocatePort()
2663 vgname = cfg.GetVGName()
2664 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2665 logical_id=(vgname, names[0]))
2666 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2667 logical_id=(vgname, names[1]))
2668 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2669 logical_id = (primary, secondary, port),
2670 children = [dev_data, dev_meta],
2674 def _GenerateDiskTemplate(cfg, template_name,
2675 instance_name, primary_node,
2676 secondary_nodes, disk_sz, swap_sz):
2677 """Generate the entire disk layout for a given template type.
2680 #TODO: compute space requirements
2682 vgname = cfg.GetVGName()
2683 if template_name == "diskless":
2685 elif template_name == "plain":
2686 if len(secondary_nodes) != 0:
2687 raise errors.ProgrammerError("Wrong template configuration")
2689 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2690 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2691 logical_id=(vgname, names[0]),
2693 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2694 logical_id=(vgname, names[1]),
2696 disks = [sda_dev, sdb_dev]
2697 elif template_name == "local_raid1":
2698 if len(secondary_nodes) != 0:
2699 raise errors.ProgrammerError("Wrong template configuration")
2702 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2703 ".sdb_m1", ".sdb_m2"])
2704 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2705 logical_id=(vgname, names[0]))
2706 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2707 logical_id=(vgname, names[1]))
2708 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2710 children = [sda_dev_m1, sda_dev_m2])
2711 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2712 logical_id=(vgname, names[2]))
2713 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2714 logical_id=(vgname, names[3]))
2715 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2717 children = [sdb_dev_m1, sdb_dev_m2])
2718 disks = [md_sda_dev, md_sdb_dev]
2719 elif template_name == constants.DT_REMOTE_RAID1:
2720 if len(secondary_nodes) != 1:
2721 raise errors.ProgrammerError("Wrong template configuration")
2722 remote_node = secondary_nodes[0]
2723 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2724 ".sdb_data", ".sdb_meta"])
2725 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2726 disk_sz, names[0:2])
2727 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2728 children = [drbd_sda_dev], size=disk_sz)
2729 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2730 swap_sz, names[2:4])
2731 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2732 children = [drbd_sdb_dev], size=swap_sz)
2733 disks = [md_sda_dev, md_sdb_dev]
2734 elif template_name == constants.DT_DRBD8:
2735 if len(secondary_nodes) != 1:
2736 raise errors.ProgrammerError("Wrong template configuration")
2737 remote_node = secondary_nodes[0]
2738 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2739 ".sdb_data", ".sdb_meta"])
2740 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2741 disk_sz, names[0:2], "sda")
2742 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2743 swap_sz, names[2:4], "sdb")
2744 disks = [drbd_sda_dev, drbd_sdb_dev]
2746 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
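# Usage sketch (illustrative values only): for the "plain" template the
# helper returns the assembled 'disks' list with two logical volumes, sda of
# disk_sz and sdb of swap_sz, while the mirrored templates additionally
# require exactly one secondary node, e.g.
#   disks = _GenerateDiskTemplate(cfg, "plain", "instance1.example.com",
#                                 "node1.example.com", [], 10240, 4096)
# where cfg is the cluster configuration object used throughout this module.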
2750 def _GetInstanceInfoText(instance):
2751 """Compute that text that should be added to the disk's metadata.
2754 return "originstname+%s" % instance.name
2757 def _CreateDisks(cfg, instance):
2758 """Create all disks for an instance.
2760 This abstracts away some work from AddInstance.
2763 instance: the instance object
2766 True or False showing the success of the creation process
2769 info = _GetInstanceInfoText(instance)
2771 for device in instance.disks:
2772 logger.Info("creating volume %s for instance %s" %
2773 (device.iv_name, instance.name))
2775 for secondary_node in instance.secondary_nodes:
2776 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2777 device, False, info):
2778 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2779 (device.iv_name, device, secondary_node))
2782 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2783 instance, device, info):
2784 logger.Error("failed to create volume %s on primary!" %
2790 def _RemoveDisks(instance, cfg):
2791 """Remove all disks for an instance.
2793 This abstracts away some work from `AddInstance()` and
2794 `RemoveInstance()`. Note that in case some of the devices couldn't
2795 be removed, the removal will continue with the other ones (compare
2796 with `_CreateDisks()`).
2799 instance: the instance object
2802 True or False showing the success of the removal process
2805 logger.Info("removing block devices for instance %s" % instance.name)
2808 for device in instance.disks:
2809 for node, disk in device.ComputeNodeTree(instance.primary_node):
2810 cfg.SetDiskID(disk, node)
2811 if not rpc.call_blockdev_remove(node, disk):
2812 logger.Error("could not remove block device %s on node %s,"
2813 " continuing anyway" %
2814 (device.iv_name, node))
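# _RemoveDisks above is deliberately best-effort: a failure on one block
# device is only logged and the loop moves on, so that as much as possible
# is cleaned up. A schematic, RPC-free sketch of that pattern (hypothetical
# helper, illustrative only):
def _ExampleBestEffortRemove(items, remove_fn, log_fn):
  """Illustrative only: remove all items, remembering whether all succeeded."""
  all_ok = True
  for item in items:
    if not remove_fn(item):
      log_fn("could not remove %s, continuing anyway" % (item,))
      all_ok = False
  return all_ok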
2819 class LUCreateInstance(LogicalUnit):
2820 """Create an instance.
2823 HPATH = "instance-add"
2824 HTYPE = constants.HTYPE_INSTANCE
2825 _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2826 "disk_template", "swap_size", "mode", "start", "vcpus",
2827 "wait_for_sync", "ip_check"]
2829 def BuildHooksEnv(self):
2832 This runs on master, primary and secondary nodes of the instance.
2836 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2837 "INSTANCE_DISK_SIZE": self.op.disk_size,
2838 "INSTANCE_SWAP_SIZE": self.op.swap_size,
2839 "INSTANCE_ADD_MODE": self.op.mode,
2841 if self.op.mode == constants.INSTANCE_IMPORT:
2842 env["INSTANCE_SRC_NODE"] = self.op.src_node
2843 env["INSTANCE_SRC_PATH"] = self.op.src_path
2844 env["INSTANCE_SRC_IMAGE"] = self.src_image
2846 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2847 primary_node=self.op.pnode,
2848 secondary_nodes=self.secondaries,
2849 status=self.instance_status,
2850 os_type=self.op.os_type,
2851 memory=self.op.mem_size,
2852 vcpus=self.op.vcpus,
2853 nics=[(self.inst_ip, self.op.bridge)],
2856 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2861 def CheckPrereq(self):
2862 """Check prerequisites.
2865 if self.op.mode not in (constants.INSTANCE_CREATE,
2866 constants.INSTANCE_IMPORT):
2867 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2870 if self.op.mode == constants.INSTANCE_IMPORT:
2871 src_node = getattr(self.op, "src_node", None)
2872 src_path = getattr(self.op, "src_path", None)
2873 if src_node is None or src_path is None:
2874 raise errors.OpPrereqError("Importing an instance requires source"
2875 " node and path options")
2876 src_node_full = self.cfg.ExpandNodeName(src_node)
2877 if src_node_full is None:
2878 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2879 self.op.src_node = src_node = src_node_full
2881 if not os.path.isabs(src_path):
2882 raise errors.OpPrereqError("The source path must be absolute")
2884 export_info = rpc.call_export_info(src_node, src_path)
2887 raise errors.OpPrereqError("No export found in dir %s" % src_path)
2889 if not export_info.has_section(constants.INISECT_EXP):
2890 raise errors.ProgrammerError("Corrupted export config")
2892 ei_version = export_info.get(constants.INISECT_EXP, 'version')
2893 if (int(ei_version) != constants.EXPORT_VERSION):
2894 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2895 (ei_version, constants.EXPORT_VERSION))
2897 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2898 raise errors.OpPrereqError("Can't import instance with more than"
2901 # FIXME: are the old os-es, disk sizes, etc. useful?
2902 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2903 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2905 self.src_image = diskimage
2906 else: # INSTANCE_CREATE
2907 if getattr(self.op, "os_type", None) is None:
2908 raise errors.OpPrereqError("No guest OS specified")
2910 # check primary node
2911 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2913 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2915 self.op.pnode = pnode.name
2917 self.secondaries = []
2918 # disk template and mirror node verification
2919 if self.op.disk_template not in constants.DISK_TEMPLATES:
2920 raise errors.OpPrereqError("Invalid disk template name")
2922 if self.op.disk_template in constants.DTS_NET_MIRROR:
2923 if getattr(self.op, "snode", None) is None:
2924 raise errors.OpPrereqError("The networked disk templates need"
2927 snode_name = self.cfg.ExpandNodeName(self.op.snode)
2928 if snode_name is None:
2929 raise errors.OpPrereqError("Unknown secondary node '%s'" %
2931 elif snode_name == pnode.name:
2932 raise errors.OpPrereqError("The secondary node cannot be"
2933 " the primary node.")
2934 self.secondaries.append(snode_name)
2936 # Check lv size requirements
2937 nodenames = [pnode.name] + self.secondaries
2938 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2940 # Required free disk space as a function of disk and swap space
2942 constants.DT_DISKLESS: 0,
2943 constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2944 constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2945 # 256 MB are added for drbd metadata, 128MB for each drbd device
2946 constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2947 constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2950 if self.op.disk_template not in req_size_dict:
2951 raise errors.ProgrammerError("Disk template '%s' size requirement"
2952 " is unknown" % self.op.disk_template)
2954 req_size = req_size_dict[self.op.disk_template]
2956 for node in nodenames:
2957 info = nodeinfo.get(node, None)
2959 raise errors.OpPrereqError("Cannot get current information"
2960 " from node '%s'" % nodeinfo)
2961 if req_size > info['vg_free']:
2962 raise errors.OpPrereqError("Not enough disk space on target node %s."
2963 " %d MB available, %d MB required" %
2964 (node, info['vg_free'], req_size))
2967 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2969 raise errors.OpPrereqError("OS '%s' not in supported os list for"
2970 " primary node" % self.op.os_type)
2972 # instance verification
2973 hostname1 = utils.HostInfo(self.op.instance_name)
2975 self.op.instance_name = instance_name = hostname1.name
2976 instance_list = self.cfg.GetInstanceList()
2977 if instance_name in instance_list:
2978 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2981 ip = getattr(self.op, "ip", None)
2982 if ip is None or ip.lower() == "none":
2984 elif ip.lower() == "auto":
2985 inst_ip = hostname1.ip
2987 if not utils.IsValidIP(ip):
2988 raise errors.OpPrereqError("given IP address '%s' doesn't look"
2989 " like a valid IP" % ip)
2991 self.inst_ip = inst_ip
2993 if self.op.start and not self.op.ip_check:
2994 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2995 " adding an instance in start mode")
2997 if self.op.ip_check:
2998 if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2999 constants.DEFAULT_NODED_PORT):
3000 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3001 (hostname1.ip, instance_name))
3003 # bridge verification
3004 bridge = getattr(self.op, "bridge", None)
3006 self.op.bridge = self.cfg.GetDefBridge()
3008 self.op.bridge = bridge
3010 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3011 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3012 " destination node '%s'" %
3013 (self.op.bridge, pnode.name))
3016 self.instance_status = 'up'
3018 self.instance_status = 'down'
3020 def Exec(self, feedback_fn):
3021 """Create and add the instance to the cluster.
3024 instance = self.op.instance_name
3025 pnode_name = self.pnode.name
3027 nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
3028 if self.inst_ip is not None:
3029 nic.ip = self.inst_ip
3031 network_port = None # placeholder assignment for later
3033 disks = _GenerateDiskTemplate(self.cfg,
3034 self.op.disk_template,
3035 instance, pnode_name,
3036 self.secondaries, self.op.disk_size,
3039 iobj = objects.Instance(name=instance, os=self.op.os_type,
3040 primary_node=pnode_name,
3041 memory=self.op.mem_size,
3042 vcpus=self.op.vcpus,
3043 nics=[nic], disks=disks,
3044 disk_template=self.op.disk_template,
3045 status=self.instance_status,
3046 network_port=network_port,
3049 feedback_fn("* creating instance disks...")
3050 if not _CreateDisks(self.cfg, iobj):
3051 _RemoveDisks(iobj, self.cfg)
3052 raise errors.OpExecError("Device creation failed, reverting...")
3054 feedback_fn("adding instance %s to cluster config" % instance)
3056 self.cfg.AddInstance(iobj)
3058 if self.op.wait_for_sync:
3059 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3060 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3061 # make sure the disks are not degraded (still sync-ing is ok)
3063 feedback_fn("* checking mirrors status")
3064 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3069 _RemoveDisks(iobj, self.cfg)
3070 self.cfg.RemoveInstance(iobj.name)
3071 raise errors.OpExecError("There are some degraded disks for"
3074 feedback_fn("creating os for instance %s on node %s" %
3075 (instance, pnode_name))
3077 if iobj.disk_template != constants.DT_DISKLESS:
3078 if self.op.mode == constants.INSTANCE_CREATE:
3079 feedback_fn("* running the instance OS create scripts...")
3080 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3081 raise errors.OpExecError("could not add os for instance %s"
3083 (instance, pnode_name))
3085 elif self.op.mode == constants.INSTANCE_IMPORT:
3086 feedback_fn("* running the instance OS import scripts...")
3087 src_node = self.op.src_node
3088 src_image = self.src_image
3089 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3090 src_node, src_image):
3091 raise errors.OpExecError("Could not import os for instance"
3093 (instance, pnode_name))
3095 # also checked in the prereq part
3096 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3100 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3101 feedback_fn("* starting instance...")
3102 if not rpc.call_instance_start(pnode_name, iobj, None):
3103 raise errors.OpExecError("Could not start instance")
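# The disk-space check in LUCreateInstance.CheckPrereq sizes the request per
# template: diskless needs no space, plain needs disk plus swap, local_raid1
# twice that, and the DRBD-based templates add 256 MB for metadata (128 MB
# per DRBD device). A small recomputation of those figures (hypothetical
# helper, illustrative only):
def _ExampleRequiredSpace(template, disk_size, swap_size):
  """Illustrative only: MB of free VG space needed for a disk template."""
  req_size = {
    constants.DT_DISKLESS: 0,
    constants.DT_PLAIN: disk_size + swap_size,
    constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
    constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
    constants.DT_DRBD8: disk_size + swap_size + 256,
  }
  return req_size[template]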
3106 class LUConnectConsole(NoHooksLU):
3107 """Connect to an instance's console.
3109 This is somewhat special in that it returns the command line that
3110 you need to run on the master node in order to connect to the console.
3114 _OP_REQP = ["instance_name"]
3116 def CheckPrereq(self):
3117 """Check prerequisites.
3119 This checks that the instance is in the cluster.
3122 instance = self.cfg.GetInstanceInfo(
3123 self.cfg.ExpandInstanceName(self.op.instance_name))
3124 if instance is None:
3125 raise errors.OpPrereqError("Instance '%s' not known" %
3126 self.op.instance_name)
3127 self.instance = instance
3129 def Exec(self, feedback_fn):
3130 """Connect to the console of an instance
3133 instance = self.instance
3134 node = instance.primary_node
3136 node_insts = rpc.call_instance_list([node])[node]
3137 if node_insts is False:
3138 raise errors.OpExecError("Can't connect to node %s." % node)
3140 if instance.name not in node_insts:
3141 raise errors.OpExecError("Instance %s is not running." % instance.name)
3143 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3145 hyper = hypervisor.GetHypervisor()
3146 console_cmd = hyper.GetShellCommandForConsole(instance.name)
3148 argv = ["ssh", "-q", "-t"]
3149 argv.extend(ssh.KNOWN_HOSTS_OPTS)
3150 argv.extend(ssh.BATCH_MODE_OPTS)
3152 argv.append(console_cmd)
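# The LU above only assembles the ssh invocation; per its docstring the
# resulting command line is meant to be executed on the master node by the
# caller. One illustrative way to run such an argument list (hypothetical
# caller code, not part of this module):
#   import os
#   os.execvp(argv[0], argv)  # hands the terminal over to the ssh process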
3156 class LUAddMDDRBDComponent(LogicalUnit):
3157 """Adda new mirror member to an instance's disk.
3160 HPATH = "mirror-add"
3161 HTYPE = constants.HTYPE_INSTANCE
3162 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3164 def BuildHooksEnv(self):
3167 This runs on the master, the primary and all the secondaries.
3171 "NEW_SECONDARY": self.op.remote_node,
3172 "DISK_NAME": self.op.disk_name,
3174 env.update(_BuildInstanceHookEnvByObject(self.instance))
3175 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3176 self.op.remote_node,] + list(self.instance.secondary_nodes)
3179 def CheckPrereq(self):
3180 """Check prerequisites.
3182 This checks that the instance is in the cluster.
3185 instance = self.cfg.GetInstanceInfo(
3186 self.cfg.ExpandInstanceName(self.op.instance_name))
3187 if instance is None:
3188 raise errors.OpPrereqError("Instance '%s' not known" %
3189 self.op.instance_name)
3190 self.instance = instance
3192 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3193 if remote_node is None:
3194 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3195 self.remote_node = remote_node
3197 if remote_node == instance.primary_node:
3198 raise errors.OpPrereqError("The specified node is the primary node of"
3201 if instance.disk_template != constants.DT_REMOTE_RAID1:
3202 raise errors.OpPrereqError("Instance's disk layout is not"
3204 for disk in instance.disks:
3205 if disk.iv_name == self.op.disk_name:
3208 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3209 " instance." % self.op.disk_name)
3210 if len(disk.children) > 1:
3211 raise errors.OpPrereqError("The device already has two slave devices."
3212 " This would create a 3-disk raid1 which we"
3216 def Exec(self, feedback_fn):
3217 """Add the mirror component
3221 instance = self.instance
3223 remote_node = self.remote_node
3224 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3225 names = _GenerateUniqueNames(self.cfg, lv_names)
3226 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3227 remote_node, disk.size, names)
3229 logger.Info("adding new mirror component on secondary")
3231 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3233 _GetInstanceInfoText(instance)):
3234 raise errors.OpExecError("Failed to create new component on secondary"
3235 " node %s" % remote_node)
3237 logger.Info("adding new mirror component on primary")
3239 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3241 _GetInstanceInfoText(instance)):
3242 # remove secondary dev
3243 self.cfg.SetDiskID(new_drbd, remote_node)
3244 rpc.call_blockdev_remove(remote_node, new_drbd)
3245 raise errors.OpExecError("Failed to create volume on primary")
3247 # the device exists now
3248 # call the primary node to add the mirror to md
3249 logger.Info("adding new mirror component to md")
3250 if not rpc.call_blockdev_addchildren(instance.primary_node,
3252 logger.Error("Can't add mirror compoment to md!")
3253 self.cfg.SetDiskID(new_drbd, remote_node)
3254 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3255 logger.Error("Can't rollback on secondary")
3256 self.cfg.SetDiskID(new_drbd, instance.primary_node)
3257 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3258 logger.Error("Can't rollback on primary")
3259 raise errors.OpExecError("Can't add mirror component to md array")
3261 disk.children.append(new_drbd)
3263 self.cfg.AddInstance(instance)
3265 _WaitForSync(self.cfg, instance, self.proc)
3270 class LURemoveMDDRBDComponent(LogicalUnit):
3271 """Remove a component from a remote_raid1 disk.
3274 HPATH = "mirror-remove"
3275 HTYPE = constants.HTYPE_INSTANCE
3276 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3278 def BuildHooksEnv(self):
3281 This runs on the master, the primary and all the secondaries.
3285 "DISK_NAME": self.op.disk_name,
3286 "DISK_ID": self.op.disk_id,
3287 "OLD_SECONDARY": self.old_secondary,
3289 env.update(_BuildInstanceHookEnvByObject(self.instance))
3290 nl = [self.sstore.GetMasterNode(),
3291 self.instance.primary_node] + list(self.instance.secondary_nodes)
3294 def CheckPrereq(self):
3295 """Check prerequisites.
3297 This checks that the instance is in the cluster.
3300 instance = self.cfg.GetInstanceInfo(
3301 self.cfg.ExpandInstanceName(self.op.instance_name))
3302 if instance is None:
3303 raise errors.OpPrereqError("Instance '%s' not known" %
3304 self.op.instance_name)
3305 self.instance = instance
3307 if instance.disk_template != constants.DT_REMOTE_RAID1:
3308 raise errors.OpPrereqError("Instance's disk layout is not"
3310 for disk in instance.disks:
3311 if disk.iv_name == self.op.disk_name:
3314 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3315 " instance." % self.op.disk_name)
3316 for child in disk.children:
3317 if (child.dev_type == constants.LD_DRBD7 and
3318 child.logical_id[2] == self.op.disk_id):
3321 raise errors.OpPrereqError("Can't find the device with this port.")
3323 if len(disk.children) < 2:
3324 raise errors.OpPrereqError("Cannot remove the last component from"
3328 if self.child.logical_id[0] == instance.primary_node:
3332 self.old_secondary = self.child.logical_id[oid]
3334 def Exec(self, feedback_fn):
3335 """Remove the mirror component
3338 instance = self.instance
3341 logger.Info("remove mirror component")
3342 self.cfg.SetDiskID(disk, instance.primary_node)
3343 if not rpc.call_blockdev_removechildren(instance.primary_node,
3345 raise errors.OpExecError("Can't remove child from mirror.")
3347 for node in child.logical_id[:2]:
3348 self.cfg.SetDiskID(child, node)
3349 if not rpc.call_blockdev_remove(node, child):
3350 logger.Error("Warning: failed to remove device from node %s,"
3351 " continuing operation." % node)
3353 disk.children.remove(child)
3354 self.cfg.AddInstance(instance)
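# CheckPrereq above derives the old secondary from the child's logical_id,
# which for a DRBD device is the tuple (node_a, node_b, port): whichever
# node entry is not the instance's primary is the secondary. A minimal
# sketch of that selection (hypothetical helper, illustrative only):
def _ExamplePeerNode(logical_id, node):
  """Illustrative only: return the peer of 'node' in a (nodeA, nodeB, port) id."""
  if logical_id[0] == node:
    return logical_id[1]
  return logical_id[0]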
3357 class LUReplaceDisks(LogicalUnit):
3358 """Replace the disks of an instance.
3361 HPATH = "mirrors-replace"
3362 HTYPE = constants.HTYPE_INSTANCE
3363 _OP_REQP = ["instance_name", "mode", "disks"]
3365 def BuildHooksEnv(self):
3368 This runs on the master, the primary and all the secondaries.
3372 "MODE": self.op.mode,
3373 "NEW_SECONDARY": self.op.remote_node,
3374 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3376 env.update(_BuildInstanceHookEnvByObject(self.instance))
3378 self.sstore.GetMasterNode(),
3379 self.instance.primary_node,
3381 if self.op.remote_node is not None:
3382 nl.append(self.op.remote_node)
3385 def CheckPrereq(self):
3386 """Check prerequisites.
3388 This checks that the instance is in the cluster.
3391 instance = self.cfg.GetInstanceInfo(
3392 self.cfg.ExpandInstanceName(self.op.instance_name))
3393 if instance is None:
3394 raise errors.OpPrereqError("Instance '%s' not known" %
3395 self.op.instance_name)
3396 self.instance = instance
3397 self.op.instance_name = instance.name
3399 if instance.disk_template not in constants.DTS_NET_MIRROR:
3400 raise errors.OpPrereqError("Instance's disk layout is not"
3401 " network mirrored.")
3403 if len(instance.secondary_nodes) != 1:
3404 raise errors.OpPrereqError("The instance has a strange layout,"
3405 " expected one secondary but found %d" %
3406 len(instance.secondary_nodes))
3408 self.sec_node = instance.secondary_nodes[0]
3410 remote_node = getattr(self.op, "remote_node", None)
3411 if remote_node is not None:
3412 remote_node = self.cfg.ExpandNodeName(remote_node)
3413 if remote_node is None:
3414 raise errors.OpPrereqError("Node '%s' not known" %
3415 self.op.remote_node)
3416 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3418 self.remote_node_info = None
3419 if remote_node == instance.primary_node:
3420 raise errors.OpPrereqError("The specified node is the primary node of"
3422 elif remote_node == self.sec_node:
3423 if self.op.mode == constants.REPLACE_DISK_SEC:
3424 # this is for DRBD8, where we can't execute the same mode of
3425 # replacement as for drbd7 (no different port allocated)
3426 raise errors.OpPrereqError("Same secondary given, cannot execute"
3428 # the user gave the current secondary, switch to
3429 # 'no-replace-secondary' mode for drbd7
3431 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3432 self.op.mode != constants.REPLACE_DISK_ALL):
3433 raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3434 " disks replacement, not individual ones")
3435 if instance.disk_template == constants.DT_DRBD8:
3436 if (self.op.mode == constants.REPLACE_DISK_ALL and
3437 remote_node is not None):
3438 # switch to replace secondary mode
3439 self.op.mode = constants.REPLACE_DISK_SEC
3441 if self.op.mode == constants.REPLACE_DISK_ALL:
3442 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3443 " secondary disk replacement, not"
3445 elif self.op.mode == constants.REPLACE_DISK_PRI:
3446 if remote_node is not None:
3447 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3448 " the secondary while doing a primary"
3449 " node disk replacement")
3450 self.tgt_node = instance.primary_node
3451 self.oth_node = instance.secondary_nodes[0]
3452 elif self.op.mode == constants.REPLACE_DISK_SEC:
3453 self.new_node = remote_node # this can be None, in which case
3454 # we don't change the secondary
3455 self.tgt_node = instance.secondary_nodes[0]
3456 self.oth_node = instance.primary_node
3458 raise errors.ProgrammerError("Unhandled disk replace mode")
3460 for name in self.op.disks:
3461 if instance.FindDisk(name) is None:
3462 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3463 (name, instance.name))
3464 self.op.remote_node = remote_node
3466 def _ExecRR1(self, feedback_fn):
3467 """Replace the disks of an instance.
3470 instance = self.instance
3473 if self.op.remote_node is None:
3474 remote_node = self.sec_node
3476 remote_node = self.op.remote_node
3478 for dev in instance.disks:
3480 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3481 names = _GenerateUniqueNames(cfg, lv_names)
3482 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3483 remote_node, size, names)
3484 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3485 logger.Info("adding new mirror component on secondary for %s" %
3488 if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3490 _GetInstanceInfoText(instance)):
3491 raise errors.OpExecError("Failed to create new component on secondary"
3492 " node %s. Full abort, cleanup manually!" %
3495 logger.Info("adding new mirror component on primary")
3497 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3499 _GetInstanceInfoText(instance)):
3500 # remove secondary dev
3501 cfg.SetDiskID(new_drbd, remote_node)
3502 rpc.call_blockdev_remove(remote_node, new_drbd)
3503 raise errors.OpExecError("Failed to create volume on primary!"
3504 " Full abort, cleanup manually!!")
3506 # the device exists now
3507 # call the primary node to add the mirror to md
3508 logger.Info("adding new mirror component to md")
3509 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3511 logger.Error("Can't add mirror compoment to md!")
3512 cfg.SetDiskID(new_drbd, remote_node)
3513 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3514 logger.Error("Can't rollback on secondary")
3515 cfg.SetDiskID(new_drbd, instance.primary_node)
3516 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3517 logger.Error("Can't rollback on primary")
3518 raise errors.OpExecError("Full abort, cleanup manually!!")
3520 dev.children.append(new_drbd)
3521 cfg.AddInstance(instance)
3523 # this can fail as the old devices are degraded and _WaitForSync
3524 # does a combined result over all disks, so we don't check its
3526 _WaitForSync(cfg, instance, self.proc, unlock=True)
3528 # so check manually all the devices
3529 for name in iv_names:
3530 dev, child, new_drbd = iv_names[name]
3531 cfg.SetDiskID(dev, instance.primary_node)
3532 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3534 raise errors.OpExecError("MD device %s is degraded!" % name)
3535 cfg.SetDiskID(new_drbd, instance.primary_node)
3536 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3538 raise errors.OpExecError("New drbd device %s is degraded!" % name)
3540 for name in iv_names:
3541 dev, child, new_drbd = iv_names[name]
3542 logger.Info("remove mirror %s component" % name)
3543 cfg.SetDiskID(dev, instance.primary_node)
3544 if not rpc.call_blockdev_removechildren(instance.primary_node,
3546 logger.Error("Can't remove child from mirror, aborting"
3547 " *this device cleanup*.\nYou need to cleanup manually!!")
3550 for node in child.logical_id[:2]:
3551 logger.Info("remove child device on %s" % node)
3552 cfg.SetDiskID(child, node)
3553 if not rpc.call_blockdev_remove(node, child):
3554 logger.Error("Warning: failed to remove device from node %s,"
3555 " continuing operation." % node)
3557 dev.children.remove(child)
3559 cfg.AddInstance(instance)
3561 def _ExecD8DiskOnly(self, feedback_fn):
3562 """Replace a disk on the primary or secondary for dbrd8.
3564 The algorithm for replace is quite complicated:
3565 - for each disk to be replaced:
3566 - create new LVs on the target node with unique names
3567 - detach old LVs from the drbd device
3568 - rename old LVs to name_replaced.<time_t>
3569 - rename new LVs to old LVs
3570 - attach the new LVs (with the old names now) to the drbd device
3571 - wait for sync across all devices
3572 - for each modified disk:
3573 - remove old LVs (which have the name name_replaced.<time_t>)
3575 Failures are not very well handled.
3579 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3580 instance = self.instance
3582 vgname = self.cfg.GetVGName()
3585 tgt_node = self.tgt_node
3586 oth_node = self.oth_node
3588 # Step: check device activation
3589 self.proc.LogStep(1, steps_total, "check device existence")
3590 info("checking volume groups")
3591 my_vg = cfg.GetVGName()
3592 results = rpc.call_vg_list([oth_node, tgt_node])
3594 raise errors.OpExecError("Can't list volume groups on the nodes")
3595 for node in oth_node, tgt_node:
3596 res = results.get(node, False)
3597 if not res or my_vg not in res:
3598 raise errors.OpExecError("Volume group '%s' not found on %s" %
3600 for dev in instance.disks:
3601 if not dev.iv_name in self.op.disks:
3603 for node in tgt_node, oth_node:
3604 info("checking %s on %s" % (dev.iv_name, node))
3605 cfg.SetDiskID(dev, node)
3606 if not rpc.call_blockdev_find(node, dev):
3607 raise errors.OpExecError("Can't find device %s on node %s" %
3608 (dev.iv_name, node))
3610 # Step: check other node consistency
3611 self.proc.LogStep(2, steps_total, "check peer consistency")
3612 for dev in instance.disks:
3613 if not dev.iv_name in self.op.disks:
3615 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3616 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3617 oth_node==instance.primary_node):
3618 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3619 " to replace disks on this node (%s)" %
3620 (oth_node, tgt_node))
3622 # Step: create new storage
3623 self.proc.LogStep(3, steps_total, "allocate new storage")
3624 for dev in instance.disks:
3625 if not dev.iv_name in self.op.disks:
3628 cfg.SetDiskID(dev, tgt_node)
3629 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3630 names = _GenerateUniqueNames(cfg, lv_names)
3631 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3632 logical_id=(vgname, names[0]))
3633 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3634 logical_id=(vgname, names[1]))
3635 new_lvs = [lv_data, lv_meta]
3636 old_lvs = dev.children
3637 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3638 info("creating new local storage on %s for %s" %
3639 (tgt_node, dev.iv_name))
3640 # since we *always* want to create this LV, we use the
3641 # _Create...OnPrimary (which forces the creation), even if we
3642 # are talking about the secondary node
3643 for new_lv in new_lvs:
3644 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3645 _GetInstanceInfoText(instance)):
3646 raise errors.OpExecError("Failed to create new LV named '%s' on"
3648 (new_lv.logical_id[1], tgt_node))
3650 # Step: for each lv, detach+rename*2+attach
3651 self.proc.LogStep(4, steps_total, "change drbd configuration")
3652 for dev, old_lvs, new_lvs in iv_names.itervalues():
3653 info("detaching %s drbd from local storage" % dev.iv_name)
3654 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3655 raise errors.OpExecError("Can't detach drbd from local storage on node"
3656 " %s for device %s" % (tgt_node, dev.iv_name))
3658 #cfg.Update(instance)
3660 # ok, we created the new LVs, so now we know we have the needed
3661 # storage; as such, we proceed on the target node to rename
3662 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3663 # using the assumption that logical_id == physical_id (which in
3664 # turn is the unique_id on that node)
3666 # FIXME(iustin): use a better name for the replaced LVs
3667 temp_suffix = int(time.time())
3668 ren_fn = lambda d, suff: (d.physical_id[0],
3669 d.physical_id[1] + "_replaced-%s" % suff)
3670 # build the rename list based on what LVs exist on the node
3672 for to_ren in old_lvs:
3673 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3674 if find_res is not None: # device exists
3675 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3677 info("renaming the old LVs on the target node")
3678 if not rpc.call_blockdev_rename(tgt_node, rlist):
3679 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3680 # now we rename the new LVs to the old LVs
3681 info("renaming the new LVs on the target node")
3682 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3683 if not rpc.call_blockdev_rename(tgt_node, rlist):
3684 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3686 for old, new in zip(old_lvs, new_lvs):
3687 new.logical_id = old.logical_id
3688 cfg.SetDiskID(new, tgt_node)
3690 for disk in old_lvs:
3691 disk.logical_id = ren_fn(disk, temp_suffix)
3692 cfg.SetDiskID(disk, tgt_node)
3694 # now that the new lvs have the old name, we can add them to the device
3695 info("adding new mirror component on %s" % tgt_node)
3696 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3697 for new_lv in new_lvs:
3698 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3699 warning("Can't rollback device %s", "manually cleanup unused"
3701 raise errors.OpExecError("Can't add local storage to drbd")
3703 dev.children = new_lvs
3704 cfg.Update(instance)
3706 # Step: wait for sync
3708 # this can fail as the old devices are degraded and _WaitForSync
3709 # does a combined result over all disks, so we don't check its
3711 self.proc.LogStep(5, steps_total, "sync devices")
3712 _WaitForSync(cfg, instance, self.proc, unlock=True)
3714 # so check manually all the devices
3715 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3716 cfg.SetDiskID(dev, instance.primary_node)
3717 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3719 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3721 # Step: remove old storage
3722 self.proc.LogStep(6, steps_total, "removing old storage")
3723 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3724 info("remove logical volumes for %s" % name)
3726 cfg.SetDiskID(lv, tgt_node)
3727 if not rpc.call_blockdev_remove(tgt_node, lv):
3728 warning("Can't remove old LV", "manually remove unused LVs")
3731 def _ExecD8Secondary(self, feedback_fn):
3732 """Replace the secondary node for drbd8.
3734 The algorithm for replace is quite complicated:
3735 - for all disks of the instance:
3736 - create new LVs on the new node with same names
3737 - shutdown the drbd device on the old secondary
3738 - disconnect the drbd network on the primary
3739 - create the drbd device on the new secondary
3740 - network attach the drbd on the primary, using an artifice:
3741 the drbd code for Attach() will connect to the network if it
3742 finds a device which is connected to the good local disks but
3744 - wait for sync across all devices
3745 - remove all disks from the old secondary
3747 Failures are not very well handled.
3751 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3752 instance = self.instance
3754 vgname = self.cfg.GetVGName()
3757 old_node = self.tgt_node
3758 new_node = self.new_node
3759 pri_node = instance.primary_node
3761 # Step: check device activation
3762 self.proc.LogStep(1, steps_total, "check device existence")
3763 info("checking volume groups")
3764 my_vg = cfg.GetVGName()
3765 results = rpc.call_vg_list([pri_node, new_node])
3767 raise errors.OpExecError("Can't list volume groups on the nodes")
3768 for node in pri_node, new_node:
3769 res = results.get(node, False)
3770 if not res or my_vg not in res:
3771 raise errors.OpExecError("Volume group '%s' not found on %s" %
3773 for dev in instance.disks:
3774 if not dev.iv_name in self.op.disks:
3776 info("checking %s on %s" % (dev.iv_name, pri_node))
3777 cfg.SetDiskID(dev, pri_node)
3778 if not rpc.call_blockdev_find(pri_node, dev):
3779 raise errors.OpExecError("Can't find device %s on node %s" %
3780 (dev.iv_name, pri_node))
3782 # Step: check other node consistency
3783 self.proc.LogStep(2, steps_total, "check peer consistency")
3784 for dev in instance.disks:
3785 if not dev.iv_name in self.op.disks:
3787 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3788 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3789 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3790 " unsafe to replace the secondary" %
3793 # Step: create new storage
3794 self.proc.LogStep(3, steps_total, "allocate new storage")
3795 for dev in instance.disks:
3797 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3798 # since we *always* want to create this LV, we use the
3799 # _Create...OnPrimary (which forces the creation), even if we
3800 # are talking about the secondary node
3801 for new_lv in dev.children:
3802 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3803 _GetInstanceInfoText(instance)):
3804 raise errors.OpExecError("Failed to create new LV named '%s' on"
3806 (new_lv.logical_id[1], new_node))
3808 iv_names[dev.iv_name] = (dev, dev.children)
3810 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3811 for dev in instance.disks:
3813 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3814 # create new devices on new_node
3815 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3816 logical_id=(pri_node, new_node,
3818 children=dev.children)
3819 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3821 _GetInstanceInfoText(instance)):
3822 raise errors.OpExecError("Failed to create new DRBD on"
3823 " node '%s'" % new_node)
3825 for dev in instance.disks:
3826 # we have new devices, shutdown the drbd on the old secondary
3827 info("shutting down drbd for %s on old node" % dev.iv_name)
3828 cfg.SetDiskID(dev, old_node)
3829 if not rpc.call_blockdev_shutdown(old_node, dev):
3830 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3831 "Please cleanup this device manually as soon as possible")
3833 info("detaching primary drbds from the network (=> standalone)")
3835 for dev in instance.disks:
3836 cfg.SetDiskID(dev, pri_node)
3837 # set the physical (unique in bdev terms) id to None, meaning
3838 # detach from network
3839 dev.physical_id = (None,) * len(dev.physical_id)
3840 # and 'find' the device, which will 'fix' it to match the
3842 if rpc.call_blockdev_find(pri_node, dev):
3845 warning("Failed to detach drbd %s from network, unusual case" %
3849 # no detaches succeeded (very unlikely)
3850 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3852 # if we managed to detach at least one, we update all the disks of
3853 # the instance to point to the new secondary
3854 info("updating instance configuration")
3855 for dev in instance.disks:
3856 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3857 cfg.SetDiskID(dev, pri_node)
3858 cfg.Update(instance)
3860 # and now perform the drbd attach
3861 info("attaching primary drbds to new secondary (standalone => connected)")
3863 for dev in instance.disks:
3864 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3865 # since the attach is smart, it's enough to 'find' the device,
3866 # it will automatically activate the network, if the physical_id
3868 cfg.SetDiskID(dev, pri_node)
3869 if not rpc.call_blockdev_find(pri_node, dev):
3870 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3871 "please do a gnt-instance info to see the status of disks")
3873 # this can fail as the old devices are degraded and _WaitForSync
3874 # does a combined result over all disks, so we don't check its
3876 self.proc.LogStep(5, steps_total, "sync devices")
3877 _WaitForSync(cfg, instance, self.proc, unlock=True)
3879 # so check manually all the devices
3880 for name, (dev, old_lvs) in iv_names.iteritems():
3881 cfg.SetDiskID(dev, pri_node)
3882 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3884 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3886 self.proc.LogStep(6, steps_total, "removing old storage")
3887 for name, (dev, old_lvs) in iv_names.iteritems():
3888 info("remove logical volumes for %s" % name)
3890 cfg.SetDiskID(lv, old_node)
3891 if not rpc.call_blockdev_remove(old_node, lv):
3892 warning("Can't remove LV on old secondary",
3893 "Cleanup stale volumes by hand")
3895 def Exec(self, feedback_fn):
3896 """Execute disk replacement.
3898 This dispatches the disk replacement to the appropriate handler.
3901 instance = self.instance
3902 if instance.disk_template == constants.DT_REMOTE_RAID1:
3904 elif instance.disk_template == constants.DT_DRBD8:
3905 if self.op.remote_node is None:
3906 fn = self._ExecD8DiskOnly
3908 fn = self._ExecD8Secondary
3910 raise errors.ProgrammerError("Unhandled disk replacement case")
3911 return fn(feedback_fn)
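# Exec above dispatches on the instance's disk template (and, for DRBD8, on
# whether a new secondary node was requested). An equivalent table-driven
# formulation of the same dispatch, purely as an illustration:
#   if instance.disk_template == constants.DT_DRBD8 and self.op.remote_node:
#     fn = self._ExecD8Secondary
#   else:
#     fn = {constants.DT_REMOTE_RAID1: self._ExecRR1,
#           constants.DT_DRBD8: self._ExecD8DiskOnly}[instance.disk_template]
#   return fn(feedback_fn)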
3914 class LUQueryInstanceData(NoHooksLU):
3915 """Query runtime instance data.
3918 _OP_REQP = ["instances"]
3920 def CheckPrereq(self):
3921 """Check prerequisites.
3923 This only checks the optional instance list against the existing names.
3926 if not isinstance(self.op.instances, list):
3927 raise errors.OpPrereqError("Invalid argument type 'instances'")
3928 if self.op.instances:
3929 self.wanted_instances = []
3930 names = self.op.instances
3932 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3933 if instance is None:
3934 raise errors.OpPrereqError("No such instance name '%s'" % name)
3935 self.wanted_instances.append(instance)
3937 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3938 in self.cfg.GetInstanceList()]
3942 def _ComputeDiskStatus(self, instance, snode, dev):
3943 """Compute block device status.
3946 self.cfg.SetDiskID(dev, instance.primary_node)
3947 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3948 if dev.dev_type in constants.LDS_DRBD:
3949 # we change the snode then (otherwise we use the one passed in)
3950 if dev.logical_id[0] == instance.primary_node:
3951 snode = dev.logical_id[1]
3953 snode = dev.logical_id[0]
3956 self.cfg.SetDiskID(dev, snode)
3957 dev_sstatus = rpc.call_blockdev_find(snode, dev)
3962 dev_children = [self._ComputeDiskStatus(instance, snode, child)
3963 for child in dev.children]
3968 "iv_name": dev.iv_name,
3969 "dev_type": dev.dev_type,
3970 "logical_id": dev.logical_id,
3971 "physical_id": dev.physical_id,
3972 "pstatus": dev_pstatus,
3973 "sstatus": dev_sstatus,
3974 "children": dev_children,
  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "network_port": instance.network_port,
        "vcpus": instance.vcpus,
        }

      result[instance.name] = idict

    return result
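# Sketch of the mapping returned by LUQueryInstanceData.Exec: one entry per
# requested instance, keyed by instance name (values elided or invented):
#
#   {"instance1.example.com": {"name": "instance1.example.com",
#                              "config_state": "up", "run_state": "down",
#                              "pnode": "node1.example.com", "snodes": [...],
#                              "os": ..., "memory": 512,
#                              "nics": [("aa:00:00:11:22:33", None,
#                                        "xen-br0")],
#                              "disks": [...], "network_port": ...,
#                              "vcpus": 1}}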
class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.
    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      args['nics'] = [(ip, bridge)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl
  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result
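# Illustrative return value of LUSetInstanceParms.Exec: a list of
# (parameter, new value) pairs describing what was changed, e.g.
#
#   [("mem", 512), ("vcpus", 2), ("bridge", "xen-br0")]
#
# As the docstring notes, the new values are only picked up at the next
# restart of the instance.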
class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list) where
      export-list is a list of the instances exported on that node.
    """
    return rpc.call_export_list(self.nodes)
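# Shape of the value returned above (host names invented for illustration):
# a dictionary mapping each queried node name to the exports found on it,
# e.g.
#
#   {"node1.example.com": ["instance1.example.com", "instance2.example.com"],
#    "node2.example.com": []}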
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name
  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)
    finally:
      # restart the instance if we shut it down above
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.proc.ChainOpCode(op)
    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))
class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()
class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))
  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
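# Illustrative result of a tag search: a list of (path, tag) pairs where the
# path identifies the object carrying the matching tag (names invented):
#
#   [("/cluster", "production"),
#    ("/nodes/node1.example.com", "production"),
#    ("/instances/instance1.example.com", "production-web")]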
class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")
class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")