4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Tool to move instances from one cluster to another.
25 # pylint: disable-msg=C0103
26 # C0103: Invalid name move-instance
35 from ganeti import cli
36 from ganeti import constants
37 from ganeti import utils
38 from ganeti import workerpool
39 from ganeti import compat
40 from ganeti import rapi
42 import ganeti.rapi.client # pylint: disable-msg=W0611
43 import ganeti.rapi.client_utils
# Command line options; each constant below is added to the option parser in
# ParseOptions().
SRC_RAPI_PORT_OPT = \
    cli.cli_option("--src-rapi-port", action="store", type="int",
                   dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
                   help=("Source cluster RAPI port (defaults to %s)" %
                         constants.DEFAULT_RAPI_PORT))

SRC_CA_FILE_OPT = \
    cli.cli_option("--src-ca-file", action="store", type="string",
                   dest="src_ca_file",
                   help=("File containing source cluster Certificate"
                         " Authority (CA) in PEM format"))

SRC_USERNAME_OPT = \
    cli.cli_option("--src-username", action="store", type="string",
                   dest="src_username", default=None,
                   help="Source cluster username")

SRC_PASSWORD_FILE_OPT = \
    cli.cli_option("--src-password-file", action="store", type="string",
                   dest="src_password_file",
                   help="File containing source cluster password")

# The default must be falsy: RapiClientFactory only falls back to the source
# port when no destination port was given. A default of
# constants.DEFAULT_RAPI_PORT made that fallback unreachable and contradicted
# the help text below.
DEST_RAPI_PORT_OPT = \
    cli.cli_option("--dest-rapi-port", action="store", type="int",
                   dest="dest_rapi_port", default=None,
                   help=("Destination cluster RAPI port (defaults to source"
                         " cluster RAPI port)"))

# NOTE(review): the tail of this help string is cut off in the damaged source;
# " cluster CA)" was reconstructed by analogy with the other options -- confirm
DEST_CA_FILE_OPT = \
    cli.cli_option("--dest-ca-file", action="store", type="string",
                   dest="dest_ca_file",
                   help=("File containing destination cluster Certificate"
                         " Authority (CA) in PEM format (defaults to source"
                         " cluster CA)"))

DEST_USERNAME_OPT = \
    cli.cli_option("--dest-username", action="store", type="string",
                   dest="dest_username", default=None,
                   help=("Destination cluster username (defaults to"
                         " source cluster username)"))

DEST_PASSWORD_FILE_OPT = \
    cli.cli_option("--dest-password-file", action="store", type="string",
                   dest="dest_password_file",
                   help=("File containing destination cluster password"
                         " (defaults to source cluster password)"))

DEST_INSTANCE_NAME_OPT = \
    cli.cli_option("--dest-instance-name", action="store", type="string",
                   dest="dest_instance_name",
                   help=("Instance name on destination cluster (only"
                         " when moving exactly one instance)"))

DEST_PRIMARY_NODE_OPT = \
    cli.cli_option("--dest-primary-node", action="store", type="string",
                   dest="dest_primary_node",
                   help=("Primary node on destination cluster (only"
                         " when moving exactly one instance)"))

DEST_SECONDARY_NODE_OPT = \
    cli.cli_option("--dest-secondary-node", action="store", type="string",
                   dest="dest_secondary_node",
                   help=("Secondary node on destination cluster (only"
                         " when moving exactly one instance)"))

PARALLEL_OPT = \
    cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
                   dest="parallel", metavar="<number>",
                   help="Number of instances to be moved simultaneously")
class Error(Exception):
    """Generic error.

    """


# NOTE(review): the "class Abort" header is missing from the damaged source;
# only its docstring survived. The name is taken from the "@raise Abort"
# reference in MoveRuntime.CheckAbort; the base class is assumed -- confirm
class Abort(Error):
    """Special exception for aborting import/export.

    """
