4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Tool to move instances from one cluster to another.
25 # pylint: disable-msg=C0103
26 # C0103: Invalid name move-instance
35 from ganeti import cli
36 from ganeti import constants
37 from ganeti import utils
38 from ganeti import workerpool
39 from ganeti import compat
40 from ganeti import rapi
42 import ganeti.rapi.client # pylint: disable-msg=W0611
43 import ganeti.rapi.client_utils
SRC_RAPI_PORT_OPT = \
  cli.cli_option("--src-rapi-port", action="store", type="int",
                 dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
                 help=("Source cluster RAPI port (defaults to %s)" %
                       constants.DEFAULT_RAPI_PORT))

SRC_CA_FILE_OPT = \
  cli.cli_option("--src-ca-file", action="store", type="string",
                 dest="src_ca_file",
                 help=("File containing source cluster Certificate"
                       " Authority (CA) in PEM format"))

SRC_USERNAME_OPT = \
  cli.cli_option("--src-username", action="store", type="string",
                 dest="src_username", default=None,
                 help="Source cluster username")

SRC_PASSWORD_FILE_OPT = \
  cli.cli_option("--src-password-file", action="store", type="string",
                 dest="src_password_file",
                 help="File containing source cluster password")

# No default of its own: when unset (None), RapiClientFactory falls back to
# the source cluster's RAPI port, as promised by the help text. A non-None
# default here would make that fallback unreachable.
DEST_RAPI_PORT_OPT = \
  cli.cli_option("--dest-rapi-port", action="store", type="int",
                 dest="dest_rapi_port", default=None,
                 help=("Destination cluster RAPI port (defaults to source"
                       " cluster RAPI port)"))

DEST_CA_FILE_OPT = \
  cli.cli_option("--dest-ca-file", action="store", type="string",
                 dest="dest_ca_file",
                 help=("File containing destination cluster Certificate"
                       " Authority (CA) in PEM format (defaults to source"
                       " cluster CA)"))

DEST_USERNAME_OPT = \
  cli.cli_option("--dest-username", action="store", type="string",
                 dest="dest_username", default=None,
                 help=("Destination cluster username (defaults to"
                       " source cluster username)"))

DEST_PASSWORD_FILE_OPT = \
  cli.cli_option("--dest-password-file", action="store", type="string",
                 dest="dest_password_file",
                 help=("File containing destination cluster password"
                       " (defaults to source cluster password)"))

DEST_INSTANCE_NAME_OPT = \
  cli.cli_option("--dest-instance-name", action="store", type="string",
                 dest="dest_instance_name",
                 help=("Instance name on destination cluster (only"
                       " when moving exactly one instance)"))

DEST_PRIMARY_NODE_OPT = \
  cli.cli_option("--dest-primary-node", action="store", type="string",
                 dest="dest_primary_node",
                 help=("Primary node on destination cluster (only"
                       " when moving exactly one instance)"))

DEST_SECONDARY_NODE_OPT = \
  cli.cli_option("--dest-secondary-node", action="store", type="string",
                 dest="dest_secondary_node",
                 help=("Secondary node on destination cluster (only"
                       " when moving exactly one instance)"))

PARALLEL_OPT = \
  cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
                 dest="parallel", metavar="<number>",
                 help="Number of instances to be moved simultaneously")


class Error(Exception):
  """Base exception for errors raised by this tool.

  """


class Abort(Error):
  """Special exception for aborting import/export.

  Raised in one side of a move when the other side has reported an error.

  """


