#!/usr/bin/python # # Copyright (C) 2010, 2011, 2012 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301, USA. """Tool to move instances from one cluster to another. """ # pylint: disable=C0103 # C0103: Invalid name move-instance import os import sys import time import logging import optparse import threading from ganeti import cli from ganeti import constants from ganeti import utils from ganeti import workerpool from ganeti import objects from ganeti import compat from ganeti import rapi import ganeti.rapi.client # pylint: disable=W0611 import ganeti.rapi.client_utils from ganeti.rapi.client import UsesRapiClient SRC_RAPI_PORT_OPT = \ cli.cli_option("--src-rapi-port", action="store", type="int", dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT, help=("Source cluster RAPI port (defaults to %s)" % constants.DEFAULT_RAPI_PORT)) SRC_CA_FILE_OPT = \ cli.cli_option("--src-ca-file", action="store", type="string", dest="src_ca_file", help=("File containing source cluster Certificate" " Authority (CA) in PEM format")) SRC_USERNAME_OPT = \ cli.cli_option("--src-username", action="store", type="string", dest="src_username", default=None, help="Source cluster username") SRC_PASSWORD_FILE_OPT = \ cli.cli_option("--src-password-file", action="store", type="string", dest="src_password_file", help="File containing source cluster password") DEST_RAPI_PORT_OPT = \ cli.cli_option("--dest-rapi-port", action="store", type="int", dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT, help=("Destination cluster RAPI port (defaults to source" " cluster RAPI port)")) DEST_CA_FILE_OPT = \ cli.cli_option("--dest-ca-file", action="store", type="string", dest="dest_ca_file", help=("File containing destination cluster Certificate" " Authority (CA) in PEM format (defaults to source" " cluster CA)")) DEST_USERNAME_OPT = \ cli.cli_option("--dest-username", action="store", type="string", dest="dest_username", default=None, help=("Destination cluster username (defaults to" " source cluster username)")) DEST_PASSWORD_FILE_OPT = \ cli.cli_option("--dest-password-file", action="store", type="string", dest="dest_password_file", help=("File containing destination cluster password" " (defaults to source cluster password)")) DEST_INSTANCE_NAME_OPT = \ cli.cli_option("--dest-instance-name", action="store", type="string", dest="dest_instance_name", help=("Instance name on destination cluster (only" " when moving exactly one instance)")) DEST_PRIMARY_NODE_OPT = \ cli.cli_option("--dest-primary-node", action="store", type="string", dest="dest_primary_node", help=("Primary node on destination cluster (only" " when moving exactly one instance)")) DEST_SECONDARY_NODE_OPT = \ cli.cli_option("--dest-secondary-node", action="store", type="string", dest="dest_secondary_node", help=("Secondary node on destination cluster (only" " when moving exactly one instance)")) PARALLEL_OPT = \ cli.cli_option("-p", "--parallel", action="store", type="int", default=1, dest="parallel", metavar="", help="Number of instances to be moved simultaneously") class Error(Exception): """Generic error. """ class Abort(Error): """Special exception for aborting import/export. """ class RapiClientFactory: """Factory class for creating RAPI clients. @ivar src_cluster_name: Source cluster name @ivar dest_cluster_name: Destination cluster name @ivar GetSourceClient: Callable returning new client for source cluster @ivar GetDestClient: Callable returning new client for destination cluster """ def __init__(self, options, src_cluster_name, dest_cluster_name): """Initializes this class. @param options: Program options @type src_cluster_name: string @param src_cluster_name: Source cluster name @type dest_cluster_name: string @param dest_cluster_name: Destination cluster name """ self.src_cluster_name = src_cluster_name self.dest_cluster_name = dest_cluster_name # TODO: Implement timeouts for RAPI connections # TODO: Support for using system default paths for verifying SSL certificate logging.debug("Using '%s' as source CA", options.src_ca_file) src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file) if options.dest_ca_file: logging.debug("Using '%s' as destination CA", options.dest_ca_file) dest_curl_config = \ rapi.client.GenericCurlConfig(cafile=options.dest_ca_file) else: logging.debug("Using source CA for destination") dest_curl_config = src_curl_config logging.debug("Source RAPI server is %s:%s", src_cluster_name, options.src_rapi_port) logging.debug("Source username is '%s'", options.src_username) if options.src_username is None: src_username = "" else: src_username = options.src_username if options.src_password_file: logging.debug("Reading '%s' for source password", options.src_password_file) src_password = utils.ReadOneLineFile(options.src_password_file, strict=True) else: logging.debug("Source has no password") src_password = None self.GetSourceClient = lambda: \ rapi.client.GanetiRapiClient(src_cluster_name, port=options.src_rapi_port, curl_config_fn=src_curl_config, username=src_username, password=src_password) if options.dest_rapi_port: dest_rapi_port = options.dest_rapi_port else: dest_rapi_port = options.src_rapi_port if options.dest_username is None: dest_username = src_username else: dest_username = options.dest_username logging.debug("Destination RAPI server is %s:%s", dest_cluster_name, dest_rapi_port) logging.debug("Destination username is '%s'", dest_username) if options.dest_password_file: logging.debug("Reading '%s' for destination password", options.dest_password_file) dest_password = utils.ReadOneLineFile(options.dest_password_file, strict=True) else: logging.debug("Using source password for destination") dest_password = src_password self.GetDestClient = lambda: \ rapi.client.GanetiRapiClient(dest_cluster_name, port=dest_rapi_port, curl_config_fn=dest_curl_config, username=dest_username, password=dest_password) class MoveJobPollReportCb(cli.JobPollReportCbBase): def __init__(self, abort_check_fn, remote_import_fn): """Initializes this class. @type abort_check_fn: callable @param abort_check_fn: Function to check whether move is aborted @type remote_import_fn: callable or None @param remote_import_fn: Callback for reporting received remote import information """ cli.JobPollReportCbBase.__init__(self) self._abort_check_fn = abort_check_fn self._remote_import_fn = remote_import_fn def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg): """Handles a log message. """ if log_type == constants.ELOG_REMOTE_IMPORT: logging.debug("Received remote import information") if not self._remote_import_fn: raise RuntimeError("Received unexpected remote import information") assert "x509_ca" in log_msg assert "disks" in log_msg self._remote_import_fn(log_msg) return logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)), cli.FormatLogMessage(log_type, log_msg)) def ReportNotChanged(self, job_id, status): """Called if a job hasn't changed in a while. """ try: # Check whether we were told to abort by the other thread self._abort_check_fn() except Abort: logging.warning("Aborting despite job %s still running", job_id) raise class InstanceMove(object): """Status class for instance moves. """ def __init__(self, src_instance_name, dest_instance_name, dest_pnode, dest_snode, dest_iallocator, hvparams, beparams, osparams, nics): """Initializes this class. @type src_instance_name: string @param src_instance_name: Instance name on source cluster @type dest_instance_name: string @param dest_instance_name: Instance name on destination cluster @type dest_pnode: string or None @param dest_pnode: Name of primary node on destination cluster @type dest_snode: string or None @param dest_snode: Name of secondary node on destination cluster @type dest_iallocator: string or None @param dest_iallocator: Name of iallocator to use @type hvparams: dict or None @param hvparams: Hypervisor parameters to override @type beparams: dict or None @param beparams: Backend parameters to override @type osparams: dict or None @param osparams: OS parameters to override @type nics: dict or None @param nics: NICs to override """ self.src_instance_name = src_instance_name self.dest_instance_name = dest_instance_name self.dest_pnode = dest_pnode self.dest_snode = dest_snode self.dest_iallocator = dest_iallocator self.hvparams = hvparams self.beparams = beparams self.osparams = osparams self.nics = nics self.error_message = None class MoveRuntime(object): """Class to keep track of instance move. """ def __init__(self, move): """Initializes this class. @type move: L{InstanceMove} """ self.move = move # Thread synchronization self.lock = threading.Lock() self.source_to_dest = threading.Condition(self.lock) self.dest_to_source = threading.Condition(self.lock) # Source information self.src_error_message = None self.src_expinfo = None self.src_instinfo = None # Destination information self.dest_error_message = None self.dest_impinfo = None def HandleErrors(self, prefix, fn, *args): """Wrapper to catch errors and abort threads. @type prefix: string @param prefix: Variable name prefix ("src" or "dest") @type fn: callable @param fn: Function """ assert prefix in ("dest", "src") try: # Call inner function fn(*args) errmsg = None except Abort: errmsg = "Aborted" except Exception, err: logging.exception("Caught unhandled exception") errmsg = str(err) setattr(self, "%s_error_message" % prefix, errmsg) self.lock.acquire() try: self.source_to_dest.notifyAll() self.dest_to_source.notifyAll() finally: self.lock.release() def CheckAbort(self): """Check whether thread should be aborted. @raise Abort: When thread should be aborted """ if not (self.src_error_message is None and self.dest_error_message is None): logging.info("Aborting") raise Abort() def Wait(self, cond, check_fn): """Waits for a condition to become true. @type cond: threading.Condition @param cond: Threading condition @type check_fn: callable @param check_fn: Function to check whether condition is true """ cond.acquire() try: while check_fn(self): self.CheckAbort() cond.wait() finally: cond.release() def PollJob(self, cl, job_id, remote_import_fn=None): """Wrapper for polling a job. @type cl: L{rapi.client.GanetiRapiClient} @param cl: RAPI client @type job_id: string @param job_id: Job ID @type remote_import_fn: callable or None @param remote_import_fn: Callback for reporting received remote import information """ return rapi.client_utils.PollJob(cl, job_id, MoveJobPollReportCb(self.CheckAbort, remote_import_fn)) class MoveDestExecutor(object): def __init__(self, dest_client, mrt): """Destination side of an instance move. @type dest_client: L{rapi.client.GanetiRapiClient} @param dest_client: RAPI client @type mrt: L{MoveRuntime} @param mrt: Instance move runtime information """ logging.debug("Waiting for instance information to become available") mrt.Wait(mrt.source_to_dest, lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None) logging.info("Creating instance %s in remote-import mode", mrt.move.dest_instance_name) job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name, mrt.move.dest_pnode, mrt.move.dest_snode, mrt.move.dest_iallocator, mrt.src_instinfo, mrt.src_expinfo, mrt.move.hvparams, mrt.move.beparams, mrt.move.beparams, mrt.move.nics) mrt.PollJob(dest_client, job_id, remote_import_fn=compat.partial(self._SetImportInfo, mrt)) logging.info("Import successful") @staticmethod def _SetImportInfo(mrt, impinfo): """Sets the remote import information and notifies source thread. @type mrt: L{MoveRuntime} @param mrt: Instance move runtime information @param impinfo: Remote import information """ mrt.dest_to_source.acquire() try: mrt.dest_impinfo = impinfo mrt.dest_to_source.notifyAll() finally: mrt.dest_to_source.release() @staticmethod def _CreateInstance(cl, name, pnode, snode, iallocator, instance, expinfo, override_hvparams, override_beparams, override_osparams, override_nics): """Starts the instance creation in remote import mode. @type cl: L{rapi.client.GanetiRapiClient} @param cl: RAPI client @type name: string @param name: Instance name @type pnode: string or None @param pnode: Name of primary node on destination cluster @type snode: string or None @param snode: Name of secondary node on destination cluster @type iallocator: string or None @param iallocator: Name of iallocator to use @type instance: dict @param instance: Instance details from source cluster @type expinfo: dict @param expinfo: Prepared export information from source cluster @type override_hvparams: dict or None @param override_hvparams: Hypervisor parameters to override @type override_beparams: dict or None @param override_beparams: Backend parameters to override @type override_osparams: dict or None @param override_osparams: OS parameters to override @type override_nics: dict or None @param override_nics: NICs to override @return: Job ID """ disk_template = instance["disk_template"] disks = [{ constants.IDISK_SIZE: i["size"], constants.IDISK_MODE: i["mode"], } for i in instance["disks"]] nics = [{ constants.INIC_IP: ip, constants.INIC_MAC: mac, constants.INIC_MODE: mode, constants.INIC_LINK: link, constants.INIC_NETWORK: network } for ip, mac, mode, link, network, _ in instance["nics"]] if len(override_nics) > len(nics): raise Error("Can not create new NICs") if override_nics: assert len(override_nics) <= len(nics) for idx, (nic, override) in enumerate(zip(nics, override_nics)): nics[idx] = objects.FillDict(nic, override) # TODO: Should this be the actual up/down status? (run_state) start = (instance["config_state"] == "up") assert len(disks) == len(instance["disks"]) assert len(nics) == len(instance["nics"]) inst_beparams = instance["be_instance"] if not inst_beparams: inst_beparams = {} inst_hvparams = instance["hv_instance"] if not inst_hvparams: inst_hvparams = {} inst_osparams = instance["os_instance"] if not inst_osparams: inst_osparams = {} return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT, name, disk_template, disks, nics, os=instance["os"], pnode=pnode, snode=snode, start=start, ip_check=False, iallocator=iallocator, hypervisor=instance["hypervisor"], source_handshake=expinfo["handshake"], source_x509_ca=expinfo["x509_ca"], source_instance_name=instance["name"], beparams=objects.FillDict(inst_beparams, override_beparams), hvparams=objects.FillDict(inst_hvparams, override_hvparams), osparams=objects.FillDict(inst_osparams, override_osparams)) class MoveSourceExecutor(object): def __init__(self, src_client, mrt): """Source side of an instance move. @type src_client: L{rapi.client.GanetiRapiClient} @param src_client: RAPI client @type mrt: L{MoveRuntime} @param mrt: Instance move runtime information """ logging.info("Checking whether instance exists") self._CheckInstance(src_client, mrt.move.src_instance_name) logging.info("Retrieving instance information from source cluster") instinfo = self._GetInstanceInfo(src_client, mrt.PollJob, mrt.move.src_instance_name) if instinfo["disk_template"] == constants.DT_FILE: raise Error("Inter-cluster move of file-based instances is not" " supported.") logging.info("Preparing export on source cluster") expinfo = self._PrepareExport(src_client, mrt.PollJob, mrt.move.src_instance_name) assert "handshake" in expinfo assert "x509_key_name" in expinfo assert "x509_ca" in expinfo # Hand information to destination thread mrt.source_to_dest.acquire() try: mrt.src_instinfo = instinfo mrt.src_expinfo = expinfo mrt.source_to_dest.notifyAll() finally: mrt.source_to_dest.release() logging.info("Waiting for destination information to become available") mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None) logging.info("Starting remote export on source cluster") self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name, expinfo["x509_key_name"], mrt.dest_impinfo) logging.info("Export successful") @staticmethod def _CheckInstance(cl, name): """Checks whether the instance exists on the source cluster. @type cl: L{rapi.client.GanetiRapiClient} @param cl: RAPI client @type name: string @param name: Instance name """ try: cl.GetInstance(name) except rapi.client.GanetiApiError, err: if err.code == rapi.client.HTTP_NOT_FOUND: raise Error("Instance %s not found (%s)" % (name, str(err))) raise @staticmethod def _GetInstanceInfo(cl, poll_job_fn, name): """Retrieves detailed instance information from source cluster. @type cl: L{rapi.client.GanetiRapiClient} @param cl: RAPI client @type poll_job_fn: callable @param poll_job_fn: Function to poll for job result @type name: string @param name: Instance name """ job_id = cl.GetInstanceInfo(name, static=True) result = poll_job_fn(cl, job_id) assert len(result[0].keys()) == 1 return result[0][result[0].keys()[0]] @staticmethod def _PrepareExport(cl, poll_job_fn, name): """Prepares export on source cluster. @type cl: L{rapi.client.GanetiRapiClient} @param cl: RAPI client @type poll_job_fn: callable @param poll_job_fn: Function to poll for job result @type name: string @param name: Instance name """ job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE) return poll_job_fn(cl, job_id)[0] @staticmethod def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo): """Exports instance from source cluster. @type cl: L{rapi.client.GanetiRapiClient} @param cl: RAPI client @type poll_job_fn: callable @param poll_job_fn: Function to poll for job result @type name: string @param name: Instance name @param x509_key_name: Source X509 key @param impinfo: Import information from destination cluster """ job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE, impinfo["disks"], shutdown=True, remove_instance=True, x509_key_name=x509_key_name, destination_x509_ca=impinfo["x509_ca"]) (fin_resu, dresults) = poll_job_fn(cl, job_id)[0] if not (fin_resu and compat.all(dresults)): raise Error("Export failed for disks %s" % utils.CommaJoin(str(idx) for idx, result in enumerate(dresults) if not result)) class MoveSourceWorker(workerpool.BaseWorker): def RunTask(self, rapi_factory, move): # pylint: disable=W0221 """Executes an instance move. @type rapi_factory: L{RapiClientFactory} @param rapi_factory: RAPI client factory @type move: L{InstanceMove} @param move: Instance move information """ try: logging.info("Preparing to move %s from cluster %s to %s as %s", move.src_instance_name, rapi_factory.src_cluster_name, rapi_factory.dest_cluster_name, move.dest_instance_name) mrt = MoveRuntime(move) logging.debug("Starting destination thread") dest_thread = threading.Thread(name="DestFor%s" % self.getName(), target=mrt.HandleErrors, args=("dest", MoveDestExecutor, rapi_factory.GetDestClient(), mrt, )) dest_thread.start() try: mrt.HandleErrors("src", MoveSourceExecutor, rapi_factory.GetSourceClient(), mrt) finally: dest_thread.join() if mrt.src_error_message or mrt.dest_error_message: move.error_message = ("Source error: %s, destination error: %s" % (mrt.src_error_message, mrt.dest_error_message)) else: move.error_message = None except Exception, err: # pylint: disable=W0703 logging.exception("Caught unhandled exception") move.error_message = str(err) def CheckRapiSetup(rapi_factory): """Checks the RAPI setup by retrieving the version. @type rapi_factory: L{RapiClientFactory} @param rapi_factory: RAPI client factory """ src_client = rapi_factory.GetSourceClient() logging.info("Connecting to source RAPI server") logging.info("Source cluster RAPI version: %s", src_client.GetVersion()) dest_client = rapi_factory.GetDestClient() logging.info("Connecting to destination RAPI server") logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion()) def ParseOptions(): """Parses options passed to program. """ program = os.path.basename(sys.argv[0]) parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]" " " " "), prog=program) parser.add_option(cli.DEBUG_OPT) parser.add_option(cli.VERBOSE_OPT) parser.add_option(cli.IALLOCATOR_OPT) parser.add_option(cli.BACKEND_OPT) parser.add_option(cli.HVOPTS_OPT) parser.add_option(cli.OSPARAMS_OPT) parser.add_option(cli.NET_OPT) parser.add_option(SRC_RAPI_PORT_OPT) parser.add_option(SRC_CA_FILE_OPT) parser.add_option(SRC_USERNAME_OPT) parser.add_option(SRC_PASSWORD_FILE_OPT) parser.add_option(DEST_RAPI_PORT_OPT) parser.add_option(DEST_CA_FILE_OPT) parser.add_option(DEST_USERNAME_OPT) parser.add_option(DEST_PASSWORD_FILE_OPT) parser.add_option(DEST_INSTANCE_NAME_OPT) parser.add_option(DEST_PRIMARY_NODE_OPT) parser.add_option(DEST_SECONDARY_NODE_OPT) parser.add_option(PARALLEL_OPT) (options, args) = parser.parse_args() return (parser, options, args) def CheckOptions(parser, options, args): """Checks options and arguments for validity. """ if len(args) < 3: parser.error("Not enough arguments") src_cluster_name = args.pop(0) dest_cluster_name = args.pop(0) instance_names = args assert len(instance_names) > 0 # TODO: Remove once using system default paths for SSL certificate # verification is implemented if not options.src_ca_file: parser.error("Missing source cluster CA file") if options.parallel < 1: parser.error("Number of simultaneous moves must be >= 1") if not (bool(options.iallocator) ^ bool(options.dest_primary_node or options.dest_secondary_node)): parser.error("Destination node and iallocator options exclude each other") if len(instance_names) == 1: # Moving one instance only if not (options.iallocator or options.dest_primary_node or options.dest_secondary_node): parser.error("An iallocator or the destination node is required") if options.hvparams: utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES) if options.beparams: utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES) if options.nics: options.nics = cli.ParseNicOption(options.nics) else: # Moving more than one instance if (options.dest_instance_name or options.dest_primary_node or options.dest_secondary_node or options.hvparams or options.beparams or options.osparams or options.nics): parser.error("The options --dest-instance-name, --dest-primary-node," " --dest-secondary-node, --hypervisor-parameters," " --backend-parameters, --os-parameters and --net can" " only be used when moving exactly one instance") if not options.iallocator: parser.error("An iallocator must be specified for moving more than one" " instance") return (src_cluster_name, dest_cluster_name, instance_names) @UsesRapiClient def main(): """Main routine. """ (parser, options, args) = ParseOptions() utils.SetupToolLogging(options.debug, options.verbose, threadname=True) (src_cluster_name, dest_cluster_name, instance_names) = \ CheckOptions(parser, options, args) logging.info("Source cluster: %s", src_cluster_name) logging.info("Destination cluster: %s", dest_cluster_name) logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names)) rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name) CheckRapiSetup(rapi_factory) assert (len(instance_names) == 1 or not (options.dest_primary_node or options.dest_secondary_node)) assert len(instance_names) == 1 or options.iallocator assert (len(instance_names) > 1 or options.iallocator or options.dest_primary_node or options.dest_secondary_node) assert (len(instance_names) == 1 or not (options.hvparams or options.beparams or options.osparams or options.nics)) # Prepare list of instance moves moves = [] for src_instance_name in instance_names: if options.dest_instance_name: assert len(instance_names) == 1 # Rename instance dest_instance_name = options.dest_instance_name else: dest_instance_name = src_instance_name moves.append(InstanceMove(src_instance_name, dest_instance_name, options.dest_primary_node, options.dest_secondary_node, options.iallocator, options.hvparams, options.beparams, options.osparams, options.nics)) assert len(moves) == len(instance_names) # Start workerpool wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker) try: # Add instance moves to workerpool for move in moves: wp.AddTask((rapi_factory, move)) # Wait for all moves to finish wp.Quiesce() finally: wp.TerminateWorkers() # There should be no threads running at this point, hence not using locks # anymore logging.info("Instance move results:") for move in moves: if move.dest_instance_name == move.src_instance_name: name = move.src_instance_name else: name = "%s as %s" % (move.src_instance_name, move.dest_instance_name) if move.error_message: msg = "Failed (%s)" % move.error_message else: msg = "Success" logging.info("%s: %s", name, msg) if compat.any(move.error_message for move in moves): sys.exit(constants.EXIT_FAILURE) sys.exit(constants.EXIT_SUCCESS) if __name__ == "__main__": main()