class RapiClientFactory:
    """Factory class for creating RAPI clients.

    @ivar src_cluster_name: Source cluster name
    @ivar dest_cluster_name: Destination cluster name
    @ivar GetSourceClient: Callable returning new client for source cluster
    @ivar GetDestClient: Callable returning new client for destination cluster

    """
    def __init__(self, options, src_cluster_name, dest_cluster_name):
        """Initializes this class.

        Destination settings (CA file, RAPI port, username, password) fall
        back to the corresponding source settings when not given.

        @param options: Program options
        @type src_cluster_name: string
        @param src_cluster_name: Source cluster name
        @type dest_cluster_name: string
        @param dest_cluster_name: Destination cluster name

        """
        self.src_cluster_name = src_cluster_name
        self.dest_cluster_name = dest_cluster_name

        # TODO: Implement timeouts for RAPI connections
        # TODO: Support for using system default paths for verifying SSL
        # certificate
        logging.debug("Using '%s' as source CA", options.src_ca_file)
        src_curl_config = \
            rapi.client.GenericCurlConfig(cafile=options.src_ca_file)

        if options.dest_ca_file:
            logging.debug("Using '%s' as destination CA",
                          options.dest_ca_file)
            dest_curl_config = \
                rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
        else:
            logging.debug("Using source CA for destination")
            dest_curl_config = src_curl_config

        logging.debug("Source RAPI server is %s:%s",
                      src_cluster_name, options.src_rapi_port)
        logging.debug("Source username is '%s'", options.src_username)

        if options.src_username is None:
            # NOTE(review): the value assigned here is not visible in the
            # damaged source; an empty username is assumed -- confirm
            src_username = ""
        else:
            src_username = options.src_username

        if options.src_password_file:
            logging.debug("Reading '%s' for source password",
                          options.src_password_file)
            # NOTE(review): the second argument is cut off in the damaged
            # source; strict=True is assumed -- confirm
            src_password = utils.ReadOneLineFile(options.src_password_file,
                                                 strict=True)
        else:
            logging.debug("Source has no password")
            src_password = None

        self.GetSourceClient = lambda: \
            rapi.client.GanetiRapiClient(src_cluster_name,
                                         port=options.src_rapi_port,
                                         curl_config_fn=src_curl_config,
                                         username=src_username,
                                         password=src_password)

        if options.dest_rapi_port:
            dest_rapi_port = options.dest_rapi_port
        else:
            dest_rapi_port = options.src_rapi_port

        if options.dest_username is None:
            dest_username = src_username
        else:
            dest_username = options.dest_username

        logging.debug("Destination RAPI server is %s:%s",
                      dest_cluster_name, dest_rapi_port)
        logging.debug("Destination username is '%s'", dest_username)

        if options.dest_password_file:
            logging.debug("Reading '%s' for destination password",
                          options.dest_password_file)
            # NOTE(review): second argument reconstructed, see source password
            dest_password = utils.ReadOneLineFile(options.dest_password_file,
                                                  strict=True)
        else:
            logging.debug("Using source password for destination")
            dest_password = src_password

        self.GetDestClient = lambda: \
            rapi.client.GanetiRapiClient(dest_cluster_name,
                                         port=dest_rapi_port,
                                         curl_config_fn=dest_curl_config,
                                         username=dest_username,
                                         password=dest_password)
class MoveJobPollReportCb(cli.JobPollReportCbBase):
    """Job polling callbacks used while an instance move is in progress.

    """
    def __init__(self, abort_check_fn, remote_import_fn):
        """Initializes this class.

        @type abort_check_fn: callable
        @param abort_check_fn: Function to check whether move is aborted
        @type remote_import_fn: callable or None
        @param remote_import_fn: Callback for reporting received remote import
          information

        """
        cli.JobPollReportCbBase.__init__(self)
        self._abort_check_fn = abort_check_fn
        self._remote_import_fn = remote_import_fn

    def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
        """Handles a log message.

        Remote import information is handed to the registered callback; all
        other messages are logged.

        """
        if log_type == constants.ELOG_REMOTE_IMPORT:
            logging.debug("Received remote import information")

            if not self._remote_import_fn:
                raise RuntimeError("Received unexpected remote import"
                                   " information")

            assert "x509_ca" in log_msg
            assert "disks" in log_msg

            self._remote_import_fn(log_msg)

            # Import information is consumed by the callback, not logged
            return

        logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
                     cli.FormatLogMessage(log_type, log_msg))

    def ReportNotChanged(self, job_id, status):
        """Called if a job hasn't changed in a while.

        """
        try:
            # Check whether we were told to abort by the other thread
            self._abort_check_fn()
        except Abort:
            logging.warning("Aborting despite job %s still running", job_id)
            raise
