4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Tool to move instances from one cluster to another.
25 # pylint: disable=C0103
26 # C0103: Invalid name move-instance
35 from ganeti import cli
36 from ganeti import constants
37 from ganeti import utils
38 from ganeti import workerpool
39 from ganeti import objects
40 from ganeti import compat
41 from ganeti import rapi
43 import ganeti.rapi.client # pylint: disable=W0611
44 import ganeti.rapi.client_utils
48 cli.cli_option("--src-rapi-port", action="store", type="int",
49 dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
50 help=("Source cluster RAPI port (defaults to %s)" %
51 constants.DEFAULT_RAPI_PORT))
54 cli.cli_option("--src-ca-file", action="store", type="string",
56 help=("File containing source cluster Certificate"
57 " Authority (CA) in PEM format"))
60 cli.cli_option("--src-username", action="store", type="string",
61 dest="src_username", default=None,
62 help="Source cluster username")
64 SRC_PASSWORD_FILE_OPT = \
65 cli.cli_option("--src-password-file", action="store", type="string",
66 dest="src_password_file",
67 help="File containing source cluster password")
69 DEST_RAPI_PORT_OPT = \
70 cli.cli_option("--dest-rapi-port", action="store", type="int",
71 dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT,
72 help=("Destination cluster RAPI port (defaults to source"
73 " cluster RAPI port)"))
76 cli.cli_option("--dest-ca-file", action="store", type="string",
78 help=("File containing destination cluster Certificate"
79 " Authority (CA) in PEM format (defaults to source"
83 cli.cli_option("--dest-username", action="store", type="string",
84 dest="dest_username", default=None,
85 help=("Destination cluster username (defaults to"
86 " source cluster username)"))
88 DEST_PASSWORD_FILE_OPT = \
89 cli.cli_option("--dest-password-file", action="store", type="string",
90 dest="dest_password_file",
91 help=("File containing destination cluster password"
92 " (defaults to source cluster password)"))
94 DEST_INSTANCE_NAME_OPT = \
95 cli.cli_option("--dest-instance-name", action="store", type="string",
96 dest="dest_instance_name",
97 help=("Instance name on destination cluster (only"
98 " when moving exactly one instance)"))
100 DEST_PRIMARY_NODE_OPT = \
101 cli.cli_option("--dest-primary-node", action="store", type="string",
102 dest="dest_primary_node",
103 help=("Primary node on destination cluster (only"
104 " when moving exactly one instance)"))
106 DEST_SECONDARY_NODE_OPT = \
107 cli.cli_option("--dest-secondary-node", action="store", type="string",
108 dest="dest_secondary_node",
109 help=("Secondary node on destination cluster (only"
110 " when moving exactly one instance)"))
113 cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
114 dest="parallel", metavar="<number>",
115 help="Number of instances to be moved simultaneously")
118 class Error(Exception):
125 """Special exception for aborting import/export.
130 class RapiClientFactory:
131 """Factory class for creating RAPI clients.
133 @ivar src_cluster_name: Source cluster name
134 @ivar dest_cluster_name: Destination cluster name
135 @ivar GetSourceClient: Callable returning new client for source cluster
136 @ivar GetDestClient: Callable returning new client for destination cluster
139 def __init__(self, options, src_cluster_name, dest_cluster_name):
140 """Initializes this class.
142 @param options: Program options
143 @type src_cluster_name: string
144 @param src_cluster_name: Source cluster name
145 @type dest_cluster_name: string
146 @param dest_cluster_name: Destination cluster name
149 self.src_cluster_name = src_cluster_name
150 self.dest_cluster_name = dest_cluster_name
152 # TODO: Implement timeouts for RAPI connections
153 # TODO: Support for using system default paths for verifying SSL certificate
154 logging.debug("Using '%s' as source CA", options.src_ca_file)
155 src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)
157 if options.dest_ca_file:
158 logging.debug("Using '%s' as destination CA", options.dest_ca_file)
160 rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
162 logging.debug("Using source CA for destination")
163 dest_curl_config = src_curl_config
165 logging.debug("Source RAPI server is %s:%s",
166 src_cluster_name, options.src_rapi_port)
167 logging.debug("Source username is '%s'", options.src_username)
169 if options.src_username is None:
172 src_username = options.src_username
174 if options.src_password_file:
175 logging.debug("Reading '%s' for source password",
176 options.src_password_file)
177 src_password = utils.ReadOneLineFile(options.src_password_file,
180 logging.debug("Source has no password")
183 self.GetSourceClient = lambda: \
184 rapi.client.GanetiRapiClient(src_cluster_name,
185 port=options.src_rapi_port,
186 curl_config_fn=src_curl_config,
187 username=src_username,
188 password=src_password)
190 if options.dest_rapi_port:
191 dest_rapi_port = options.dest_rapi_port
193 dest_rapi_port = options.src_rapi_port
195 if options.dest_username is None:
196 dest_username = src_username
198 dest_username = options.dest_username
200 logging.debug("Destination RAPI server is %s:%s",
201 dest_cluster_name, dest_rapi_port)
202 logging.debug("Destination username is '%s'", dest_username)
204 if options.dest_password_file:
205 logging.debug("Reading '%s' for destination password",
206 options.dest_password_file)
207 dest_password = utils.ReadOneLineFile(options.dest_password_file,
210 logging.debug("Using source password for destination")
211 dest_password = src_password
213 self.GetDestClient = lambda: \
214 rapi.client.GanetiRapiClient(dest_cluster_name,
216 curl_config_fn=dest_curl_config,
217 username=dest_username,
218 password=dest_password)
221 class MoveJobPollReportCb(cli.JobPollReportCbBase):
222 def __init__(self, abort_check_fn, remote_import_fn):
223 """Initializes this class.
225 @type abort_check_fn: callable
226 @param abort_check_fn: Function to check whether move is aborted
227 @type remote_import_fn: callable or None
228 @param remote_import_fn: Callback for reporting received remote import
232 cli.JobPollReportCbBase.__init__(self)
233 self._abort_check_fn = abort_check_fn
234 self._remote_import_fn = remote_import_fn
236 def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
237 """Handles a log message.
240 if log_type == constants.ELOG_REMOTE_IMPORT:
241 logging.debug("Received remote import information")
243 if not self._remote_import_fn:
244 raise RuntimeError("Received unexpected remote import information")
246 assert "x509_ca" in log_msg
247 assert "disks" in log_msg
249 self._remote_import_fn(log_msg)
253 logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
254 cli.FormatLogMessage(log_type, log_msg))
256 def ReportNotChanged(self, job_id, status):
257 """Called if a job hasn't changed in a while.
261 # Check whether we were told to abort by the other thread
262 self._abort_check_fn()
264 logging.warning("Aborting despite job %s still running", job_id)
268 class InstanceMove(object):
269 """Status class for instance moves.
272 def __init__(self, src_instance_name, dest_instance_name,
273 dest_pnode, dest_snode, dest_iallocator,
274 hvparams, beparams, osparams, nics):
275 """Initializes this class.
277 @type src_instance_name: string
278 @param src_instance_name: Instance name on source cluster
279 @type dest_instance_name: string
280 @param dest_instance_name: Instance name on destination cluster
281 @type dest_pnode: string or None
282 @param dest_pnode: Name of primary node on destination cluster
283 @type dest_snode: string or None
284 @param dest_snode: Name of secondary node on destination cluster
285 @type dest_iallocator: string or None
286 @param dest_iallocator: Name of iallocator to use
287 @type hvparams: dict or None
288 @param hvparams: Hypervisor parameters to override
289 @type beparams: dict or None
290 @param beparams: Backend parameters to override
291 @type osparams: dict or None
292 @param osparams: OS parameters to override
293 @type nics: dict or None
294 @param nics: NICs to override
297 self.src_instance_name = src_instance_name
298 self.dest_instance_name = dest_instance_name
299 self.dest_pnode = dest_pnode
300 self.dest_snode = dest_snode
301 self.dest_iallocator = dest_iallocator
302 self.hvparams = hvparams
303 self.beparams = beparams
304 self.osparams = osparams
307 self.error_message = None
310 class MoveRuntime(object):
311 """Class to keep track of instance move.
314 def __init__(self, move):
315 """Initializes this class.
317 @type move: L{InstanceMove}
322 # Thread synchronization
323 self.lock = threading.Lock()
324 self.source_to_dest = threading.Condition(self.lock)
325 self.dest_to_source = threading.Condition(self.lock)
328 self.src_error_message = None
329 self.src_expinfo = None
330 self.src_instinfo = None
332 # Destination information
333 self.dest_error_message = None
334 self.dest_impinfo = None
336 def HandleErrors(self, prefix, fn, *args):
337 """Wrapper to catch errors and abort threads.
340 @param prefix: Variable name prefix ("src" or "dest")
345 assert prefix in ("dest", "src")
348 # Call inner function
354 except Exception, err:
355 logging.exception("Caught unhandled exception")
358 setattr(self, "%s_error_message" % prefix, errmsg)
362 self.source_to_dest.notifyAll()
363 self.dest_to_source.notifyAll()
367 def CheckAbort(self):
368 """Check whether thread should be aborted.
370 @raise Abort: When thread should be aborted
373 if not (self.src_error_message is None and
374 self.dest_error_message is None):
375 logging.info("Aborting")
378 def Wait(self, cond, check_fn):
379 """Waits for a condition to become true.
381 @type cond: threading.Condition
382 @param cond: Threading condition
383 @type check_fn: callable
384 @param check_fn: Function to check whether condition is true
389 while check_fn(self):
395 def PollJob(self, cl, job_id, remote_import_fn=None):
396 """Wrapper for polling a job.
398 @type cl: L{rapi.client.GanetiRapiClient}
399 @param cl: RAPI client
401 @param job_id: Job ID
402 @type remote_import_fn: callable or None
403 @param remote_import_fn: Callback for reporting received remote import
407 return rapi.client_utils.PollJob(cl, job_id,
408 MoveJobPollReportCb(self.CheckAbort,
412 class MoveDestExecutor(object):
413 def __init__(self, dest_client, mrt):
414 """Destination side of an instance move.
416 @type dest_client: L{rapi.client.GanetiRapiClient}
417 @param dest_client: RAPI client
418 @type mrt: L{MoveRuntime}
419 @param mrt: Instance move runtime information
422 logging.debug("Waiting for instance information to become available")
423 mrt.Wait(mrt.source_to_dest,
424 lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
426 logging.info("Creating instance %s in remote-import mode",
427 mrt.move.dest_instance_name)
428 job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
429 mrt.move.dest_pnode, mrt.move.dest_snode,
430 mrt.move.dest_iallocator,
431 mrt.src_instinfo, mrt.src_expinfo,
432 mrt.move.hvparams, mrt.move.beparams,
433 mrt.move.beparams, mrt.move.nics)
434 mrt.PollJob(dest_client, job_id,
435 remote_import_fn=compat.partial(self._SetImportInfo, mrt))
437 logging.info("Import successful")
440 def _SetImportInfo(mrt, impinfo):
441 """Sets the remote import information and notifies source thread.
443 @type mrt: L{MoveRuntime}
444 @param mrt: Instance move runtime information
445 @param impinfo: Remote import information
448 mrt.dest_to_source.acquire()
450 mrt.dest_impinfo = impinfo
451 mrt.dest_to_source.notifyAll()
453 mrt.dest_to_source.release()
456 def _CreateInstance(cl, name, pnode, snode, iallocator, instance, expinfo,
457 override_hvparams, override_beparams, override_osparams,
459 """Starts the instance creation in remote import mode.
461 @type cl: L{rapi.client.GanetiRapiClient}
462 @param cl: RAPI client
464 @param name: Instance name
465 @type pnode: string or None
466 @param pnode: Name of primary node on destination cluster
467 @type snode: string or None
468 @param snode: Name of secondary node on destination cluster
469 @type iallocator: string or None
470 @param iallocator: Name of iallocator to use
472 @param instance: Instance details from source cluster
474 @param expinfo: Prepared export information from source cluster
475 @type override_hvparams: dict or None
476 @param override_hvparams: Hypervisor parameters to override
477 @type override_beparams: dict or None
478 @param override_beparams: Backend parameters to override
479 @type override_osparams: dict or None
480 @param override_osparams: OS parameters to override
481 @type override_nics: dict or None
482 @param override_nics: NICs to override
486 disk_template = instance["disk_template"]
489 constants.IDISK_SIZE: i["size"],
490 constants.IDISK_MODE: i["mode"],
491 } for i in instance["disks"]]
494 constants.INIC_IP: ip,
495 constants.INIC_MAC: mac,
496 constants.INIC_MODE: mode,
497 constants.INIC_LINK: link,
498 } for ip, mac, mode, link in instance["nics"]]
500 if len(override_nics) > len(nics):
501 raise Error("Can not create new NICs")
504 assert len(override_nics) <= len(nics)
505 for idx, (nic, override) in enumerate(zip(nics, override_nics)):
506 nics[idx] = objects.FillDict(nic, override)
508 # TODO: Should this be the actual up/down status? (run_state)
509 start = (instance["config_state"] == "up")
511 assert len(disks) == len(instance["disks"])
512 assert len(nics) == len(instance["nics"])
514 inst_beparams = instance["be_instance"]
515 if not inst_beparams:
518 inst_hvparams = instance["hv_instance"]
519 if not inst_hvparams:
522 inst_osparams = instance["os_instance"]
523 if not inst_osparams:
526 return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
527 name, disk_template, disks, nics,
533 iallocator=iallocator,
534 hypervisor=instance["hypervisor"],
535 source_handshake=expinfo["handshake"],
536 source_x509_ca=expinfo["x509_ca"],
537 source_instance_name=instance["name"],
538 beparams=objects.FillDict(inst_beparams,
540 hvparams=objects.FillDict(inst_hvparams,
542 osparams=objects.FillDict(inst_osparams,
546 class MoveSourceExecutor(object):
547 def __init__(self, src_client, mrt):
548 """Source side of an instance move.
550 @type src_client: L{rapi.client.GanetiRapiClient}
551 @param src_client: RAPI client
552 @type mrt: L{MoveRuntime}
553 @param mrt: Instance move runtime information
556 logging.info("Checking whether instance exists")
557 self._CheckInstance(src_client, mrt.move.src_instance_name)
559 logging.info("Retrieving instance information from source cluster")
560 instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
561 mrt.move.src_instance_name)
563 logging.info("Preparing export on source cluster")
564 expinfo = self._PrepareExport(src_client, mrt.PollJob,
565 mrt.move.src_instance_name)
566 assert "handshake" in expinfo
567 assert "x509_key_name" in expinfo
568 assert "x509_ca" in expinfo
570 # Hand information to destination thread
571 mrt.source_to_dest.acquire()
573 mrt.src_instinfo = instinfo
574 mrt.src_expinfo = expinfo
575 mrt.source_to_dest.notifyAll()
577 mrt.source_to_dest.release()
579 logging.info("Waiting for destination information to become available")
580 mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
582 logging.info("Starting remote export on source cluster")
583 self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
584 expinfo["x509_key_name"], mrt.dest_impinfo)
586 logging.info("Export successful")
589 def _CheckInstance(cl, name):
590 """Checks whether the instance exists on the source cluster.
592 @type cl: L{rapi.client.GanetiRapiClient}
593 @param cl: RAPI client
595 @param name: Instance name
600 except rapi.client.GanetiApiError, err:
601 if err.code == rapi.client.HTTP_NOT_FOUND:
602 raise Error("Instance %s not found (%s)" % (name, str(err)))
606 def _GetInstanceInfo(cl, poll_job_fn, name):
607 """Retrieves detailed instance information from source cluster.
609 @type cl: L{rapi.client.GanetiRapiClient}
610 @param cl: RAPI client
611 @type poll_job_fn: callable
612 @param poll_job_fn: Function to poll for job result
614 @param name: Instance name
617 job_id = cl.GetInstanceInfo(name, static=True)
618 result = poll_job_fn(cl, job_id)
619 assert len(result[0].keys()) == 1
620 return result[0][result[0].keys()[0]]
623 def _PrepareExport(cl, poll_job_fn, name):
624 """Prepares export on source cluster.
626 @type cl: L{rapi.client.GanetiRapiClient}
627 @param cl: RAPI client
628 @type poll_job_fn: callable
629 @param poll_job_fn: Function to poll for job result
631 @param name: Instance name
634 job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
635 return poll_job_fn(cl, job_id)[0]
638 def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
639 """Exports instance from source cluster.
641 @type cl: L{rapi.client.GanetiRapiClient}
642 @param cl: RAPI client
643 @type poll_job_fn: callable
644 @param poll_job_fn: Function to poll for job result
646 @param name: Instance name
647 @param x509_key_name: Source X509 key
648 @param impinfo: Import information from destination cluster
651 job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
652 impinfo["disks"], shutdown=True,
653 remove_instance=True,
654 x509_key_name=x509_key_name,
655 destination_x509_ca=impinfo["x509_ca"])
656 (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
658 if not (fin_resu and compat.all(dresults)):
659 raise Error("Export failed for disks %s" %
660 utils.CommaJoin(str(idx) for idx, result
661 in enumerate(dresults) if not result))
664 class MoveSourceWorker(workerpool.BaseWorker):
665 def RunTask(self, rapi_factory, move): # pylint: disable=W0221
666 """Executes an instance move.
668 @type rapi_factory: L{RapiClientFactory}
669 @param rapi_factory: RAPI client factory
670 @type move: L{InstanceMove}
671 @param move: Instance move information
675 logging.info("Preparing to move %s from cluster %s to %s as %s",
676 move.src_instance_name, rapi_factory.src_cluster_name,
677 rapi_factory.dest_cluster_name, move.dest_instance_name)
679 mrt = MoveRuntime(move)
681 logging.debug("Starting destination thread")
682 dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
683 target=mrt.HandleErrors,
684 args=("dest", MoveDestExecutor,
685 rapi_factory.GetDestClient(),
689 mrt.HandleErrors("src", MoveSourceExecutor,
690 rapi_factory.GetSourceClient(), mrt)
694 if mrt.src_error_message or mrt.dest_error_message:
695 move.error_message = ("Source error: %s, destination error: %s" %
696 (mrt.src_error_message, mrt.dest_error_message))
698 move.error_message = None
699 except Exception, err: # pylint: disable=W0703
700 logging.exception("Caught unhandled exception")
701 move.error_message = str(err)
704 def CheckRapiSetup(rapi_factory):
705 """Checks the RAPI setup by retrieving the version.
707 @type rapi_factory: L{RapiClientFactory}
708 @param rapi_factory: RAPI client factory
711 src_client = rapi_factory.GetSourceClient()
712 logging.info("Connecting to source RAPI server")
713 logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
715 dest_client = rapi_factory.GetDestClient()
716 logging.info("Connecting to destination RAPI server")
717 logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())
720 def SetupLogging(options):
721 """Setting up logging infrastructure.
723 @param options: Parsed command line options
726 fmt = "%(asctime)s: %(threadName)s "
727 if options.debug or options.verbose:
728 fmt += "%(levelname)s "
731 formatter = logging.Formatter(fmt)
733 stderr_handler = logging.StreamHandler()
734 stderr_handler.setFormatter(formatter)
736 stderr_handler.setLevel(logging.NOTSET)
737 elif options.verbose:
738 stderr_handler.setLevel(logging.INFO)
740 stderr_handler.setLevel(logging.ERROR)
742 root_logger = logging.getLogger("")
743 root_logger.setLevel(logging.NOTSET)
744 root_logger.addHandler(stderr_handler)
748 """Parses options passed to program.
751 program = os.path.basename(sys.argv[0])
753 parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
754 " <source-cluster> <dest-cluster>"
757 parser.add_option(cli.DEBUG_OPT)
758 parser.add_option(cli.VERBOSE_OPT)
759 parser.add_option(cli.IALLOCATOR_OPT)
760 parser.add_option(cli.BACKEND_OPT)
761 parser.add_option(cli.HVOPTS_OPT)
762 parser.add_option(cli.OSPARAMS_OPT)
763 parser.add_option(cli.NET_OPT)
764 parser.add_option(SRC_RAPI_PORT_OPT)
765 parser.add_option(SRC_CA_FILE_OPT)
766 parser.add_option(SRC_USERNAME_OPT)
767 parser.add_option(SRC_PASSWORD_FILE_OPT)
768 parser.add_option(DEST_RAPI_PORT_OPT)
769 parser.add_option(DEST_CA_FILE_OPT)
770 parser.add_option(DEST_USERNAME_OPT)
771 parser.add_option(DEST_PASSWORD_FILE_OPT)
772 parser.add_option(DEST_INSTANCE_NAME_OPT)
773 parser.add_option(DEST_PRIMARY_NODE_OPT)
774 parser.add_option(DEST_SECONDARY_NODE_OPT)
775 parser.add_option(PARALLEL_OPT)
777 (options, args) = parser.parse_args()
779 return (parser, options, args)
782 def CheckOptions(parser, options, args):
783 """Checks options and arguments for validity.
787 parser.error("Not enough arguments")
789 src_cluster_name = args.pop(0)
790 dest_cluster_name = args.pop(0)
791 instance_names = args
793 assert len(instance_names) > 0
795 # TODO: Remove once using system default paths for SSL certificate
796 # verification is implemented
797 if not options.src_ca_file:
798 parser.error("Missing source cluster CA file")
800 if options.parallel < 1:
801 parser.error("Number of simultaneous moves must be >= 1")
803 if not (bool(options.iallocator) ^
804 bool(options.dest_primary_node or options.dest_secondary_node)):
805 parser.error("Destination node and iallocator options exclude each other")
807 if len(instance_names) == 1:
808 # Moving one instance only
809 if not (options.iallocator or
810 options.dest_primary_node or
811 options.dest_secondary_node):
812 parser.error("An iallocator or the destination node is required")
815 utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)
818 utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)
821 options.nics = cli.ParseNicOption(options.nics)
823 # Moving more than one instance
824 if (options.dest_instance_name or options.dest_primary_node or
825 options.dest_secondary_node or options.hvparams or
826 options.beparams or options.osparams or options.nics):
827 parser.error("The options --dest-instance-name, --dest-primary-node,"
828 " --dest-secondary-node, --hypervisor-parameters,"
829 " --backend-parameters, --os-parameters and --net can"
830 " only be used when moving exactly one instance")
832 if not options.iallocator:
833 parser.error("An iallocator must be specified for moving more than one"
836 return (src_cluster_name, dest_cluster_name, instance_names)
839 @rapi.client.UsesRapiClient
844 (parser, options, args) = ParseOptions()
846 SetupLogging(options)
848 (src_cluster_name, dest_cluster_name, instance_names) = \
849 CheckOptions(parser, options, args)
851 logging.info("Source cluster: %s", src_cluster_name)
852 logging.info("Destination cluster: %s", dest_cluster_name)
853 logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
855 rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
857 CheckRapiSetup(rapi_factory)
859 assert (len(instance_names) == 1 or
860 not (options.dest_primary_node or options.dest_secondary_node))
861 assert len(instance_names) == 1 or options.iallocator
862 assert (len(instance_names) > 1 or options.iallocator or
863 options.dest_primary_node or options.dest_secondary_node)
864 assert (len(instance_names) == 1 or
865 not (options.hvparams or options.beparams or options.osparams or
868 # Prepare list of instance moves
870 for src_instance_name in instance_names:
871 if options.dest_instance_name:
872 assert len(instance_names) == 1
874 dest_instance_name = options.dest_instance_name
876 dest_instance_name = src_instance_name
878 moves.append(InstanceMove(src_instance_name, dest_instance_name,
879 options.dest_primary_node,
880 options.dest_secondary_node,
881 options.iallocator, options.hvparams,
882 options.beparams, options.osparams,
885 assert len(moves) == len(instance_names)
888 wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
890 # Add instance moves to workerpool
892 wp.AddTask((rapi_factory, move))
894 # Wait for all moves to finish
898 wp.TerminateWorkers()
900 # There should be no threads running at this point, hence not using locks
903 logging.info("Instance move results:")
906 if move.dest_instance_name == move.src_instance_name:
907 name = move.src_instance_name
909 name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
911 if move.error_message:
912 msg = "Failed (%s)" % move.error_message
916 logging.info("%s: %s", name, msg)
918 if compat.any(move.error_message for move in moves):
919 sys.exit(constants.EXIT_FAILURE)
921 sys.exit(constants.EXIT_SUCCESS)
924 if __name__ == "__main__":