# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import re
import sha
import socket
import tempfile
import time
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.processor = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if not cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    master = sstore.GetMasterNode()
    if master != socket.gethostname():
      raise errors.OpPrereqError("Commands must be run on the master"
                                 " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a
    dict containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in them,
    as it will be added by the hooks runner in case this LU requires a
    cluster to run on (otherwise we don't have a node list). "No
    nodes" should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
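
# A minimal sketch of a LogicalUnit subclass, illustrating the contract
# described above.  This LU is hypothetical (it is not part of this module);
# the 'node_name' opcode field and the hook path are made up for the example.
#
#   class LUExampleNode(LogicalUnit):
#     HPATH = "node-example"
#     HTYPE = constants.HTYPE_NODE
#     _OP_REQP = ["node_name"]
#
#     def BuildHooksEnv(self):
#       # env dict (no 'GANETI_' prefix), pre-phase nodes, post-phase nodes;
#       # the master is added automatically by the hooks runner
#       env = {"NODE_NAME": self.op.node_name}
#       return env, [], [self.op.node_name]
#
#     def CheckPrereq(self):
#       # canonicalize the opcode parameters and validate them
#       self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
#       if self.op.node_name is None:
#         raise errors.OpPrereqError("Unknown node")
#
#     def Exec(self, feedback_fn):
#       feedback_fn("* doing nothing on %s" % self.op.node_name)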


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return {}, [], []


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []
    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)
  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
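
# Usage sketch (hypothetical field names, for illustration only): a query LU
# validates its opcode's output_fields against what it can compute, e.g.
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=self.op.output_fields)
#
# which raises errors.OpPrereqError if output_fields contains anything else.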


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings

  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
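
# For illustration, a sketch of the env produced for a hypothetical one-NIC
# instance (all values made up):
#
#   _BuildInstanceHookEnv("inst1", "node1", ["node2"], "debian-etch", "up",
#                         128, 1, [("192.0.2.10", "xen-br0")])
#   ==> {"INSTANCE_NAME": "inst1", "INSTANCE_PRIMARY": "node1",
#        "INSTANCE_SECONDARIES": "node2", "INSTANCE_OS_TYPE": "debian-etch",
#        "INSTANCE_STATUS": "up", "INSTANCE_MEMORY": 128,
#        "INSTANCE_VCPUS": 1, "INSTANCE_NIC0_IP": "192.0.2.10",
#        "INSTANCE_NIC0_BRIDGE": "xen-br0", "INSTANCE_NIC_COUNT": 1}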


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
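
# Usage sketch: LUs call this with the instance object and can optionally
# override selected keys, e.g. (hypothetical value)
#
#   env = _BuildInstanceHookEnvByObject(self.instance,
#                                       override={'status': "down"})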


def _UpdateEtcHosts(fullnode, ip):
  """Ensure a node has a correct entry in /etc/hosts.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)

  """
  node = fullnode.split(".", 1)[0]

  f = open('/etc/hosts', 'r+')

  inthere = False
  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    # Strip off comments
    line = line.split('#')[0]

    if not line:
      # Entire line was comment, skip
      save_lines.append(rawline)
      continue

    fields = line.split()

    haveall = True
    havesome = False
    for spec in [ ip, fullnode, node ]:
      if spec not in fields:
        haveall = False
      else:
        havesome = True

    if haveall:
      inthere = True
      save_lines.append(rawline)
      continue

    if havesome and not haveall:
      # Line (old, or manual?) which is missing some. Remove.
      removed = True
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))

  if removed:
    save_lines = save_lines + add_lines

    # We removed a line, write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    newfile.close()
    os.rename(tmpname, '/etc/hosts')
  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()
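
# The line this helper maintains has the form (illustrative values):
#
#   192.0.2.10<TAB>node1.example.com node1
#
# i.e. the IP, then the FQDN and the short name as aliases; stale lines that
# match only some of the three tokens are removed and rewritten.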


def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
  else:
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')

  inthere = False
  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()
    logger.Debug('read %s' % (repr(rawline),))

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    parts = line.split(' ')
    fields = parts[0].split(',')
    key = parts[2]

    haveall = True
    havesome = False
    for spec in [ ip, fullnode ]:
      if spec not in fields:
        haveall = False
      else:
        havesome = True

    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
    if haveall and key == pubkey:
      inthere = True
      save_lines.append(rawline)
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
      continue

    if havesome and (not haveall or key != pubkey):
      removed = True
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
                                   constants.DATA_DIR)
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()
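
# known_hosts entries written by this helper look like (hypothetical key):
#
#   node1.example.com,192.0.2.10 ssh-rsa AAAAB3NzaC1yc2E...
#
# Entries whose host list matches but whose key differs from the cluster
# public key are dropped and rewritten.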


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None
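
# Behaviour sketch (hypothetical sizes, in MiB):
#
#   _HasValidVG({"xenvg": 40960}, "xenvg")  ==> None (OK)
#   _HasValidVG({"xenvg": 10240}, "xenvg")  ==> "volume group 'xenvg' too..."
#   _HasValidVG({}, "xenvg")                ==> "volume group 'xenvg' missing"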


def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  if os.path.exists('/root/.ssh/id_dsa'):
    utils.CreateBackup('/root/.ssh/id_dsa')
  if os.path.exists('/root/.ssh/id_dsa.pub'):
    utils.CreateBackup('/root/.ssh/id_dsa.pub')

  utils.RemoveFile('/root/.ssh/id_dsa')
  utils.RemoveFile('/root/.ssh/id_dsa.pub')

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", "/root/.ssh/id_dsa",
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open('/root/.ssh/id_dsa.pub', 'r')
  try:
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {
      "CLUSTER": self.op.cluster_name,
      "MASTER": self.hostname['hostname_full'],
      }
    return env, [], [self.hostname['hostname_full']]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    hostname_local = socket.gethostname()
    self.hostname = hostname = utils.LookupHostname(hostname_local)
    if not hostname:
      raise errors.OpPrereqError("Cannot resolve my own hostname ('%s')" %
                                 hostname_local)

    if hostname["hostname_full"] != hostname_local:
      raise errors.OpPrereqError("My own hostname (%s) does not match the"
                                 " resolver (%s): probably not using FQDN"
                                 " for hostname." %
                                 (hostname_local, hostname["hostname_full"]))

    if hostname["ip"].startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or /etc/hosts." %
                                 (hostname["ip"],))

    self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
    if not clustername:
      raise errors.OpPrereqError("Cannot resolve given cluster name ('%s')"
                                 % self.op.cluster_name)

    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
    if result.failed:
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname['ip'])

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if secondary_ip and secondary_ip != hostname['ip']:
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
      if result.failed:
        raise errors.OpPrereqError("You gave %s as secondary IP,\n"
                                   "but it does not belong to this host." %
                                   secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
    ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname['hostname_full'])

    # set up ssh config and /etc/hosts
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _UpdateEtcHosts(hostname['hostname_full'],
                    hostname['ip'])

    _UpdateKnownHosts(hostname['hostname_full'],
                      hostname['ip'],
                      sshkey)

    _InitSSHSetup(hostname['hostname'])

    # init of cluster config file
    cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    utils.CreateBackup('/root/.ssh/id_dsa')
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G
    bad = False
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    if 'filelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    # checks ssh to other nodes
    if 'nodelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn(" - ERROR: communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn(" - ERROR: instance %s not in instance list %s" %
                  (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if instance not in node_instance[node_current]:
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    self.cfg.VerifyConfig()

    master = self.sstore.GetMasterNode()
    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if not isinstance(nodeinstance, list):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result = self._VerifyInstance(instance, node_volume, node_instance,
                                    feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)
      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)


def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        logger.ToStderr("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # each mirror status is a (perc_done, est_time, is_degraded) tuple
      perc_done, est_time, is_degraded = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done and not oneshot:
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary):
  """Check that mirrors are not degraded.

  """
  cfgw.SetDiskID(dev, node)

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      # the sixth field of the remote status flags a degraded device
      result = result and (not rstats[5])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data is False:
      raise errors.OpExecError("Can't gather the list of OSes")
    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted_nodes
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of volumes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                break
            else:
              inst = None
            val = inst and inst.name
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.LookupHostname(node_name)
    if not dns_data:
      raise errors.OpPrereqError("Node %s is not resolvable" % node_name)

    node = dns_data['hostname']
    primary_ip = self.op.primary_ip = dns_data['ip']
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"
                                 % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    command = ["fping", "-q", primary_ip]
    result = utils.RunCmd(command)
    if result.failed:
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
      result = utils.RunCmd(command)
      if result.failed:
        raise errors.OpPrereqError("Node secondary ip not reachable by ping")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))
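
    # For illustration, with placeholder values the generated remote command
    # looks roughly like this (password and PEM shortened, paths hypothetical):
    #
    #   umask 077 &&
    #   echo 'PASS' > '/var/lib/ganeti/ssconf_node_pass' &&
    #   cat > '/etc/ganeti/server.pem' << '!EOF.' &&
    #   -----BEGIN CERTIFICATE-----
    #   ...
    #   !EOF.
    #   /etc/init.d/ganeti-noded restart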

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    keyarray = []
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      result = ssh.SSHCall(node, "root",
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
      if result.failed:
        raise errors.OpExecError("Node claims it doesn't have the"
                                 " secondary ip you gave (%s).\n"
                                 "Please fix and re-run this command." %
                                 new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s.\n"
                               "Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = socket.gethostname()

    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be.\n"
                                 "%s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,\n"
                  "please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = socket.gethostname()

    for node in self.nodes:
      if node == myname:
        continue
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    data = []
    for node in self.nodes:
      result = ssh.SSHCall(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
    succeeded with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  for inst_disk in instance.disks:
    master_result = None
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      is_primary = node == instance.primary_node
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
      if not result:
        logger.Error("could not prepare block device %s on node %s (is_pri"
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
        if is_primary or not ignore_secondaries:
          disks_ok = False
      if is_primary:
        master_result = result
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        master_result))

  return disks_ok, device_info
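
# Usage sketch: callers unpack the pair and treat the second element as the
# node-device-to-instance-device mapping described in the docstring, e.g.
#
#   disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
#   if not disks_ok:
#     # handle the failure (see _StartInstanceDisks below)
#     pass
#   for host, iv_name, node_visible_name in device_info:
#     logger.Info("%s: %s is visible as %s" % (host, iv_name,
#                                              node_visible_name))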


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not isinstance(ins_l, list):
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored (i.e. they make the return value false).

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError("one or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, instance.primary_node))

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
    if not nodeinfo:
      raise errors.OpExecError("Could not contact node %s for infos" %
                               (node_current))

    freememory = nodeinfo[node_current]['memory_free']
    memory = instance.memory
    if memory > freememory:
      raise errors.OpExecError("Not enough memory to start instance"
                               " %s on node %s"
                               " needed %s MiB, available %s MiB" %
                               (instance.name, node_current, memory,
                                freememory))

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    self.cfg.MarkInstanceDown(instance.name)
    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
      if not isinstance(os_obj, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                               (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    _RemoveDisks(instance, self.cfg)

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = utils.NiceSort(self.cfg.GetInstanceList())
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result is False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        else:
          raise errors.ParameterError(field)
        iout.append(str(val))
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "DT_REMOTE_RAID1 template")

    # check memory requirements on the secondary node
    target_node = secondary_nodes[0]
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
    info = nodeinfo.get(target_node, None)
    if not info:
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s'" % target_node)
    if instance.memory > info['memory_free']:
      raise errors.OpPrereqError("Not enough memory on target node %s."
                                 " %d MB available, %d MB required" %
                                 (target_node, info['memory_free'],
                                  instance.memory))

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, instance.primary_node))

    self.instance = instance
2252 def Exec(self, feedback_fn):
2253 """Failover an instance.
2255 The failover is done by shutting it down on its present node and
2256 starting it on the secondary.
2258 """
2259 instance = self.instance
2261 source_node = instance.primary_node
2262 target_node = instance.secondary_nodes[0]
2264 feedback_fn("* checking disk consistency between source and target")
2265 for dev in instance.disks:
2266 # for remote_raid1, these are md over drbd
2267 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2268 if not self.op.ignore_consistency:
2269 raise errors.OpExecError("Disk %s is degraded on target node,"
2270 " aborting failover." % dev.iv_name)
2272 feedback_fn("* checking target node resource availability")
2273 nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2275 if not nodeinfo:
2276 raise errors.OpExecError("Could not contact target node %s." %
2277 target_node)
2279 free_memory = int(nodeinfo[target_node]['memory_free'])
2280 memory = instance.memory
2281 if memory > free_memory:
2282 raise errors.OpExecError("Not enough memory to create instance %s on"
2283 " node %s. needed %s MiB, available %s MiB" %
2284 (instance.name, target_node, memory,
2285 free_memory))
2287 feedback_fn("* shutting down instance on source node")
2288 logger.Info("Shutting down instance %s on node %s" %
2289 (instance.name, source_node))
2291 if not rpc.call_instance_shutdown(source_node, instance):
2292 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2293 " anyway. Please make sure node %s is down" %
2294 (instance.name, source_node, source_node))
2296 feedback_fn("* deactivating the instance's disks on source node")
2297 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2298 raise errors.OpExecError("Can't shut down the instance's disks.")
2300 instance.primary_node = target_node
2301 # distribute new instance config to the other nodes
2302 self.cfg.AddInstance(instance)
2304 feedback_fn("* activating the instance's disks on target node")
2305 logger.Info("Starting instance %s on node %s" %
2306 (instance.name, target_node))
2308 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2309 ignore_secondaries=True)
2310 if not disks_ok:
2311 _ShutdownInstanceDisks(instance, self.cfg)
2312 raise errors.OpExecError("Can't activate the instance's disks")
2314 feedback_fn("* starting the instance on the target node")
2315 if not rpc.call_instance_start(target_node, instance, None):
2316 _ShutdownInstanceDisks(instance, self.cfg)
2317 raise errors.OpExecError("Could not start instance %s on node %s." %
2318 (instance.name, target_node))
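# Illustrative usage (hedged sketch, not original code): this LU is
# normally reached via something like
#   op = opcodes.OpFailoverInstance(instance_name="inst1.example.com",
#                                   ignore_consistency=False)
# ignore_consistency=True skips the disk consistency check, letting the
# failover proceed when the source-side mirror is already degraded
# (e.g. because the primary node is dead).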
2321 def _CreateBlockDevOnPrimary(cfg, node, device, info):
2322 """Create a tree of block devices on the primary node.
2324 This always creates all devices.
2326 """
2327 if device.children:
2328 for child in device.children:
2329 if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2330 return False
2332 cfg.SetDiskID(device, node)
2333 new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2334 if not new_id:
2335 return False
2336 if device.physical_id is None:
2337 device.physical_id = new_id
2339 return True
2341 def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2342 """Create a tree of block devices on a secondary node.
2344 If this device type has to be created on secondaries, create it and
2345 all its children.
2347 If not, just recurse to children keeping the same 'force' value.
2349 """
2350 if device.CreateOnSecondary():
2351 force = True
2352 if device.children:
2353 for child in device.children:
2354 if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2355 return False
2357 if not force:
2358 return True
2359 cfg.SetDiskID(device, node)
2360 new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2361 if not new_id:
2362 return False
2363 if device.physical_id is None:
2364 device.physical_id = new_id
2366 return True
2368 def _GenerateUniqueNames(cfg, exts):
2369 """Generate a suitable LV name.
2371 This will generate a logical volume name for the given instance.
2373 """
2374 results = []
2375 for val in exts:
2376 new_id = cfg.GenerateUniqueID()
2377 results.append("%s%s" % (new_id, val))
2379 return results
2381 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2382 """Generate a drbd device complete with its children.
2384 """
2385 port = cfg.AllocatePort()
2386 vgname = cfg.GetVGName()
2387 dev_data = objects.Disk(dev_type="lvm", size=size,
2388 logical_id=(vgname, names[0]))
2389 dev_meta = objects.Disk(dev_type="lvm", size=128,
2390 logical_id=(vgname, names[1]))
2391 drbd_dev = objects.Disk(dev_type="drbd", size=size,
2392 logical_id=(primary, secondary, port),
2393 children=[dev_data, dev_meta])
2394 return drbd_dev
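# The branch built above (added illustration):
#
#   drbd(primary, secondary, port), size=size
#     |-- lvm data volume: (vgname, names[0]), size=size
#     +-- lvm meta volume: (vgname, names[1]), size=128
#
# i.e. a DRBD device with a fixed 128 MB metadata LV; callers pass
# names generated by _GenerateUniqueNames() with "_data"/"_meta"
# suffixes.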
2397 def _GenerateDiskTemplate(cfg, template_name,
2398 instance_name, primary_node,
2399 secondary_nodes, disk_sz, swap_sz):
2400 """Generate the entire disk layout for a given template type.
2402 """
2403 #TODO: compute space requirements
2405 vgname = cfg.GetVGName()
2406 if template_name == "diskless":
2407 disks = []
2408 elif template_name == "plain":
2409 if len(secondary_nodes) != 0:
2410 raise errors.ProgrammerError("Wrong template configuration")
2412 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2413 sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2414 logical_id=(vgname, names[0]),
2415 iv_name="sda")
2416 sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2417 logical_id=(vgname, names[1]),
2418 iv_name="sdb")
2419 disks = [sda_dev, sdb_dev]
2420 elif template_name == "local_raid1":
2421 if len(secondary_nodes) != 0:
2422 raise errors.ProgrammerError("Wrong template configuration")
2425 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2426 ".sdb_m1", ".sdb_m2"])
2427 sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2428 logical_id=(vgname, names[0]))
2429 sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2430 logical_id=(vgname, names[1]))
2431 md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2432 size=disk_sz,
2433 children=[sda_dev_m1, sda_dev_m2])
2434 sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2435 logical_id=(vgname, names[2]))
2436 sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2437 logical_id=(vgname, names[3]))
2438 md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2439 size=swap_sz,
2440 children=[sdb_dev_m1, sdb_dev_m2])
2441 disks = [md_sda_dev, md_sdb_dev]
2442 elif template_name == constants.DT_REMOTE_RAID1:
2443 if len(secondary_nodes) != 1:
2444 raise errors.ProgrammerError("Wrong template configuration")
2445 remote_node = secondary_nodes[0]
2446 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2447 ".sdb_data", ".sdb_meta"])
2448 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2449 disk_sz, names[0:2])
2450 md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2451 children=[drbd_sda_dev], size=disk_sz)
2452 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2453 swap_sz, names[2:4])
2454 md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2455 children=[drbd_sdb_dev], size=swap_sz)
2456 disks = [md_sda_dev, md_sdb_dev]
2457 else:
2458 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2459 return disks
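# Illustrative call (added, hedged; names and sizes invented):
#   disks = _GenerateDiskTemplate(cfg, constants.DT_REMOTE_RAID1,
#                                 "inst1.example.com", "node1.example.com",
#                                 ["node2.example.com"], 10240, 4096)
# returns [md_sda_dev, md_sdb_dev]: each an md_raid1 device wrapping a
# single drbd child that mirrors node1 to node2 (sizes in MB).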
2462 def _GetInstanceInfoText(instance):
2463 """Compute the text that should be added to the disk's metadata.
2465 """
2466 return "originstname+%s" % instance.name
2469 def _CreateDisks(cfg, instance):
2470 """Create all disks for an instance.
2472 This abstracts away some work from AddInstance.
2474 Args:
2475 instance: the instance object
2477 Returns:
2478 True or False showing the success of the creation process
2480 """
2481 info = _GetInstanceInfoText(instance)
2483 for device in instance.disks:
2484 logger.Info("creating volume %s for instance %s" %
2485 (device.iv_name, instance.name))
2486 #HARDCODE
2487 for secondary_node in instance.secondary_nodes:
2488 if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2489 info):
2490 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2491 (device.iv_name, device, secondary_node))
2492 return False
2493 #HARDCODE
2494 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2495 logger.Error("failed to create volume %s on primary!" %
2496 device.iv_name)
2497 return False
2499 return True
2501 def _RemoveDisks(instance, cfg):
2502 """Remove all disks for an instance.
2504 This abstracts away some work from `AddInstance()` and
2505 `RemoveInstance()`. Note that in case some of the devices couldn't
2506 be removed, the removal will continue with the other ones (compare
2507 with `_CreateDisks()`).
2509 Args:
2510 instance: the instance object
2512 Returns:
2513 True or False showing the success of the removal process
2515 """
2516 logger.Info("removing block devices for instance %s" % instance.name)
2518 result = True
2519 for device in instance.disks:
2520 for node, disk in device.ComputeNodeTree(instance.primary_node):
2521 cfg.SetDiskID(disk, node)
2522 if not rpc.call_blockdev_remove(node, disk):
2523 logger.Error("could not remove block device %s on node %s,"
2524 " continuing anyway" %
2525 (device.iv_name, node))
2526 result = False
2528 return result
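# Design note (added commentary): _CreateDisks() aborts on the first
# failure and creates every device on the secondaries before the
# primary, so an assembled mirror never lacks a slave; _RemoveDisks()
# instead keeps going past individual failures and only reports an
# overall False, since a partially-removed instance should be cleaned
# up as far as possible.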
2530 class LUCreateInstance(LogicalUnit):
2531 """Create an instance.
2533 """
2534 HPATH = "instance-add"
2535 HTYPE = constants.HTYPE_INSTANCE
2536 _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2537 "disk_template", "swap_size", "mode", "start", "vcpus",
2538 "wait_for_sync"]
2540 def BuildHooksEnv(self):
2541 """Build hooks env.
2543 This runs on master, primary and secondary nodes of the instance.
2545 """
2546 env = {
2547 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2548 "INSTANCE_DISK_SIZE": self.op.disk_size,
2549 "INSTANCE_SWAP_SIZE": self.op.swap_size,
2550 "INSTANCE_ADD_MODE": self.op.mode,
2551 }
2552 if self.op.mode == constants.INSTANCE_IMPORT:
2553 env["INSTANCE_SRC_NODE"] = self.op.src_node
2554 env["INSTANCE_SRC_PATH"] = self.op.src_path
2555 env["INSTANCE_SRC_IMAGE"] = self.src_image
2557 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2558 primary_node=self.op.pnode,
2559 secondary_nodes=self.secondaries,
2560 status=self.instance_status,
2561 os_type=self.op.os_type,
2562 memory=self.op.mem_size,
2563 vcpus=self.op.vcpus,
2564 nics=[(self.inst_ip, self.op.bridge)],
2565 ))
2567 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2568 self.secondaries)
2569 return env, nl, nl
2572 def CheckPrereq(self):
2573 """Check prerequisites.
2575 """
2576 if self.op.mode not in (constants.INSTANCE_CREATE,
2577 constants.INSTANCE_IMPORT):
2578 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2579 self.op.mode)
2581 if self.op.mode == constants.INSTANCE_IMPORT:
2582 src_node = getattr(self.op, "src_node", None)
2583 src_path = getattr(self.op, "src_path", None)
2584 if src_node is None or src_path is None:
2585 raise errors.OpPrereqError("Importing an instance requires source"
2586 " node and path options")
2587 src_node_full = self.cfg.ExpandNodeName(src_node)
2588 if src_node_full is None:
2589 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2590 self.op.src_node = src_node = src_node_full
2592 if not os.path.isabs(src_path):
2593 raise errors.OpPrereqError("The source path must be absolute")
2595 export_info = rpc.call_export_info(src_node, src_path)
2597 if not export_info:
2598 raise errors.OpPrereqError("No export found in dir %s" % src_path)
2600 if not export_info.has_section(constants.INISECT_EXP):
2601 raise errors.ProgrammerError("Corrupted export config")
2603 ei_version = export_info.get(constants.INISECT_EXP, 'version')
2604 if (int(ei_version) != constants.EXPORT_VERSION):
2605 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2606 (ei_version, constants.EXPORT_VERSION))
2608 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2609 raise errors.OpPrereqError("Can't import instance with more than"
2610 " one data disk")
2612 # FIXME: are the old os-es, disk sizes, etc. useful?
2613 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2614 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2615 'disk0_dump'))
2616 self.src_image = diskimage
2617 else: # INSTANCE_CREATE
2618 if getattr(self.op, "os_type", None) is None:
2619 raise errors.OpPrereqError("No guest OS specified")
2621 # check primary node
2622 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2623 if pnode is None:
2624 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2625 self.op.pnode)
2626 self.op.pnode = pnode.name
2627 self.pnode = pnode
2628 self.secondaries = []
2629 # disk template and mirror node verification
2630 if self.op.disk_template not in constants.DISK_TEMPLATES:
2631 raise errors.OpPrereqError("Invalid disk template name")
2633 if self.op.disk_template == constants.DT_REMOTE_RAID1:
2634 if getattr(self.op, "snode", None) is None:
2635 raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2636 " a mirror node")
2638 snode_name = self.cfg.ExpandNodeName(self.op.snode)
2639 if snode_name is None:
2640 raise errors.OpPrereqError("Unknown secondary node '%s'" %
2641 self.op.snode)
2642 elif snode_name == pnode.name:
2643 raise errors.OpPrereqError("The secondary node cannot be"
2644 " the primary node.")
2645 self.secondaries.append(snode_name)
2647 # Check lv size requirements
2648 nodenames = [pnode.name] + self.secondaries
2649 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2651 # Required free disk space as a function of disk and swap space
2652 req_size_dict = {
2653 constants.DT_DISKLESS: 0,
2654 constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2655 constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2656 # 256 MB are added for drbd metadata, 128MB for each drbd device
2657 constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2658 }
2660 if self.op.disk_template not in req_size_dict:
2661 raise errors.ProgrammerError("Disk template '%s' size requirement"
2662 " is unknown" % self.op.disk_template)
2664 req_size = req_size_dict[self.op.disk_template]
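# Worked example (added commentary): with disk_size=10240 and
# swap_size=4096 (MB), req_size is 14336 for plain, 28672 for
# local_raid1 (two local copies of everything) and 14592 for
# remote_raid1 (data + swap + 256 MB of drbd metadata, i.e. 128 MB for
# each of the two drbd devices); the value is then checked against
# 'vg_free' on the primary and all secondaries below.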
2666 for node in nodenames:
2667 info = nodeinfo.get(node, None)
2668 if not info:
2669 raise errors.OpPrereqError("Cannot get current information"
2670 " from node '%s'" % node)
2671 if req_size > info['vg_free']:
2672 raise errors.OpPrereqError("Not enough disk space on target node %s."
2673 " %d MB available, %d MB required" %
2674 (node, info['vg_free'], req_size))
2677 os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2678 if not isinstance(os_obj, objects.OS):
2679 raise errors.OpPrereqError("OS '%s' not in supported os list for"
2680 " primary node" % self.op.os_type)
2682 # instance verification
2683 hostname1 = utils.LookupHostname(self.op.instance_name)
2684 if not hostname1:
2685 raise errors.OpPrereqError("Instance name '%s' not found in dns" %
2686 self.op.instance_name)
2688 self.op.instance_name = instance_name = hostname1['hostname']
2689 instance_list = self.cfg.GetInstanceList()
2690 if instance_name in instance_list:
2691 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2692 instance_name)
2694 ip = getattr(self.op, "ip", None)
2695 if ip is None or ip.lower() == "none":
2696 inst_ip = None
2697 elif ip.lower() == "auto":
2698 inst_ip = hostname1['ip']
2699 else:
2700 if not utils.IsValidIP(ip):
2701 raise errors.OpPrereqError("given IP address '%s' doesn't look"
2702 " like a valid IP" % ip)
2703 inst_ip = ip
2704 self.inst_ip = inst_ip
2706 command = ["fping", "-q", hostname1['ip']]
2707 result = utils.RunCmd(command)
2708 if not result.failed:
2709 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2710 (hostname1['ip'], instance_name))
2712 # bridge verification
2713 bridge = getattr(self.op, "bridge", None)
2714 if bridge is None:
2715 self.op.bridge = self.cfg.GetDefBridge()
2716 else:
2717 self.op.bridge = bridge
2719 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2720 raise errors.OpPrereqError("target bridge '%s' does not exist on"
2721 " destination node '%s'" %
2722 (self.op.bridge, pnode.name))
2724 if self.op.start:
2725 self.instance_status = 'up'
2726 else:
2727 self.instance_status = 'down'
2729 def Exec(self, feedback_fn):
2730 """Create and add the instance to the cluster.
2732 """
2733 instance = self.op.instance_name
2734 pnode_name = self.pnode.name
2736 nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2737 if self.inst_ip is not None:
2738 nic.ip = self.inst_ip
2740 disks = _GenerateDiskTemplate(self.cfg,
2741 self.op.disk_template,
2742 instance, pnode_name,
2743 self.secondaries, self.op.disk_size,
2744 self.op.swap_size)
2746 iobj = objects.Instance(name=instance, os=self.op.os_type,
2747 primary_node=pnode_name,
2748 memory=self.op.mem_size,
2749 vcpus=self.op.vcpus,
2750 nics=[nic], disks=disks,
2751 disk_template=self.op.disk_template,
2752 status=self.instance_status,
2753 )
2755 feedback_fn("* creating instance disks...")
2756 if not _CreateDisks(self.cfg, iobj):
2757 _RemoveDisks(iobj, self.cfg)
2758 raise errors.OpExecError("Device creation failed, reverting...")
2760 feedback_fn("adding instance %s to cluster config" % instance)
2762 self.cfg.AddInstance(iobj)
2764 if self.op.wait_for_sync:
2765 disk_abort = not _WaitForSync(self.cfg, iobj)
2766 elif iobj.disk_template == constants.DT_REMOTE_RAID1:
2767 # make sure the disks are not degraded (still sync-ing is ok)
2768 time.sleep(15)
2769 feedback_fn("* checking mirrors status")
2770 disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2771 else:
2772 disk_abort = False
2774 if disk_abort:
2775 _RemoveDisks(iobj, self.cfg)
2776 self.cfg.RemoveInstance(iobj.name)
2777 raise errors.OpExecError("There are some degraded disks for"
2778 " this instance")
2780 feedback_fn("creating os for instance %s on node %s" %
2781 (instance, pnode_name))
2783 if iobj.disk_template != constants.DT_DISKLESS:
2784 if self.op.mode == constants.INSTANCE_CREATE:
2785 feedback_fn("* running the instance OS create scripts...")
2786 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2787 raise errors.OpExecError("could not add os for instance %s"
2788 " on node %s" %
2789 (instance, pnode_name))
2791 elif self.op.mode == constants.INSTANCE_IMPORT:
2792 feedback_fn("* running the instance OS import scripts...")
2793 src_node = self.op.src_node
2794 src_image = self.src_image
2795 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2796 src_node, src_image):
2797 raise errors.OpExecError("Could not import os for instance"
2798 " %s on node %s" %
2799 (instance, pnode_name))
2800 else:
2801 # also checked in the prereq part
2802 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2803 % self.op.mode)
2805 if self.op.start:
2806 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2807 feedback_fn("* starting instance...")
2808 if not rpc.call_instance_start(pnode_name, iobj, None):
2809 raise errors.OpExecError("Could not start instance")
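# Illustrative submission (hedged sketch, not original code; all values
# invented, field names mirror _OP_REQP above):
#   op = opcodes.OpCreateInstance(instance_name="inst1.example.com",
#                                 mem_size=512, disk_size=10240,
#                                 swap_size=4096, vcpus=1,
#                                 os_type="debian-etch", pnode="node1",
#                                 snode="node2",
#                                 disk_template=constants.DT_REMOTE_RAID1,
#                                 mode=constants.INSTANCE_CREATE,
#                                 start=True, wait_for_sync=True)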
2812 class LUConnectConsole(NoHooksLU):
2813 """Connect to an instance's console.
2815 This is somewhat special in that it returns the command line that
2816 you need to run on the master node in order to connect to the
2820 _OP_REQP = ["instance_name"]
2822 def CheckPrereq(self):
2823 """Check prerequisites.
2825 This checks that the instance is in the cluster.
2828 instance = self.cfg.GetInstanceInfo(
2829 self.cfg.ExpandInstanceName(self.op.instance_name))
2830 if instance is None:
2831 raise errors.OpPrereqError("Instance '%s' not known" %
2832 self.op.instance_name)
2833 self.instance = instance
2835 def Exec(self, feedback_fn):
2836 """Connect to the console of an instance
2839 instance = self.instance
2840 node = instance.primary_node
2842 node_insts = rpc.call_instance_list([node])[node]
2843 if node_insts is False:
2844 raise errors.OpExecError("Can't connect to node %s." % node)
2846 if instance.name not in node_insts:
2847 raise errors.OpExecError("Instance %s is not running." % instance.name)
2849 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2851 hyper = hypervisor.GetHypervisor()
2852 console_cmd = hyper.GetShellCommandForConsole(instance.name)
2854 argv = ["ssh", "-q", "-t"]
2855 argv.extend(ssh.KNOWN_HOSTS_OPTS)
2856 argv.extend(ssh.BATCH_MODE_OPTS)
2857 argv.append(node)
2858 argv.append(console_cmd)
2859 return "ssh", argv
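# Added note (assumption about the caller): the returned ("ssh", argv)
# pair is handed back to the CLI client, which is expected to exec it
# (e.g. via os.execvp) so the interactive console session runs in the
# user's terminal rather than inside the master process.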
2862 class LUAddMDDRBDComponent(LogicalUnit):
2863 """Add a new mirror member to an instance's disk.
2865 """
2866 HPATH = "mirror-add"
2867 HTYPE = constants.HTYPE_INSTANCE
2868 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2870 def BuildHooksEnv(self):
2871 """Build hooks env.
2873 This runs on the master, the primary and all the secondaries.
2875 """
2876 env = {
2877 "NEW_SECONDARY": self.op.remote_node,
2878 "DISK_NAME": self.op.disk_name,
2879 }
2880 env.update(_BuildInstanceHookEnvByObject(self.instance))
2881 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2882 self.op.remote_node,] + list(self.instance.secondary_nodes)
2883 return env, nl, nl
2885 def CheckPrereq(self):
2886 """Check prerequisites.
2888 This checks that the instance is in the cluster.
2891 instance = self.cfg.GetInstanceInfo(
2892 self.cfg.ExpandInstanceName(self.op.instance_name))
2893 if instance is None:
2894 raise errors.OpPrereqError("Instance '%s' not known" %
2895 self.op.instance_name)
2896 self.instance = instance
2898 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2899 if remote_node is None:
2900 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
2901 self.remote_node = remote_node
2903 if remote_node == instance.primary_node:
2904 raise errors.OpPrereqError("The specified node is the primary node of"
2905 " the instance.")
2907 if instance.disk_template != constants.DT_REMOTE_RAID1:
2908 raise errors.OpPrereqError("Instance's disk layout is not"
2909 " remote_raid1.")
2910 for disk in instance.disks:
2911 if disk.iv_name == self.op.disk_name:
2912 break
2913 else:
2914 raise errors.OpPrereqError("Can't find this device ('%s') in the"
2915 " instance." % self.op.disk_name)
2916 if len(disk.children) > 1:
2917 raise errors.OpPrereqError("The device already has two slave"
2918 " devices.\n"
2919 "This would create a 3-disk raid1"
2920 " which we don't allow.")
2921 self.disk = disk
2923 def Exec(self, feedback_fn):
2924 """Add the mirror component
2926 """
2927 disk = self.disk
2928 instance = self.instance
2930 remote_node = self.remote_node
2931 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
2932 names = _GenerateUniqueNames(self.cfg, lv_names)
2933 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
2934 remote_node, disk.size, names)
2936 logger.Info("adding new mirror component on secondary")
2938 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
2939 _GetInstanceInfoText(instance)):
2940 raise errors.OpExecError("Failed to create new component on secondary"
2941 " node %s" % remote_node)
2943 logger.Info("adding new mirror component on primary")
2945 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
2946 _GetInstanceInfoText(instance)):
2947 # remove secondary dev
2948 self.cfg.SetDiskID(new_drbd, remote_node)
2949 rpc.call_blockdev_remove(remote_node, new_drbd)
2950 raise errors.OpExecError("Failed to create volume on primary")
2952 # the device exists now
2953 # call the primary node to add the mirror to md
2954 logger.Info("adding new mirror component to md")
2955 if not rpc.call_blockdev_addchild(instance.primary_node,
2956 disk, new_drbd):
2957 logger.Error("Can't add mirror component to md!")
2958 self.cfg.SetDiskID(new_drbd, remote_node)
2959 if not rpc.call_blockdev_remove(remote_node, new_drbd):
2960 logger.Error("Can't rollback on secondary")
2961 self.cfg.SetDiskID(new_drbd, instance.primary_node)
2962 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2963 logger.Error("Can't rollback on primary")
2964 raise errors.OpExecError("Can't add mirror component to md array")
2966 disk.children.append(new_drbd)
2968 self.cfg.AddInstance(instance)
2970 _WaitForSync(self.cfg, instance)
2975 class LURemoveMDDRBDComponent(LogicalUnit):
2976 """Remove a component from a remote_raid1 disk.
2978 """
2979 HPATH = "mirror-remove"
2980 HTYPE = constants.HTYPE_INSTANCE
2981 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
2983 def BuildHooksEnv(self):
2984 """Build hooks env.
2986 This runs on the master, the primary and all the secondaries.
2988 """
2989 env = {
2990 "DISK_NAME": self.op.disk_name,
2991 "DISK_ID": self.op.disk_id,
2992 "OLD_SECONDARY": self.old_secondary,
2993 }
2994 env.update(_BuildInstanceHookEnvByObject(self.instance))
2995 nl = [self.sstore.GetMasterNode(),
2996 self.instance.primary_node] + list(self.instance.secondary_nodes)
2997 return env, nl, nl
2999 def CheckPrereq(self):
3000 """Check prerequisites.
3002 This checks that the instance is in the cluster.
3004 """
3005 instance = self.cfg.GetInstanceInfo(
3006 self.cfg.ExpandInstanceName(self.op.instance_name))
3007 if instance is None:
3008 raise errors.OpPrereqError("Instance '%s' not known" %
3009 self.op.instance_name)
3010 self.instance = instance
3012 if instance.disk_template != constants.DT_REMOTE_RAID1:
3013 raise errors.OpPrereqError("Instance's disk layout is not"
3014 " remote_raid1.")
3015 for disk in instance.disks:
3016 if disk.iv_name == self.op.disk_name:
3017 break
3018 else:
3019 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3020 " instance." % self.op.disk_name)
3021 for child in disk.children:
3022 if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
3023 break
3024 else:
3025 raise errors.OpPrereqError("Can't find the device with this port.")
3027 if len(disk.children) < 2:
3028 raise errors.OpPrereqError("Cannot remove the last component from"
3029 " a mirror.")
3030 self.disk = disk
3031 self.child = child
3032 if self.child.logical_id[0] == instance.primary_node:
3033 oid = 1
3034 else:
3035 oid = 0
3036 self.old_secondary = self.child.logical_id[oid]
3038 def Exec(self, feedback_fn):
3039 """Remove the mirror component
3041 """
3042 instance = self.instance
3043 disk = self.disk
3044 child = self.child
3045 logger.Info("remove mirror component")
3046 self.cfg.SetDiskID(disk, instance.primary_node)
3047 if not rpc.call_blockdev_removechild(instance.primary_node,
3048 disk, child):
3049 raise errors.OpExecError("Can't remove child from mirror.")
3051 for node in child.logical_id[:2]:
3052 self.cfg.SetDiskID(child, node)
3053 if not rpc.call_blockdev_remove(node, child):
3054 logger.Error("Warning: failed to remove device from node %s,"
3055 " continuing operation." % node)
3057 disk.children.remove(child)
3058 self.cfg.AddInstance(instance)
3061 class LUReplaceDisks(LogicalUnit):
3062 """Replace the disks of an instance.
3064 """
3065 HPATH = "mirrors-replace"
3066 HTYPE = constants.HTYPE_INSTANCE
3067 _OP_REQP = ["instance_name"]
3069 def BuildHooksEnv(self):
3070 """Build hooks env.
3072 This runs on the master, the primary and all the secondaries.
3074 """
3075 env = {
3076 "NEW_SECONDARY": self.op.remote_node,
3077 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3078 }
3079 env.update(_BuildInstanceHookEnvByObject(self.instance))
3080 nl = [self.sstore.GetMasterNode(),
3081 self.instance.primary_node] + list(self.instance.secondary_nodes)
3082 return env, nl, nl
3084 def CheckPrereq(self):
3085 """Check prerequisites.
3087 This checks that the instance is in the cluster.
3089 """
3090 instance = self.cfg.GetInstanceInfo(
3091 self.cfg.ExpandInstanceName(self.op.instance_name))
3092 if instance is None:
3093 raise errors.OpPrereqError("Instance '%s' not known" %
3094 self.op.instance_name)
3095 self.instance = instance
3097 if instance.disk_template != constants.DT_REMOTE_RAID1:
3098 raise errors.OpPrereqError("Instance's disk layout is not"
3099 " remote_raid1.")
3101 if len(instance.secondary_nodes) != 1:
3102 raise errors.OpPrereqError("The instance has a strange layout,"
3103 " expected one secondary but found %d" %
3104 len(instance.secondary_nodes))
3106 remote_node = getattr(self.op, "remote_node", None)
3107 if remote_node is None:
3108 remote_node = instance.secondary_nodes[0]
3109 else:
3110 remote_node = self.cfg.ExpandNodeName(remote_node)
3111 if remote_node is None:
3112 raise errors.OpPrereqError("Node '%s' not known" %
3113 self.op.remote_node)
3114 if remote_node == instance.primary_node:
3115 raise errors.OpPrereqError("The specified node is the primary node of"
3116 " the instance.")
3117 self.op.remote_node = remote_node
3119 def Exec(self, feedback_fn):
3120 """Replace the disks of an instance.
3122 """
3123 instance = self.instance
3124 iv_names = {}
3125 # start of work
3126 remote_node = self.op.remote_node
3127 cfg = self.cfg
3128 for dev in instance.disks:
3129 size = dev.size
3130 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3131 names = _GenerateUniqueNames(cfg, lv_names)
3132 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3133 remote_node, size, names)
3134 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3135 logger.Info("adding new mirror component on secondary for %s" %
3136 dev.iv_name)
3137 #HARDCODE
3138 if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
3139 _GetInstanceInfoText(instance)):
3140 raise errors.OpExecError("Failed to create new component on"
3141 " secondary node %s\n"
3142 "Full abort, cleanup manually!" %
3143 remote_node)
3145 logger.Info("adding new mirror component on primary")
3146 #HARDCODE
3147 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
3148 _GetInstanceInfoText(instance)):
3149 # remove secondary dev
3150 cfg.SetDiskID(new_drbd, remote_node)
3151 rpc.call_blockdev_remove(remote_node, new_drbd)
3152 raise errors.OpExecError("Failed to create volume on primary!\n"
3153 "Full abort, cleanup manually!!")
3155 # the device exists now
3156 # call the primary node to add the mirror to md
3157 logger.Info("adding new mirror component to md")
3158 if not rpc.call_blockdev_addchild(instance.primary_node, dev,
3159 new_drbd):
3160 logger.Error("Can't add mirror component to md!")
3161 cfg.SetDiskID(new_drbd, remote_node)
3162 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3163 logger.Error("Can't rollback on secondary")
3164 cfg.SetDiskID(new_drbd, instance.primary_node)
3165 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3166 logger.Error("Can't rollback on primary")
3167 raise errors.OpExecError("Full abort, cleanup manually!!")
3169 dev.children.append(new_drbd)
3170 cfg.AddInstance(instance)
3172 # this can fail as the old devices are degraded and _WaitForSync
3173 # does a combined result over all disks, so we don't check its
3174 # return value
3175 _WaitForSync(cfg, instance, unlock=True)
3177 # so check manually all the devices
3178 for name in iv_names:
3179 dev, child, new_drbd = iv_names[name]
3180 cfg.SetDiskID(dev, instance.primary_node)
3181 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3182 if is_degr:
3183 raise errors.OpExecError("MD device %s is degraded!" % name)
3184 cfg.SetDiskID(new_drbd, instance.primary_node)
3185 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3186 if is_degr:
3187 raise errors.OpExecError("New drbd device %s is degraded!" % name)
3189 for name in iv_names:
3190 dev, child, new_drbd = iv_names[name]
3191 logger.Info("remove mirror %s component" % name)
3192 cfg.SetDiskID(dev, instance.primary_node)
3193 if not rpc.call_blockdev_removechild(instance.primary_node,
3194 dev, child):
3195 logger.Error("Can't remove child from mirror, aborting"
3196 " *this device cleanup*.\nYou need to cleanup manually!!")
3197 continue
3199 for node in child.logical_id[:2]:
3200 logger.Info("remove child device on %s" % node)
3201 cfg.SetDiskID(child, node)
3202 if not rpc.call_blockdev_remove(node, child):
3203 logger.Error("Warning: failed to remove device from node %s,"
3204 " continuing operation." % node)
3206 dev.children.remove(child)
3208 cfg.AddInstance(instance)
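# Added summary (commentary, not original): the replacement works by
# temporarily growing each md device into a three-way mirror: a fresh
# drbd branch to remote_node is created and attached next to the old
# child, _WaitForSync() lets it catch up, every device is then checked
# for degradation, and only afterwards is the old drbd child detached
# and its volumes deleted, so the data stays mirrored at every step.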
3211 class LUQueryInstanceData(NoHooksLU):
3212 """Query runtime instance data.
3215 _OP_REQP = ["instances"]
3217 def CheckPrereq(self):
3218 """Check prerequisites.
3220 This only checks the optional instance list against the existing names.
3222 """
3223 if not isinstance(self.op.instances, list):
3224 raise errors.OpPrereqError("Invalid argument type 'instances'")
3225 if self.op.instances:
3226 self.wanted_instances = []
3227 names = self.op.instances
3228 for name in names:
3229 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3230 if instance is None:
3231 raise errors.OpPrereqError("No such instance name '%s'" % name)
3232 self.wanted_instances.append(instance)
3233 else:
3234 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3235 in self.cfg.GetInstanceList()]
3239 def _ComputeDiskStatus(self, instance, snode, dev):
3240 """Compute block device status.
3242 """
3243 self.cfg.SetDiskID(dev, instance.primary_node)
3244 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3245 if dev.dev_type == "drbd":
3246 # we change the snode then (otherwise we use the one passed in)
3247 if dev.logical_id[0] == instance.primary_node:
3248 snode = dev.logical_id[1]
3249 else:
3250 snode = dev.logical_id[0]
3252 if snode:
3253 self.cfg.SetDiskID(dev, snode)
3254 dev_sstatus = rpc.call_blockdev_find(snode, dev)
3255 else:
3256 dev_sstatus = None
3258 if dev.children:
3259 dev_children = [self._ComputeDiskStatus(instance, snode, child)
3260 for child in dev.children]
3261 else:
3262 dev_children = []
3264 data = {
3265 "iv_name": dev.iv_name,
3266 "dev_type": dev.dev_type,
3267 "logical_id": dev.logical_id,
3268 "physical_id": dev.physical_id,
3269 "pstatus": dev_pstatus,
3270 "sstatus": dev_sstatus,
3271 "children": dev_children,
3272 }
3274 return data
3276 def Exec(self, feedback_fn):
3277 """Gather and return data"""
3278 result = {}
3279 for instance in self.wanted_instances:
3280 remote_info = rpc.call_instance_info(instance.primary_node,
3281 instance.name)
3282 if remote_info and "state" in remote_info:
3283 remote_state = "up"
3284 else:
3285 remote_state = "down"
3286 if instance.status == "down":
3287 config_state = "down"
3288 else:
3289 config_state = "up"
3291 disks = [self._ComputeDiskStatus(instance, None, device)
3292 for device in instance.disks]
3294 idict = {
3295 "name": instance.name,
3296 "config_state": config_state,
3297 "run_state": remote_state,
3298 "pnode": instance.primary_node,
3299 "snodes": instance.secondary_nodes,
3300 "os": instance.os,
3301 "memory": instance.memory,
3302 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3303 "disks": disks,
3304 }
3306 result[instance.name] = idict
3308 return result
3311 class LUQueryNodeData(NoHooksLU):
3312 """Logical unit for querying node data.
3315 _OP_REQP = ["nodes"]
3317 def CheckPrereq(self):
3318 """Check prerequisites.
3320 This only checks the optional node list against the existing names.
3323 self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)
3325 def Exec(self, feedback_fn):
3326 """Compute and return the list of nodes.
3328 """
3329 ilist = [self.cfg.GetInstanceInfo(iname) for iname
3330 in self.cfg.GetInstanceList()]
3331 result = []
3332 for node in [self.cfg.GetNodeInfo(name) for name in self.wanted_nodes]:
3333 result.append((node.name, node.primary_ip, node.secondary_ip,
3334 [inst.name for inst in ilist
3335 if inst.primary_node == node.name],
3336 [inst.name for inst in ilist
3337 if node.name in inst.secondary_nodes],
3338 ))
3340 return result
3342 class LUSetInstanceParms(LogicalUnit):
3343 """Modifies an instance's parameters.
3345 """
3346 HPATH = "instance-modify"
3347 HTYPE = constants.HTYPE_INSTANCE
3348 _OP_REQP = ["instance_name"]
3350 def BuildHooksEnv(self):
3351 """Build hooks env.
3353 This runs on the master, primary and secondaries.
3355 """
3356 args = dict()
3357 if self.mem:
3358 args['memory'] = self.mem
3359 if self.vcpus:
3360 args['vcpus'] = self.vcpus
3361 if self.do_ip or self.do_bridge:
3362 if self.do_ip:
3363 ip = self.ip
3364 else:
3365 ip = self.instance.nics[0].ip
3366 if self.bridge:
3367 bridge = self.bridge
3368 else:
3369 bridge = self.instance.nics[0].bridge
3370 args['nics'] = [(ip, bridge)]
3371 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3372 nl = [self.sstore.GetMasterNode(),
3373 self.instance.primary_node] + list(self.instance.secondary_nodes)
3374 return env, nl, nl
3376 def CheckPrereq(self):
3377 """Check prerequisites.
3379 This only checks the instance list against the existing names.
3381 """
3382 self.mem = getattr(self.op, "mem", None)
3383 self.vcpus = getattr(self.op, "vcpus", None)
3384 self.ip = getattr(self.op, "ip", None)
3385 self.bridge = getattr(self.op, "bridge", None)
3386 if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3387 raise errors.OpPrereqError("No changes submitted")
3388 if self.mem is not None:
3389 try:
3390 self.mem = int(self.mem)
3391 except ValueError, err:
3392 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3393 if self.vcpus is not None:
3394 try:
3395 self.vcpus = int(self.vcpus)
3396 except ValueError, err:
3397 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3398 if self.ip is not None:
3399 self.do_ip = True
3400 if self.ip.lower() == "none":
3401 self.ip = None
3402 else:
3403 if not utils.IsValidIP(self.ip):
3404 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3405 else:
3406 self.do_ip = False
3407 self.do_bridge = (self.bridge is not None)
3409 instance = self.cfg.GetInstanceInfo(
3410 self.cfg.ExpandInstanceName(self.op.instance_name))
3411 if instance is None:
3412 raise errors.OpPrereqError("No such instance name '%s'" %
3413 self.op.instance_name)
3414 self.op.instance_name = instance.name
3415 self.instance = instance
3418 def Exec(self, feedback_fn):
3419 """Modifies an instance.
3421 All parameters take effect only at the next restart of the instance.
3422 """
3423 result = []
3424 instance = self.instance
3425 if self.mem:
3426 instance.memory = self.mem
3427 result.append(("mem", self.mem))
3428 if self.vcpus:
3429 instance.vcpus = self.vcpus
3430 result.append(("vcpus", self.vcpus))
3431 if self.do_ip:
3432 instance.nics[0].ip = self.ip
3433 result.append(("ip", self.ip))
3434 if self.bridge:
3435 instance.nics[0].bridge = self.bridge
3436 result.append(("bridge", self.bridge))
3438 self.cfg.AddInstance(instance)
3440 return result
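# Illustrative usage (hedged sketch, not original code; opcode name
# assumed from the LU name):
#   op = opcodes.OpSetInstanceParms(instance_name="inst1.example.com",
#                                   mem=1024, vcpus=2)
# unspecified fields stay None and are left untouched; as the docstring
# notes, changes only take effect at the next instance restart.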
3443 class LUQueryExports(NoHooksLU):
3444 """Query the exports list
3449 def CheckPrereq(self):
3450 """Check that the nodelist contains only existing nodes.
3453 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3455 def Exec(self, feedback_fn):
3456 """Compute the list of all the exported system images.
3459 a dictionary with the structure node->(export-list)
3460 where export-list is a list of the instances exported on
3464 return rpc.call_export_list(self.nodes)
3467 class LUExportInstance(LogicalUnit):
3468 """Export an instance to an image in the cluster.
3470 """
3471 HPATH = "instance-export"
3472 HTYPE = constants.HTYPE_INSTANCE
3473 _OP_REQP = ["instance_name", "target_node", "shutdown"]
3475 def BuildHooksEnv(self):
3476 """Build hooks env.
3478 This will run on the master, primary node and target node.
3480 """
3481 env = {
3482 "EXPORT_NODE": self.op.target_node,
3483 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
3484 }
3485 env.update(_BuildInstanceHookEnvByObject(self.instance))
3486 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3487 self.op.target_node]
3488 return env, nl, nl
3490 def CheckPrereq(self):
3491 """Check prerequisites.
3493 This checks that the instance name is a valid one.
3495 """
3496 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
3497 self.instance = self.cfg.GetInstanceInfo(instance_name)
3498 if self.instance is None:
3499 raise errors.OpPrereqError("Instance '%s' not found" %
3500 self.op.instance_name)
3503 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
3504 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
3506 if self.dst_node is None:
3507 raise errors.OpPrereqError("Destination node '%s' is unknown." %
3508 self.op.target_node)
3509 self.op.target_node = self.dst_node.name
3511 def Exec(self, feedback_fn):
3512 """Export an instance to an image in the cluster.
3515 instance = self.instance
3516 dst_node = self.dst_node
3517 src_node = instance.primary_node
3518 # shutdown the instance, unless requested not to do so
3519 if self.op.shutdown:
3520 op = opcodes.OpShutdownInstance(instance_name=instance.name)
3521 self.processor.ChainOpCode(op, feedback_fn)
3523 vgname = self.cfg.GetVGName()
3525 snap_disks = []
3527 try:
3528 for disk in instance.disks:
3529 if disk.iv_name == "sda":
3530 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
3531 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
3533 if not new_dev_name:
3534 logger.Error("could not snapshot block device %s on node %s" %
3535 (disk.logical_id[1], src_node))
3536 else:
3537 new_dev = objects.Disk(dev_type="lvm", size=disk.size,
3538 logical_id=(vgname, new_dev_name),
3539 physical_id=(vgname, new_dev_name),
3540 iv_name=disk.iv_name)
3541 snap_disks.append(new_dev)
3543 finally:
3544 if self.op.shutdown:
3545 op = opcodes.OpStartupInstance(instance_name=instance.name,
3546 force=False)
3547 self.processor.ChainOpCode(op, feedback_fn)
3549 # TODO: check for size
3551 for dev in snap_disks:
3552 if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
3553 instance):
3554 logger.Error("could not export block device %s from node"
3555 " %s to node %s" %
3556 (dev.logical_id[1], src_node, dst_node.name))
3557 if not rpc.call_blockdev_remove(src_node, dev):
3558 logger.Error("could not remove snapshot block device %s from"
3559 " node %s" % (dev.logical_id[1], src_node))
3561 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
3562 logger.Error("could not finalize export for instance %s on node %s" %
3563 (instance.name, dst_node.name))
3565 nodelist = self.cfg.GetNodeList()
3566 nodelist.remove(dst_node.name)
3568 # on one-node clusters nodelist will be empty after the removal
3569 # if we proceed the backup would be removed because OpQueryExports
3570 # substitutes an empty list with the full cluster node list.
3571 if nodelist:
3572 op = opcodes.OpQueryExports(nodes=nodelist)
3573 exportlist = self.processor.ChainOpCode(op, feedback_fn)
3574 for node in exportlist:
3575 if instance.name in exportlist[node]:
3576 if not rpc.call_export_remove(node, instance.name):
3577 logger.Error("could not remove older export for instance %s"
3578 " on node %s" % (instance.name, node))
3581 class TagsLU(NoHooksLU):
3582 """Generic tags LU.
3584 This is an abstract class which is the parent of all the other tags LUs.
3586 """
3587 def CheckPrereq(self):
3588 """Check prerequisites.
3590 """
3591 if self.op.kind == constants.TAG_CLUSTER:
3592 self.target = self.cfg.GetClusterInfo()
3593 elif self.op.kind == constants.TAG_NODE:
3594 name = self.cfg.ExpandNodeName(self.op.name)
3595 if name is None:
3596 raise errors.OpPrereqError("Invalid node name (%s)" %
3597 (self.op.name,))
3598 self.op.name = name
3599 self.target = self.cfg.GetNodeInfo(name)
3600 elif self.op.kind == constants.TAG_INSTANCE:
3601 name = self.cfg.ExpandInstanceName(self.op.name)
3602 if name is None:
3603 raise errors.OpPrereqError("Invalid instance name (%s)" %
3604 (self.op.name,))
3605 self.op.name = name
3606 self.target = self.cfg.GetInstanceInfo(name)
3607 else:
3608 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
3609 str(self.op.kind))
3612 class LUGetTags(TagsLU):
3613 """Returns the tags of a given object.
3616 _OP_REQP = ["kind", "name"]
3618 def Exec(self, feedback_fn):
3619 """Returns the tag list.
3622 return self.target.GetTags()
3625 class LUAddTag(TagsLU):
3626 """Sets a tag on a given object.
3629 _OP_REQP = ["kind", "name", "tag"]
3631 def CheckPrereq(self):
3632 """Check prerequisites.
3634 This checks the type and length of the tag name and value.
3636 """
3637 TagsLU.CheckPrereq(self)
3638 objects.TaggableObject.ValidateTag(self.op.tag)
3640 def Exec(self, feedback_fn):
3641 """Sets the tag.
3643 """
3644 try:
3645 self.target.AddTag(self.op.tag)
3646 except errors.TagError, err:
3647 raise errors.OpExecError("Error while setting tag: %s" % str(err))
3648 try:
3649 self.cfg.Update(self.target)
3650 except errors.ConfigurationError:
3651 raise errors.OpRetryError("There has been a modification to the"
3652 " config file and the operation has been"
3653 " aborted. Please retry.")
3656 class LUDelTag(TagsLU):
3657 """Delete a tag from a given object.
3660 _OP_REQP = ["kind", "name", "tag"]
3662 def CheckPrereq(self):
3663 """Check prerequisites.
3665 This checks that we have the given tag.
3667 """
3668 TagsLU.CheckPrereq(self)
3669 objects.TaggableObject.ValidateTag(self.op.tag)
3670 if self.op.tag not in self.target.GetTags():
3671 raise errors.OpPrereqError("Tag not found")
3673 def Exec(self, feedback_fn):
3674 """Remove the tag from the object.
3677 self.target.RemoveTag(self.op.tag)
3679 self.cfg.Update(self.target)
3680 except errors.ConfigurationError:
3681 raise errors.OpRetryError("There has been a modification to the"
3682 " config file and the operation has been"
3683 " aborted. Please retry.")