class InstanceMove(object):
    """Status class for instance moves.

    """
    def __init__(self, src_instance_name, dest_instance_name,
                 dest_pnode, dest_snode, dest_iallocator):
        """Initializes this class.

        @type src_instance_name: string
        @param src_instance_name: Instance name on source cluster
        @type dest_instance_name: string
        @param dest_instance_name: Instance name on destination cluster
        @type dest_pnode: string or None
        @param dest_pnode: Name of primary node on destination cluster
        @type dest_snode: string or None
        @param dest_snode: Name of secondary node on destination cluster
        @type dest_iallocator: string or None
        @param dest_iallocator: Name of iallocator to use

        """
        self.src_instance_name = src_instance_name
        self.dest_instance_name = dest_instance_name
        self.dest_pnode = dest_pnode
        self.dest_snode = dest_snode
        self.dest_iallocator = dest_iallocator

        # Filled in once the move has finished; None means no error
        self.error_message = None
class MoveRuntime(object):
    """Class to keep track of instance move.

    """
    def __init__(self, move):
        """Initializes this class.

        @type move: L{InstanceMove}
        @param move: Instance move information

        """
        self.move = move

        # Thread synchronization; both conditions share a single lock
        self.lock = threading.Lock()
        self.source_to_dest = threading.Condition(self.lock)
        self.dest_to_source = threading.Condition(self.lock)

        # Source information
        self.src_error_message = None
        self.src_expinfo = None
        self.src_instinfo = None

        # Destination information
        self.dest_error_message = None
        self.dest_impinfo = None

    def HandleErrors(self, prefix, fn, *args):
        """Wrapper to catch errors and abort threads.

        The outcome is stored in C{<prefix>_error_message} (None on success)
        and both conditions are notified so that the other side wakes up.

        @type prefix: string
        @param prefix: Variable name prefix ("src" or "dest")
        @type fn: callable
        @param fn: Function to call with C{args}

        """
        assert prefix in ("dest", "src")

        try:
            # Call inner function
            fn(*args)

            errmsg = None
        except Abort:
            # NOTE(review): this branch is not visible in the damaged source;
            # handler and message are reconstructed -- confirm
            errmsg = "Aborted"
        except Exception:
            # sys.exc_info() instead of "except Exception, err" so the code
            # parses with both old and new interpreters
            logging.exception("Caught unhandled exception")
            errmsg = str(sys.exc_info()[1])

        setattr(self, "%s_error_message" % prefix, errmsg)

        self.lock.acquire()
        try:
            self.source_to_dest.notifyAll()
            self.dest_to_source.notifyAll()
        finally:
            self.lock.release()

    def CheckAbort(self):
        """Check whether thread should be aborted.

        @raise Abort: When thread should be aborted

        """
        if not (self.src_error_message is None and
                self.dest_error_message is None):
            logging.info("Aborting")
            raise Abort()

    def Wait(self, cond, check_fn):
        """Waits for a condition to become true.

        @type cond: threading.Condition
        @param cond: Threading condition
        @type check_fn: callable
        @param check_fn: Function to check whether condition is true; it is
          called with this object and waiting continues while it returns a
          true value
        @raise Abort: When either side has recorded an error

        """
        cond.acquire()
        try:
            while check_fn(self):
                self.CheckAbort()
                # NOTE(review): the wait call is not visible in the damaged
                # source; a one second timeout is assumed -- confirm
                cond.wait(1.0)
        finally:
            cond.release()

    def PollJob(self, cl, job_id, remote_import_fn=None):
        """Wrapper for polling a job.

        @type cl: L{rapi.client.GanetiRapiClient}
        @param cl: RAPI client
        @param job_id: Job ID
        @type remote_import_fn: callable or None
        @param remote_import_fn: Callback for reporting received remote import
          information

        """
        report_cb = MoveJobPollReportCb(self.CheckAbort, remote_import_fn)
        return rapi.client_utils.PollJob(cl, job_id, report_cb)