class RapiClientFactory(object):
  """Factory class for creating RAPI clients.

  @ivar src_cluster_name: Source cluster name
  @ivar dest_cluster_name: Destination cluster name
  @ivar GetSourceClient: Callable returning new client for source cluster
  @ivar GetDestClient: Callable returning new client for destination cluster

  """
  def __init__(self, options, src_cluster_name, dest_cluster_name):
    """Initializes this class.

    If the destination CA file, username or password are not given, the
    source settings are used instead. The destination RAPI port falls back
    to the source port only when C{options.dest_rapi_port} is unset.

    @param options: Program options
    @type src_cluster_name: string
    @param src_cluster_name: Source cluster name
    @type dest_cluster_name: string
    @param dest_cluster_name: Destination cluster name

    """
    self.src_cluster_name = src_cluster_name
    self.dest_cluster_name = dest_cluster_name

    # TODO: Implement timeouts for RAPI connections
    # TODO: Support for using system default paths for verifying SSL certificate
    logging.debug("Using '%s' as source CA", options.src_ca_file)
    src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)

    if options.dest_ca_file:
      logging.debug("Using '%s' as destination CA", options.dest_ca_file)
      dest_curl_config = \
        rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
    else:
      logging.debug("Using source CA for destination")
      dest_curl_config = src_curl_config

    logging.debug("Source RAPI server is %s:%s",
                  src_cluster_name, options.src_rapi_port)
    logging.debug("Source username is '%s'", options.src_username)

    if options.src_username is None:
      src_username = ""
    else:
      src_username = options.src_username

    if options.src_password_file:
      logging.debug("Reading '%s' for source password",
                    options.src_password_file)
      src_password = utils.ReadOneLineFile(options.src_password_file,
                                           strict=True)
    else:
      logging.debug("Source has no password")
      src_password = None

    # Each call creates a new client, so every thread can use its own
    self.GetSourceClient = lambda: \
      rapi.client.GanetiRapiClient(src_cluster_name,
                                   port=options.src_rapi_port,
                                   curl_config_fn=src_curl_config,
                                   username=src_username,
                                   password=src_password)

    if options.dest_rapi_port:
      dest_rapi_port = options.dest_rapi_port
    else:
      dest_rapi_port = options.src_rapi_port

    if options.dest_username is None:
      dest_username = src_username
    else:
      dest_username = options.dest_username

    logging.debug("Destination RAPI server is %s:%s",
                  dest_cluster_name, dest_rapi_port)
    logging.debug("Destination username is '%s'", dest_username)

    if options.dest_password_file:
      logging.debug("Reading '%s' for destination password",
                    options.dest_password_file)
      dest_password = utils.ReadOneLineFile(options.dest_password_file,
                                            strict=True)
    else:
      logging.debug("Using source password for destination")
      dest_password = src_password

    self.GetDestClient = lambda: \
      rapi.client.GanetiRapiClient(dest_cluster_name,
                                   port=dest_rapi_port,
                                   curl_config_fn=dest_curl_config,
                                   username=dest_username,
                                   password=dest_password)


class MoveJobPollReportCb(cli.JobPollReportCbBase):
  """Job polling callbacks used during instance moves.

  Remote import information from job log messages is passed to a callback.
  Other log messages are logged. While a job is unchanged, the callbacks
  check whether the move has been aborted.

  """
  def __init__(self, abort_check_fn, remote_import_fn):
    """Initializes this class.

    @type abort_check_fn: callable
    @param abort_check_fn: Function to check whether move is aborted
    @type remote_import_fn: callable or None
    @param remote_import_fn: Callback for reporting received remote import
      information

    """
    cli.JobPollReportCbBase.__init__(self)
    self._abort_check_fn = abort_check_fn
    self._remote_import_fn = remote_import_fn

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    Remote import information is checked and passed to the remote import
    callback without being logged. Any other message is logged.

    @raise RuntimeError: if remote import information arrives and no
      callback was given, or the information lacks required keys

    """
    if log_type == constants.ELOG_REMOTE_IMPORT:
      logging.debug("Received remote import information")

      if not self._remote_import_fn:
        raise RuntimeError("Received unexpected remote import information")

      # The message comes from the remote cluster. Check it explicitly,
      # because assert statements are removed when Python runs with -O.
      missing = [key for key in ("x509_ca", "disks") if key not in log_msg]
      if missing:
        raise RuntimeError("Remote import information is missing %s" %
                           utils.CommaJoin(missing))

      self._remote_import_fn(log_msg)

      return

    logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
                 utils.SafeEncode(log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    @raise Abort: if the move was aborted by the other thread

    """
    try:
      # Check whether we were told to abort by the other thread
      self._abort_check_fn()
    except Abort:
      logging.warning("Aborting despite job %s still running", job_id)
      raise