class MoveDestExecutor(object):
    """Destination side of an instance move.

    """
    def __init__(self, dest_client, mrt):
        """Destination side of an instance move.

        Waits for the source side to publish instance and export information,
        then creates the instance in remote-import mode and polls the job.

        @type dest_client: L{rapi.client.GanetiRapiClient}
        @param dest_client: RAPI client
        @type mrt: L{MoveRuntime}
        @param mrt: Instance move runtime information

        """
        logging.debug("Waiting for instance information to become available")
        mrt.Wait(mrt.source_to_dest,
                 lambda mrt: (mrt.src_instinfo is None or
                              mrt.src_expinfo is None))

        logging.info("Creating instance %s in remote-import mode",
                     mrt.move.dest_instance_name)
        job_id = self._CreateInstance(dest_client,
                                      mrt.move.dest_instance_name,
                                      mrt.move.dest_pnode,
                                      mrt.move.dest_snode,
                                      mrt.move.dest_iallocator,
                                      mrt.src_instinfo, mrt.src_expinfo)
        mrt.PollJob(dest_client, job_id,
                    remote_import_fn=compat.partial(self._SetImportInfo, mrt))

        logging.info("Import successful")

    @staticmethod
    def _SetImportInfo(mrt, impinfo):
        """Sets the remote import information and notifies source thread.

        @type mrt: L{MoveRuntime}
        @param mrt: Instance move runtime information
        @param impinfo: Remote import information

        """
        mrt.dest_to_source.acquire()
        try:
            mrt.dest_impinfo = impinfo
            mrt.dest_to_source.notifyAll()
        finally:
            mrt.dest_to_source.release()

    @staticmethod
    def _CreateInstance(cl, name, pnode, snode, iallocator, instance, expinfo):
        """Starts the instance creation in remote import mode.

        The parameter order is (pnode, snode), matching the positional call
        in C{__init__}; the previous (snode, pnode) order swapped primary and
        secondary node.

        @type cl: L{rapi.client.GanetiRapiClient}
        @param cl: RAPI client
        @type name: string
        @param name: Instance name
        @type pnode: string or None
        @param pnode: Name of primary node on destination cluster
        @type snode: string or None
        @param snode: Name of secondary node on destination cluster
        @type iallocator: string or None
        @param iallocator: Name of iallocator to use
        @type instance: dict
        @param instance: Instance details from source cluster
        @type expinfo: dict
        @param expinfo: Prepared export information from source cluster
        @return: Job ID

        """
        disk_template = instance["disk_template"]

        # NOTE(review): the dict keys of the disk specification are not
        # visible in the damaged source; "size" and "mode" are assumed --
        # confirm
        disks = [{
            "size": i["size"],
            "mode": i["mode"],
            } for i in instance["disks"]]

        # NOTE(review): dict keys reconstructed from the names of the
        # unpacked tuple members -- confirm
        nics = [{
            "ip": ip,
            "mac": mac,
            "mode": mode,
            "link": link,
            } for ip, mac, mode, link in instance["nics"]]

        # TODO: Should this be the actual up/down status? (run_state)
        start = (instance["config_state"] == "up")

        assert len(disks) == len(instance["disks"])
        assert len(nics) == len(instance["nics"])

        # NOTE(review): the keyword arguments os, pnode, snode, start and
        # ip_check are not visible in the damaged source (five lines are
        # missing at this position) -- confirm
        return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
                                 name, disk_template, disks, nics,
                                 os=instance["os"],
                                 pnode=pnode,
                                 snode=snode,
                                 start=start,
                                 ip_check=False,
                                 iallocator=iallocator,
                                 hypervisor=instance["hypervisor"],
                                 source_handshake=expinfo["handshake"],
                                 source_x509_ca=expinfo["x509_ca"],
                                 source_instance_name=instance["name"],
                                 beparams=instance["be_instance"],
                                 hvparams=instance["hv_instance"],
                                 osparams=instance["os_instance"])
class MoveSourceExecutor(object):
    """Source side of an instance move.

    """
    def __init__(self, src_client, mrt):
        """Source side of an instance move.

        Gathers instance and export information, hands it to the destination
        thread, waits for the destination's import information and then runs
        the remote export.

        @type src_client: L{rapi.client.GanetiRapiClient}
        @param src_client: RAPI client
        @type mrt: L{MoveRuntime}
        @param mrt: Instance move runtime information

        """
        logging.info("Checking whether instance exists")
        self._CheckInstance(src_client, mrt.move.src_instance_name)

        logging.info("Retrieving instance information from source cluster")
        instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
                                         mrt.move.src_instance_name)

        logging.info("Preparing export on source cluster")
        expinfo = self._PrepareExport(src_client, mrt.PollJob,
                                      mrt.move.src_instance_name)
        assert "handshake" in expinfo
        assert "x509_key_name" in expinfo
        assert "x509_ca" in expinfo

        # Hand information to destination thread
        mrt.source_to_dest.acquire()
        try:
            mrt.src_instinfo = instinfo
            mrt.src_expinfo = expinfo
            mrt.source_to_dest.notifyAll()
        finally:
            mrt.source_to_dest.release()

        logging.info("Waiting for destination information to become"
                     " available")
        mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)

        logging.info("Starting remote export on source cluster")
        self._ExportInstance(src_client, mrt.PollJob,
                             mrt.move.src_instance_name,
                             expinfo["x509_key_name"], mrt.dest_impinfo)

        logging.info("Export successful")

    @staticmethod
    def _CheckInstance(cl, name):
        """Checks whether the instance exists on the source cluster.

        @type cl: L{rapi.client.GanetiRapiClient}
        @param cl: RAPI client
        @type name: string
        @param name: Instance name
        @raise Error: When the RAPI server reports the instance as not found

        """
        try:
            # NOTE(review): the call inside the try block is not visible in
            # the damaged source; GetInstance is assumed -- confirm
            cl.GetInstance(name)
        except rapi.client.GanetiApiError:
            err = sys.exc_info()[1]
            if err.code == rapi.client.HTTP_NOT_FOUND:
                raise Error("Instance %s not found (%s)" % (name, str(err)))
            raise

    @staticmethod
    def _GetInstanceInfo(cl, poll_job_fn, name):
        """Retrieves detailed instance information from source cluster.

        @type cl: L{rapi.client.GanetiRapiClient}
        @param cl: RAPI client
        @type poll_job_fn: callable
        @param poll_job_fn: Function to poll for job result
        @type name: string
        @param name: Instance name
        @return: The single value of the first job result's dictionary

        """
        job_id = cl.GetInstanceInfo(name, static=True)
        result = poll_job_fn(cl, job_id)
        instances = result[0]
        assert len(instances) == 1
        # list() keeps this working where dict views cannot be indexed
        return list(instances.values())[0]

    @staticmethod
    def _PrepareExport(cl, poll_job_fn, name):
        """Prepares export on source cluster.

        @type cl: L{rapi.client.GanetiRapiClient}
        @param cl: RAPI client
        @type poll_job_fn: callable
        @param poll_job_fn: Function to poll for job result
        @type name: string
        @param name: Instance name
        @return: First element of the job result

        """
        job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
        return poll_job_fn(cl, job_id)[0]

    @staticmethod
    def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
        """Exports instance from source cluster.

        @type cl: L{rapi.client.GanetiRapiClient}
        @param cl: RAPI client
        @type poll_job_fn: callable
        @param poll_job_fn: Function to poll for job result
        @type name: string
        @param name: Instance name
        @param x509_key_name: Source X509 key
        @param impinfo: Import information from destination cluster
        @raise Error: When the overall result or any per-disk result is false

        """
        job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
                                   impinfo["disks"], shutdown=True,
                                   remove_instance=True,
                                   x509_key_name=x509_key_name,
                                   destination_x509_ca=impinfo["x509_ca"])
        (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]

        if not (fin_resu and compat.all(dresults)):
            failed = [str(idx) for (idx, result) in enumerate(dresults)
                      if not result]
            raise Error("Export failed for disks %s" %
                        utils.CommaJoin(failed))