class InstanceMove(object):
  """Describes a single requested instance move and its outcome.

  """
  def __init__(self, src_instance_name, dest_instance_name,
               dest_pnode, dest_snode, dest_iallocator):
    """Initializes this class.

    @type src_instance_name: string
    @param src_instance_name: Instance name on source cluster
    @type dest_instance_name: string
    @param dest_instance_name: Instance name on destination cluster
    @type dest_pnode: string or None
    @param dest_pnode: Name of primary node on destination cluster
    @type dest_snode: string or None
    @param dest_snode: Name of secondary node on destination cluster
    @type dest_iallocator: string or None
    @param dest_iallocator: Name of iallocator to use

    """
    # Instance name before and after the move
    (self.src_instance_name, self.dest_instance_name) = \
      (src_instance_name, dest_instance_name)

    # Placement on the destination cluster
    (self.dest_pnode, self.dest_snode, self.dest_iallocator) = \
      (dest_pnode, dest_snode, dest_iallocator)

    # Set after the move has run; None means no error was recorded
    self.error_message = None


296 class MoveRuntime(object):
297 """Class to keep track of instance move.
300 def __init__(self, move):
301 """Initializes this class.
303 @type move: L{InstanceMove}
308 # Thread synchronization
309 self.lock = threading.Lock()
310 self.source_to_dest = threading.Condition(self.lock)
311 self.dest_to_source = threading.Condition(self.lock)
314 self.src_error_message = None
315 self.src_expinfo = None
316 self.src_instinfo = None
318 # Destination information
319 self.dest_error_message = None
320 self.dest_impinfo = None
322 def HandleErrors(self, prefix, fn, *args):
323 """Wrapper to catch errors and abort threads.
326 @param prefix: Variable name prefix ("src" or "dest")
331 assert prefix in ("dest", "src")
334 # Call inner function
340 except Exception, err:
341 logging.exception("Caught unhandled exception")
344 setattr(self, "%s_error_message" % prefix, errmsg)
348 self.source_to_dest.notifyAll()
349 self.dest_to_source.notifyAll()
353 def CheckAbort(self):
354 """Check whether thread should be aborted.
356 @raise Abort: When thread should be aborted
359 if not (self.src_error_message is None and
360 self.dest_error_message is None):
361 logging.info("Aborting")
364 def Wait(self, cond, check_fn):
365 """Waits for a condition to become true.
367 @type cond: threading.Condition
368 @param cond: Threading condition
369 @type check_fn: callable
370 @param check_fn: Function to check whether condition is true
375 while check_fn(self):
381 def PollJob(self, cl, job_id, remote_import_fn=None):
382 """Wrapper for polling a job.
384 @type cl: L{rapi.client.GanetiRapiClient}
385 @param cl: RAPI client
387 @param job_id: Job ID
388 @type remote_import_fn: callable or None
389 @param remote_import_fn: Callback for reporting received remote import
393 return rapi.client_utils.PollJob(cl, job_id,
394 MoveJobPollReportCb(self.CheckAbort,
class MoveDestExecutor(object):
  """Runs the destination side of an instance move.

  """
  def __init__(self, dest_client, mrt):
    """Destination side of an instance move.

    Waits for instance and export information from the source thread. Then
    creates the instance on the destination cluster in remote-import mode
    and polls the creation job.

    @type dest_client: L{rapi.client.GanetiRapiClient}
    @param dest_client: RAPI client
    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information

    """
    logging.debug("Waiting for instance information to become available")
    mrt.Wait(mrt.source_to_dest,
             lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)

    logging.info("Creating instance %s in remote-import mode",
                 mrt.move.dest_instance_name)
    # Pass the nodes by keyword. _CreateInstance declares snode before pnode,
    # so passing them positionally in (pnode, snode) order swaps primary and
    # secondary node.
    job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
                                  pnode=mrt.move.dest_pnode,
                                  snode=mrt.move.dest_snode,
                                  iallocator=mrt.move.dest_iallocator,
                                  instance=mrt.src_instinfo,
                                  expinfo=mrt.src_expinfo)
    mrt.PollJob(dest_client, job_id,
                remote_import_fn=compat.partial(self._SetImportInfo, mrt))

    logging.info("Import successful")

424 def _SetImportInfo(mrt, impinfo):
425 """Sets the remote import information and notifies source thread.
427 @type mrt: L{MoveRuntime}
428 @param mrt: Instance move runtime information
429 @param impinfo: Remote import information
432 mrt.dest_to_source.acquire()
434 mrt.dest_impinfo = impinfo
435 mrt.dest_to_source.notifyAll()
437 mrt.dest_to_source.release()
  def _CreateInstance(cl, name, snode, pnode, iallocator, instance, expinfo):
    """Starts the instance creation in remote import mode.

    NOTE(review): the positional order is (snode, pnode), the reverse of the
    parameter documentation below. Callers should pass both nodes by keyword
    so that primary and secondary are not swapped.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type name: string
    @param name: Instance name
    @type pnode: string or None
    @param pnode: Name of primary node on destination cluster
    @type snode: string or None
    @param snode: Name of secondary node on destination cluster
    @type iallocator: string or None
    @param iallocator: Name of iallocator to use
    @param instance: Instance details from source cluster
    @param expinfo: Prepared export information from source cluster (its
      "handshake" and "x509_ca" entries are used)
    @return: Result of C{cl.CreateInstance}; the caller uses it as a job ID

    """
    disk_template = instance["disk_template"]
      } for i in instance["disks"]]
      } for ip, mac, mode, link in instance["nics"]]
    # TODO: Should this be the actual up/down status? (run_state)
    # The instance is started only if its configured state on the source is "up"
    start = (instance["config_state"] == "up")
    # One disk and one NIC definition per source disk/NIC
    assert len(disks) == len(instance["disks"])
    assert len(nics) == len(instance["nics"])
    return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
                             name, disk_template, disks, nics,
                             iallocator=iallocator,
                             hypervisor=instance["hypervisor"],
                             source_handshake=expinfo["handshake"],
                             source_x509_ca=expinfo["x509_ca"],
                             source_instance_name=instance["name"],
                             beparams=instance["be_instance"],
                             hvparams=instance["hv_instance"])


class MoveSourceExecutor(object):
  """Runs the source side of an instance move.

  """
  def __init__(self, src_client, mrt):
    """Source side of an instance move.

    Checks that the instance exists, gathers its information and prepares
    the export, then hands both to the destination thread. After the
    destination reports its import information, runs the remote export.

    @type src_client: L{rapi.client.GanetiRapiClient}
    @param src_client: RAPI client
    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information

    """
    instance_name = mrt.move.src_instance_name

    logging.info("Checking whether instance exists")
    self._CheckInstance(src_client, instance_name)

    logging.info("Retrieving instance information from source cluster")
    instinfo = self._GetInstanceInfo(src_client, mrt.PollJob, instance_name)

    logging.info("Preparing export on source cluster")
    expinfo = self._PrepareExport(src_client, mrt.PollJob, instance_name)
    for required_key in ("handshake", "x509_key_name", "x509_ca"):
      assert required_key in expinfo

    # Publish source information and wake the destination thread
    handover = mrt.source_to_dest
    handover.acquire()
    try:
      mrt.src_instinfo = instinfo
      mrt.src_expinfo = expinfo
      handover.notifyAll()
    finally:
      handover.release()

    logging.info("Waiting for destination information to become available")
    mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)

    logging.info("Starting remote export on source cluster")
    self._ExportInstance(src_client, mrt.PollJob, instance_name,
                         expinfo["x509_key_name"], mrt.dest_impinfo)

    logging.info("Export successful")

  def _CheckInstance(cl, name):
    """Checks whether the instance exists on the source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @param name: Instance name
    @raise Error: if the RAPI server answers with C{HTTP_NOT_FOUND}

    """
    except rapi.client.GanetiApiError, err:
      # Report a missing instance as this tool's own error type
      if err.code == rapi.client.HTTP_NOT_FOUND:
        raise Error("Instance %s not found (%s)" % (name, str(err)))

556 def _GetInstanceInfo(cl, poll_job_fn, name):
557 """Retrieves detailed instance information from source cluster.
559 @type cl: L{rapi.client.GanetiRapiClient}
560 @param cl: RAPI client
561 @type poll_job_fn: callable
562 @param poll_job_fn: Function to poll for job result
564 @param name: Instance name
567 job_id = cl.GetInstanceInfo(name, static=True)
568 result = poll_job_fn(cl, job_id)
569 assert len(result[0].keys()) == 1
570 return result[0][result[0].keys()[0]]
573 def _PrepareExport(cl, poll_job_fn, name):
574 """Prepares export on source cluster.
576 @type cl: L{rapi.client.GanetiRapiClient}
577 @param cl: RAPI client
578 @type poll_job_fn: callable
579 @param poll_job_fn: Function to poll for job result
581 @param name: Instance name
584 job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
585 return poll_job_fn(cl, job_id)[0]
588 def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
589 """Exports instance from source cluster.
591 @type cl: L{rapi.client.GanetiRapiClient}
592 @param cl: RAPI client
593 @type poll_job_fn: callable
594 @param poll_job_fn: Function to poll for job result
596 @param name: Instance name
597 @param x509_key_name: Source X509 key
598 @param impinfo: Import information from destination cluster
601 job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
602 impinfo["disks"], shutdown=True,
603 remove_instance=True,
604 x509_key_name=x509_key_name,
605 destination_x509_ca=impinfo["x509_ca"])
606 (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
608 if not (fin_resu and compat.all(dresults)):
609 raise Error("Export failed for disks %s" %
610 utils.CommaJoin(str(idx) for idx, result
611 in enumerate(dresults) if not result))
614 class MoveSourceWorker(workerpool.BaseWorker):
615 def RunTask(self, rapi_factory, move): # pylint: disable-msg=W0221
616 """Executes an instance move.
618 @type rapi_factory: L{RapiClientFactory}
619 @param rapi_factory: RAPI client factory
620 @type move: L{InstanceMove}
621 @param move: Instance move information
625 logging.info("Preparing to move %s from cluster %s to %s as %s",
626 move.src_instance_name, rapi_factory.src_cluster_name,
627 rapi_factory.dest_cluster_name, move.dest_instance_name)
629 mrt = MoveRuntime(move)
631 logging.debug("Starting destination thread")
632 dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
633 target=mrt.HandleErrors,
634 args=("dest", MoveDestExecutor,
635 rapi_factory.GetDestClient(),
639 mrt.HandleErrors("src", MoveSourceExecutor,
640 rapi_factory.GetSourceClient(), mrt)
644 if mrt.src_error_message or mrt.dest_error_message:
645 move.error_message = ("Source error: %s, destination error: %s" %
646 (mrt.src_error_message, mrt.dest_error_message))
648 move.error_message = None
649 except Exception, err: # pylint: disable-msg=W0703
650 logging.exception("Caught unhandled exception")
651 move.error_message = str(err)
def CheckRapiSetup(rapi_factory):
  """Checks both RAPI connections by retrieving each server's version.

  @type rapi_factory: L{RapiClientFactory}
  @param rapi_factory: RAPI client factory

  """
  checks = [
    (rapi_factory.GetSourceClient, "Connecting to source RAPI server",
     "Source cluster RAPI version: %s"),
    (rapi_factory.GetDestClient, "Connecting to destination RAPI server",
     "Destination cluster RAPI version: %s"),
    ]

  for (new_client_fn, connect_msg, version_msg) in checks:
    client = new_client_fn()
    logging.info(connect_msg)
    logging.info(version_msg, client.GetVersion())


def SetupLogging(options):
  """Sets up logging to stderr according to the verbosity options.

  @param options: Parsed command line options

  """
  fmt_parts = ["%(asctime)s: %(threadName)s "]
  if options.debug or options.verbose:
    fmt_parts.append("%(levelname)s ")
  fmt_parts.append("%(message)s")

  # --debug shows everything, --verbose shows info and above, else errors only
  if options.debug:
    handler_level = logging.NOTSET
  elif options.verbose:
    handler_level = logging.INFO
  else:
    handler_level = logging.ERROR

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(logging.Formatter("".join(fmt_parts)))
  stderr_handler.setLevel(handler_level)

  # Filtering happens in the handler; the root logger passes everything
  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(stderr_handler)


  """Parses options passed to program.

  @return: tuple of (option parser, parsed options, remaining positional
    arguments)

  """
  program = os.path.basename(sys.argv[0])

  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
                                        " <source-cluster> <dest-cluster>"
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option(cli.IALLOCATOR_OPT)
  parser.add_option(SRC_RAPI_PORT_OPT)
  parser.add_option(SRC_CA_FILE_OPT)
  parser.add_option(SRC_USERNAME_OPT)
  parser.add_option(SRC_PASSWORD_FILE_OPT)
  parser.add_option(DEST_RAPI_PORT_OPT)
  parser.add_option(DEST_CA_FILE_OPT)
  parser.add_option(DEST_USERNAME_OPT)
  parser.add_option(DEST_PASSWORD_FILE_OPT)
  parser.add_option(DEST_INSTANCE_NAME_OPT)
  parser.add_option(DEST_PRIMARY_NODE_OPT)
  parser.add_option(DEST_SECONDARY_NODE_OPT)
  parser.add_option(PARALLEL_OPT)

  (options, args) = parser.parse_args()

  # Arguments are validated separately by CheckOptions
  return (parser, options, args)


def CheckOptions(parser, options, args):
  """Checks options and arguments for validity.

  Invalid usage is reported through C{parser.error}.

  @param parser: Option parser
  @param options: Parsed command line options
  @type args: list
  @param args: Positional arguments: source cluster, destination cluster and
    one or more instance names (the two cluster names are popped from it)
  @rtype: tuple
  @return: (source cluster name, destination cluster name, instance names)

  """
  if len(args) < 3:
    parser.error("Not enough arguments")

  src_cluster_name = args.pop(0)
  dest_cluster_name = args.pop(0)
  instance_names = args

  assert len(instance_names) > 0

  # TODO: Remove once using system default paths for SSL certificate
  # verification is implemented
  if not options.src_ca_file:
    parser.error("Missing source cluster CA file")

  if options.parallel < 1:
    parser.error("Number of simultaneous moves must be >= 1")

  # Reject only the combination of both. Giving neither is reported below,
  # with a message that depends on how many instances are being moved.
  if (options.iallocator and
      (options.dest_primary_node or options.dest_secondary_node)):
    parser.error("Destination node and iallocator options exclude each other")

  if len(instance_names) == 1:
    # Moving one instance only
    if not (options.iallocator or
            options.dest_primary_node or
            options.dest_secondary_node):
      parser.error("An iallocator or the destination node is required")
  else:
    # Moving more than one instance
    if (options.dest_instance_name or options.dest_primary_node or
        options.dest_secondary_node):
      parser.error("The options --dest-instance-name, --dest-primary-node and"
                   " --dest-secondary-node can only be used when moving exactly"
                   " one instance")

    if not options.iallocator:
      parser.error("An iallocator must be specified for moving more than one"
                   " instance")

  return (src_cluster_name, dest_cluster_name, instance_names)


@rapi.client.UsesRapiClient
  # Program entry point: parse and validate the command line, run all
  # requested moves in a worker pool, then report the result of each move.
  (parser, options, args) = ParseOptions()

  SetupLogging(options)

  (src_cluster_name, dest_cluster_name, instance_names) = \
    CheckOptions(parser, options, args)

  logging.info("Source cluster: %s", src_cluster_name)
  logging.info("Destination cluster: %s", dest_cluster_name)
  logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))

  rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)

  # Contact both RAPI servers (retrieving their versions) before any move
  CheckRapiSetup(rapi_factory)

  # Invariants established by CheckOptions
  assert (len(instance_names) == 1 or
          not (options.dest_primary_node or options.dest_secondary_node))
  assert len(instance_names) == 1 or options.iallocator
  assert (len(instance_names) > 1 or options.iallocator or
          options.dest_primary_node or options.dest_secondary_node)

  # Prepare list of instance moves
  # Without --dest-instance-name an instance keeps its source name
  for src_instance_name in instance_names:
    if options.dest_instance_name:
      assert len(instance_names) == 1
      dest_instance_name = options.dest_instance_name
      dest_instance_name = src_instance_name
    moves.append(InstanceMove(src_instance_name, dest_instance_name,
                              options.dest_primary_node,
                              options.dest_secondary_node,
  assert len(moves) == len(instance_names)

  # Up to options.parallel moves run at the same time
  wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)

  # Add instance moves to workerpool
      wp.AddTask(rapi_factory, move)

  # Wait for all moves to finish
    wp.TerminateWorkers()

  # There should be no threads running at this point, hence not using locks

  logging.info("Instance move results:")

    if move.dest_instance_name == move.src_instance_name:
      name = move.src_instance_name
      name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)

    if move.error_message:
      msg = "Failed (%s)" % move.error_message

    logging.info("%s: %s", name, msg)

  # Exit status reports whether any move recorded an error
  if compat.any(move.error_message for move in moves):
    sys.exit(constants.EXIT_FAILURE)

  sys.exit(constants.EXIT_SUCCESS)


854 if __name__ == "__main__":