class MoveSourceWorker(workerpool.BaseWorker):
    """Worker running one instance move: source side here, destination side
    in a separate thread.

    """
    def RunTask(self, rapi_factory, move): # pylint: disable-msg=W0221
        """Executes an instance move.

        The outcome is stored in C{move.error_message} (None on success).

        @type rapi_factory: L{RapiClientFactory}
        @param rapi_factory: RAPI client factory
        @type move: L{InstanceMove}
        @param move: Instance move information

        """
        try:
            logging.info("Preparing to move %s from cluster %s to %s as %s",
                         move.src_instance_name,
                         rapi_factory.src_cluster_name,
                         rapi_factory.dest_cluster_name,
                         move.dest_instance_name)

            mrt = MoveRuntime(move)

            logging.debug("Starting destination thread")
            dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
                                           target=mrt.HandleErrors,
                                           args=("dest", MoveDestExecutor,
                                                 rapi_factory.GetDestClient(),
                                                 mrt, ))
            dest_thread.start()
            try:
                mrt.HandleErrors("src", MoveSourceExecutor,
                                 rapi_factory.GetSourceClient(), mrt)
            finally:
                # Never leave the destination thread running unattended
                dest_thread.join()

            if mrt.src_error_message or mrt.dest_error_message:
                move.error_message = \
                    ("Source error: %s, destination error: %s" %
                     (mrt.src_error_message, mrt.dest_error_message))
            else:
                move.error_message = None
        except Exception: # pylint: disable-msg=W0703
            logging.exception("Caught unhandled exception")
            move.error_message = str(sys.exc_info()[1])
def CheckRapiSetup(rapi_factory):
    """Checks the RAPI setup by retrieving the version.

    The source cluster is queried first, then the destination cluster; any
    exception raised by the clients propagates to the caller.

    @type rapi_factory: L{RapiClientFactory}
    @param rapi_factory: RAPI client factory

    """
    src_client = rapi_factory.GetSourceClient()
    logging.info("Connecting to source RAPI server")
    logging.info("Source cluster RAPI version: %s", src_client.GetVersion())

    dest_client = rapi_factory.GetDestClient()
    logging.info("Connecting to destination RAPI server")
    logging.info("Destination cluster RAPI version: %s",
                 dest_client.GetVersion())
def SetupLogging(options):
    """Setting up logging infrastructure.

    Adds a stderr handler to the root logger. Its level is NOTSET with
    C{options.debug}, INFO with C{options.verbose} and ERROR otherwise.

    @param options: Parsed command line options

    """
    fmt = "%(asctime)s: %(threadName)s "
    if options.debug or options.verbose:
        fmt += "%(levelname)s "
    fmt += "%(message)s"

    formatter = logging.Formatter(fmt)

    stderr_handler = logging.StreamHandler()
    stderr_handler.setFormatter(formatter)
    if options.debug:
        stderr_handler.setLevel(logging.NOTSET)
    elif options.verbose:
        stderr_handler.setLevel(logging.INFO)
    else:
        stderr_handler.setLevel(logging.ERROR)

    # The root logger lets everything through; filtering is done per handler
    root_logger = logging.getLogger("")
    root_logger.setLevel(logging.NOTSET)
    root_logger.addHandler(stderr_handler)
def ParseOptions():
    """Parses options passed to program.

    @rtype: tuple
    @return: (option parser, parsed options, positional arguments)

    """
    program = os.path.basename(sys.argv[0])

    # NOTE(review): the end of the usage string is cut off in the damaged
    # source; " <instance...>" is reconstructed -- confirm
    parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
                                          " <source-cluster> <dest-cluster>"
                                          " <instance...>"),
                                   prog=program)
    parser.add_option(cli.DEBUG_OPT)
    parser.add_option(cli.VERBOSE_OPT)
    parser.add_option(cli.IALLOCATOR_OPT)
    parser.add_option(SRC_RAPI_PORT_OPT)
    parser.add_option(SRC_CA_FILE_OPT)
    parser.add_option(SRC_USERNAME_OPT)
    parser.add_option(SRC_PASSWORD_FILE_OPT)
    parser.add_option(DEST_RAPI_PORT_OPT)
    parser.add_option(DEST_CA_FILE_OPT)
    parser.add_option(DEST_USERNAME_OPT)
    parser.add_option(DEST_PASSWORD_FILE_OPT)
    parser.add_option(DEST_INSTANCE_NAME_OPT)
    parser.add_option(DEST_PRIMARY_NODE_OPT)
    parser.add_option(DEST_SECONDARY_NODE_OPT)
    parser.add_option(PARALLEL_OPT)

    (options, args) = parser.parse_args()

    return (parser, options, args)
def CheckOptions(parser, options, args):
    """Checks options and arguments for validity.

    Problems are reported through C{parser.error}. The C{args} list is not
    modified.

    @param parser: Option parser used for reporting errors
    @param options: Parsed command line options
    @type args: list of strings
    @param args: Positional arguments: source cluster, destination cluster,
      followed by at least one instance name
    @rtype: tuple
    @return: (source cluster name, destination cluster name, instance names)

    """
    if len(args) < 3:
        parser.error("Not enough arguments")

    src_cluster_name = args[0]
    dest_cluster_name = args[1]
    instance_names = args[2:]

    assert len(instance_names) > 0

    # TODO: Remove once using system default paths for SSL certificate
    # verification is implemented
    if not options.src_ca_file:
        parser.error("Missing source cluster CA file")

    if options.parallel < 1:
        parser.error("Number of simultaneous moves must be >= 1")

    dest_node_given = bool(options.dest_primary_node or
                           options.dest_secondary_node)

    # Only reject the case where both are given. The former exclusive-or test
    # also fired when neither was given, producing a misleading message and
    # making the "is required" checks below unreachable.
    if options.iallocator and dest_node_given:
        parser.error("Destination node and iallocator options exclude each"
                     " other")

    if len(instance_names) == 1:
        # Moving one instance only
        if not (options.iallocator or dest_node_given):
            parser.error("An iallocator or the destination node is required")
    else:
        # Moving more than one instance
        if options.dest_instance_name or dest_node_given:
            parser.error("The options --dest-instance-name,"
                         " --dest-primary-node and"
                         " --dest-secondary-node can only be used when"
                         " moving exactly"
                         " one instance")

        if not options.iallocator:
            parser.error("An iallocator must be specified for moving more"
                         " than one"
                         " instance")

    return (src_cluster_name, dest_cluster_name, instance_names)
@rapi.client.UsesRapiClient
def main():
    """Main routine.

    Parses and checks the command line, verifies both RAPI endpoints, runs
    all instance moves through a worker pool and exits with
    C{constants.EXIT_FAILURE} if any move recorded an error, otherwise with
    C{constants.EXIT_SUCCESS}.

    """
    (parser, options, args) = ParseOptions()

    SetupLogging(options)

    (src_cluster_name, dest_cluster_name, instance_names) = \
        CheckOptions(parser, options, args)

    logging.info("Source cluster: %s", src_cluster_name)
    logging.info("Destination cluster: %s", dest_cluster_name)
    logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))

    rapi_factory = RapiClientFactory(options, src_cluster_name,
                                     dest_cluster_name)

    CheckRapiSetup(rapi_factory)

    assert (len(instance_names) == 1 or
            not (options.dest_primary_node or options.dest_secondary_node))
    assert len(instance_names) == 1 or options.iallocator
    assert (len(instance_names) > 1 or options.iallocator or
            options.dest_primary_node or options.dest_secondary_node)

    # Prepare list of instance moves
    moves = []
    for src_instance_name in instance_names:
        if options.dest_instance_name:
            assert len(instance_names) == 1
            # Rename instance
            dest_instance_name = options.dest_instance_name
        else:
            dest_instance_name = src_instance_name

        moves.append(InstanceMove(src_instance_name, dest_instance_name,
                                  options.dest_primary_node,
                                  options.dest_secondary_node,
                                  options.iallocator))

    assert len(moves) == len(instance_names)

    # Start workerpool
    wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
    try:
        # Add instance moves to workerpool
        for move in moves:
            wp.AddTask((rapi_factory, move))

        # Wait for all moves to finish
        # NOTE(review): the call on this line is not visible in the damaged
        # source; Quiesce() is assumed -- confirm
        wp.Quiesce()
    finally:
        wp.TerminateWorkers()

    # There should be no threads running at this point, hence not using locks
    # anymore

    logging.info("Instance move results:")

    for move in moves:
        if move.dest_instance_name == move.src_instance_name:
            name = move.src_instance_name
        else:
            name = "%s as %s" % (move.src_instance_name,
                                 move.dest_instance_name)

        if move.error_message:
            msg = "Failed (%s)" % move.error_message
        else:
            # NOTE(review): the success text is not visible in the damaged
            # source -- confirm
            msg = "Success"

        logging.info("%s: %s", name, msg)

    if compat.any(move.error_message for move in moves):
        sys.exit(constants.EXIT_FAILURE)

    sys.exit(constants.EXIT_SUCCESS)


if __name__ == "__main__":
    main()