4 # Copyright (C) 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Tool to move instances from one cluster to another.
25 # pylint: disable=C0103
26 # C0103: Invalid name move-instance
35 from ganeti import cli
36 from ganeti import constants
37 from ganeti import utils
38 from ganeti import workerpool
39 from ganeti import objects
40 from ganeti import compat
41 from ganeti import rapi
43 import ganeti.rapi.client # pylint: disable=W0611
44 import ganeti.rapi.client_utils
45 from ganeti.rapi.client import UsesRapiClient
# Source cluster connection options.  Each option object is bound to the
# module-level name under which ParseOptions registers it with the parser.
SRC_RAPI_PORT_OPT = \
  cli.cli_option("--src-rapi-port", action="store", type="int",
                 dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
                 help=("Source cluster RAPI port (defaults to %s)" %
                       constants.DEFAULT_RAPI_PORT))

# Read as options.src_ca_file; CheckOptions rejects an empty value
SRC_CA_FILE_OPT = \
  cli.cli_option("--src-ca-file", action="store", type="string",
                 dest="src_ca_file",
                 help=("File containing source cluster Certificate"
                       " Authority (CA) in PEM format"))

SRC_USERNAME_OPT = \
  cli.cli_option("--src-username", action="store", type="string",
                 dest="src_username", default=None,
                 help="Source cluster username")

# Path handed to utils.ReadOneLineFile by RapiClientFactory when set
SRC_PASSWORD_FILE_OPT = \
  cli.cli_option("--src-password-file", action="store", type="string",
                 dest="src_password_file",
                 help="File containing source cluster password")
# Destination RAPI port.  The default is deliberately None: RapiClientFactory
# uses the source cluster port whenever this option evaluates to false, which
# is the behaviour the help text promises.  Defaulting to
# constants.DEFAULT_RAPI_PORT here would make that fallback unreachable and
# silently ignore a non-default --src-rapi-port on the destination side.
DEST_RAPI_PORT_OPT = \
  cli.cli_option("--dest-rapi-port", action="store", type="int",
                 dest="dest_rapi_port", default=None,
                 help=("Destination cluster RAPI port (defaults to source"
                       " cluster RAPI port)"))
# Destination cluster options.  Options left unset fall back to the
# corresponding source cluster setting in RapiClientFactory.
DEST_CA_FILE_OPT = \
  cli.cli_option("--dest-ca-file", action="store", type="string",
                 dest="dest_ca_file",
                 help=("File containing destination cluster Certificate"
                       " Authority (CA) in PEM format (defaults to source"
                       " cluster CA)"))

DEST_USERNAME_OPT = \
  cli.cli_option("--dest-username", action="store", type="string",
                 dest="dest_username", default=None,
                 help=("Destination cluster username (defaults to"
                       " source cluster username)"))

DEST_PASSWORD_FILE_OPT = \
  cli.cli_option("--dest-password-file", action="store", type="string",
                 dest="dest_password_file",
                 help=("File containing destination cluster password"
                       " (defaults to source cluster password)"))

# The following three options are rejected by CheckOptions when more than
# one instance is moved.
DEST_INSTANCE_NAME_OPT = \
  cli.cli_option("--dest-instance-name", action="store", type="string",
                 dest="dest_instance_name",
                 help=("Instance name on destination cluster (only"
                       " when moving exactly one instance)"))

DEST_PRIMARY_NODE_OPT = \
  cli.cli_option("--dest-primary-node", action="store", type="string",
                 dest="dest_primary_node",
                 help=("Primary node on destination cluster (only"
                       " when moving exactly one instance)"))

DEST_SECONDARY_NODE_OPT = \
  cli.cli_option("--dest-secondary-node", action="store", type="string",
                 dest="dest_secondary_node",
                 help=("Secondary node on destination cluster (only"
                       " when moving exactly one instance)"))
# Size of the worker pool created in main; CheckOptions requires a value >= 1
PARALLEL_OPT = \
  cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
                 dest="parallel", metavar="<number>",
                 help="Number of instances to be moved simultaneously")
class Error(Exception):
  """Generic error raised by this tool.

  """


class Abort(Error):
  """Special exception for aborting import/export.

  """
131 class RapiClientFactory:
132 """Factory class for creating RAPI clients.
134 @ivar src_cluster_name: Source cluster name
135 @ivar dest_cluster_name: Destination cluster name
136 @ivar GetSourceClient: Callable returning new client for source cluster
137 @ivar GetDestClient: Callable returning new client for destination cluster
140 def __init__(self, options, src_cluster_name, dest_cluster_name):
141 """Initializes this class.
143 @param options: Program options
144 @type src_cluster_name: string
145 @param src_cluster_name: Source cluster name
146 @type dest_cluster_name: string
147 @param dest_cluster_name: Destination cluster name
150 self.src_cluster_name = src_cluster_name
151 self.dest_cluster_name = dest_cluster_name
153 # TODO: Implement timeouts for RAPI connections
154 # TODO: Support for using system default paths for verifying SSL certificate
155 logging.debug("Using '%s' as source CA", options.src_ca_file)
156 src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)
158 if options.dest_ca_file:
159 logging.debug("Using '%s' as destination CA", options.dest_ca_file)
161 rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
163 logging.debug("Using source CA for destination")
164 dest_curl_config = src_curl_config
166 logging.debug("Source RAPI server is %s:%s",
167 src_cluster_name, options.src_rapi_port)
168 logging.debug("Source username is '%s'", options.src_username)
170 if options.src_username is None:
173 src_username = options.src_username
175 if options.src_password_file:
176 logging.debug("Reading '%s' for source password",
177 options.src_password_file)
178 src_password = utils.ReadOneLineFile(options.src_password_file,
181 logging.debug("Source has no password")
184 self.GetSourceClient = lambda: \
185 rapi.client.GanetiRapiClient(src_cluster_name,
186 port=options.src_rapi_port,
187 curl_config_fn=src_curl_config,
188 username=src_username,
189 password=src_password)
191 if options.dest_rapi_port:
192 dest_rapi_port = options.dest_rapi_port
194 dest_rapi_port = options.src_rapi_port
196 if options.dest_username is None:
197 dest_username = src_username
199 dest_username = options.dest_username
201 logging.debug("Destination RAPI server is %s:%s",
202 dest_cluster_name, dest_rapi_port)
203 logging.debug("Destination username is '%s'", dest_username)
205 if options.dest_password_file:
206 logging.debug("Reading '%s' for destination password",
207 options.dest_password_file)
208 dest_password = utils.ReadOneLineFile(options.dest_password_file,
211 logging.debug("Using source password for destination")
212 dest_password = src_password
214 self.GetDestClient = lambda: \
215 rapi.client.GanetiRapiClient(dest_cluster_name,
217 curl_config_fn=dest_curl_config,
218 username=dest_username,
219 password=dest_password)
class MoveJobPollReportCb(cli.JobPollReportCbBase):
  """Job polling callbacks used while an instance move is in progress.

  Remote import information is passed to a callback, every other log message
  is written to the log, and a job that has not changed is used as an
  opportunity to check whether the move should be aborted.

  """
  def __init__(self, abort_check_fn, remote_import_fn):
    """Initializes this class.

    @type abort_check_fn: callable
    @param abort_check_fn: Function to check whether move is aborted
    @type remote_import_fn: callable or None
    @param remote_import_fn: Callback for reporting received remote import
                             information

    """
    cli.JobPollReportCbBase.__init__(self)
    self._abort_check_fn = abort_check_fn
    self._remote_import_fn = remote_import_fn

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    Messages of type L{constants.ELOG_REMOTE_IMPORT} are handed to the remote
    import callback; all other messages are logged.

    @raise RuntimeError: If remote import information is received although
                         no callback was given

    """
    if log_type == constants.ELOG_REMOTE_IMPORT:
      logging.debug("Received remote import information")

      if not self._remote_import_fn:
        raise RuntimeError("Received unexpected remote import information")

      assert "x509_ca" in log_msg
      assert "disks" in log_msg

      self._remote_import_fn(log_msg)

      # The message has been consumed by the callback; it is not a
      # human-readable log entry
      return

    logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
                 cli.FormatLogMessage(log_type, log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    @raise Abort: Propagated from the abort check function

    """
    try:
      # Check whether we were told to abort by the other thread
      self._abort_check_fn()
    except Abort:
      # Only warn when actually aborting, then let the abort propagate
      logging.warning("Aborting despite job %s still running", job_id)
      raise
class InstanceMove(object):
  """Status class for instance moves.

  Holds the parameters of a single move and, once the move has run, its
  error message (None as long as no error was recorded).

  """
  def __init__(self, src_instance_name, dest_instance_name,
               dest_pnode, dest_snode, dest_iallocator,
               hvparams, beparams, osparams, nics):
    """Initializes this class.

    @type src_instance_name: string
    @param src_instance_name: Instance name on source cluster
    @type dest_instance_name: string
    @param dest_instance_name: Instance name on destination cluster
    @type dest_pnode: string or None
    @param dest_pnode: Name of primary node on destination cluster
    @type dest_snode: string or None
    @param dest_snode: Name of secondary node on destination cluster
    @type dest_iallocator: string or None
    @param dest_iallocator: Name of iallocator to use
    @type hvparams: dict or None
    @param hvparams: Hypervisor parameters to override
    @type beparams: dict or None
    @param beparams: Backend parameters to override
    @type osparams: dict or None
    @param osparams: OS parameters to override
    @type nics: dict or None
    @param nics: NICs to override

    """
    self.src_instance_name = src_instance_name
    self.dest_instance_name = dest_instance_name
    self.dest_pnode = dest_pnode
    self.dest_snode = dest_snode
    self.dest_iallocator = dest_iallocator
    self.hvparams = hvparams
    self.beparams = beparams
    self.osparams = osparams
    # Read by the destination executor as "mrt.move.nics"
    self.nics = nics

    # Set by the worker once the move has finished
    self.error_message = None
311 class MoveRuntime(object):
312 """Class to keep track of instance move.
315 def __init__(self, move):
316 """Initializes this class.
318 @type move: L{InstanceMove}
323 # Thread synchronization
324 self.lock = threading.Lock()
325 self.source_to_dest = threading.Condition(self.lock)
326 self.dest_to_source = threading.Condition(self.lock)
329 self.src_error_message = None
330 self.src_expinfo = None
331 self.src_instinfo = None
333 # Destination information
334 self.dest_error_message = None
335 self.dest_impinfo = None
337 def HandleErrors(self, prefix, fn, *args):
338 """Wrapper to catch errors and abort threads.
341 @param prefix: Variable name prefix ("src" or "dest")
346 assert prefix in ("dest", "src")
349 # Call inner function
355 except Exception, err:
356 logging.exception("Caught unhandled exception")
359 setattr(self, "%s_error_message" % prefix, errmsg)
363 self.source_to_dest.notifyAll()
364 self.dest_to_source.notifyAll()
368 def CheckAbort(self):
369 """Check whether thread should be aborted.
371 @raise Abort: When thread should be aborted
374 if not (self.src_error_message is None and
375 self.dest_error_message is None):
376 logging.info("Aborting")
379 def Wait(self, cond, check_fn):
380 """Waits for a condition to become true.
382 @type cond: threading.Condition
383 @param cond: Threading condition
384 @type check_fn: callable
385 @param check_fn: Function to check whether condition is true
390 while check_fn(self):
396 def PollJob(self, cl, job_id, remote_import_fn=None):
397 """Wrapper for polling a job.
399 @type cl: L{rapi.client.GanetiRapiClient}
400 @param cl: RAPI client
402 @param job_id: Job ID
403 @type remote_import_fn: callable or None
404 @param remote_import_fn: Callback for reporting received remote import
408 return rapi.client_utils.PollJob(cl, job_id,
409 MoveJobPollReportCb(self.CheckAbort,
413 class MoveDestExecutor(object):
414 def __init__(self, dest_client, mrt):
415 """Destination side of an instance move.
417 @type dest_client: L{rapi.client.GanetiRapiClient}
418 @param dest_client: RAPI client
419 @type mrt: L{MoveRuntime}
420 @param mrt: Instance move runtime information
423 logging.debug("Waiting for instance information to become available")
424 mrt.Wait(mrt.source_to_dest,
425 lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
427 logging.info("Creating instance %s in remote-import mode",
428 mrt.move.dest_instance_name)
429 job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
430 mrt.move.dest_pnode, mrt.move.dest_snode,
431 mrt.move.dest_iallocator,
432 mrt.src_instinfo, mrt.src_expinfo,
433 mrt.move.hvparams, mrt.move.beparams,
434 mrt.move.beparams, mrt.move.nics)
435 mrt.PollJob(dest_client, job_id,
436 remote_import_fn=compat.partial(self._SetImportInfo, mrt))
438 logging.info("Import successful")
441 def _SetImportInfo(mrt, impinfo):
442 """Sets the remote import information and notifies source thread.
444 @type mrt: L{MoveRuntime}
445 @param mrt: Instance move runtime information
446 @param impinfo: Remote import information
449 mrt.dest_to_source.acquire()
451 mrt.dest_impinfo = impinfo
452 mrt.dest_to_source.notifyAll()
454 mrt.dest_to_source.release()
457 def _CreateInstance(cl, name, pnode, snode, iallocator, instance, expinfo,
458 override_hvparams, override_beparams, override_osparams,
460 """Starts the instance creation in remote import mode.
462 @type cl: L{rapi.client.GanetiRapiClient}
463 @param cl: RAPI client
465 @param name: Instance name
466 @type pnode: string or None
467 @param pnode: Name of primary node on destination cluster
468 @type snode: string or None
469 @param snode: Name of secondary node on destination cluster
470 @type iallocator: string or None
471 @param iallocator: Name of iallocator to use
473 @param instance: Instance details from source cluster
475 @param expinfo: Prepared export information from source cluster
476 @type override_hvparams: dict or None
477 @param override_hvparams: Hypervisor parameters to override
478 @type override_beparams: dict or None
479 @param override_beparams: Backend parameters to override
480 @type override_osparams: dict or None
481 @param override_osparams: OS parameters to override
482 @type override_nics: dict or None
483 @param override_nics: NICs to override
487 disk_template = instance["disk_template"]
490 for idisk in instance["disks"]:
492 constants.IDISK_SIZE: idisk["size"],
493 constants.IDISK_MODE: idisk["mode"],
494 constants.IDISK_NAME: str(idisk.get("name")),
496 spindles = idisk.get("spindles")
497 if spindles is not None:
498 odisk[constants.IDISK_SPINDLES] = spindles
502 constants.INIC_IP: ip,
503 constants.INIC_MAC: mac,
504 constants.INIC_MODE: mode,
505 constants.INIC_LINK: link,
506 constants.INIC_NETWORK: network,
507 constants.INIC_NAME: nic_name
508 } for nic_name, _, ip, mac, mode, link, network, _ in instance["nics"]]
510 if len(override_nics) > len(nics):
511 raise Error("Can not create new NICs")
514 assert len(override_nics) <= len(nics)
515 for idx, (nic, override) in enumerate(zip(nics, override_nics)):
516 nics[idx] = objects.FillDict(nic, override)
518 # TODO: Should this be the actual up/down status? (run_state)
519 start = (instance["config_state"] == "up")
521 assert len(disks) == len(instance["disks"])
522 assert len(nics) == len(instance["nics"])
524 inst_beparams = instance["be_instance"]
525 if not inst_beparams:
528 inst_hvparams = instance["hv_instance"]
529 if not inst_hvparams:
532 inst_osparams = instance["os_instance"]
533 if not inst_osparams:
536 return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
537 name, disk_template, disks, nics,
543 iallocator=iallocator,
544 hypervisor=instance["hypervisor"],
545 source_handshake=expinfo["handshake"],
546 source_x509_ca=expinfo["x509_ca"],
547 source_instance_name=instance["name"],
548 beparams=objects.FillDict(inst_beparams,
550 hvparams=objects.FillDict(inst_hvparams,
552 osparams=objects.FillDict(inst_osparams,
556 class MoveSourceExecutor(object):
557 def __init__(self, src_client, mrt):
558 """Source side of an instance move.
560 @type src_client: L{rapi.client.GanetiRapiClient}
561 @param src_client: RAPI client
562 @type mrt: L{MoveRuntime}
563 @param mrt: Instance move runtime information
566 logging.info("Checking whether instance exists")
567 self._CheckInstance(src_client, mrt.move.src_instance_name)
569 logging.info("Retrieving instance information from source cluster")
570 instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
571 mrt.move.src_instance_name)
572 if (instinfo["disk_template"] in
573 [constants.DT_FILE, constants.DT_SHARED_FILE]):
574 raise Error("Inter-cluster move of file-based instances is not"
577 logging.info("Preparing export on source cluster")
578 expinfo = self._PrepareExport(src_client, mrt.PollJob,
579 mrt.move.src_instance_name)
580 assert "handshake" in expinfo
581 assert "x509_key_name" in expinfo
582 assert "x509_ca" in expinfo
584 # Hand information to destination thread
585 mrt.source_to_dest.acquire()
587 mrt.src_instinfo = instinfo
588 mrt.src_expinfo = expinfo
589 mrt.source_to_dest.notifyAll()
591 mrt.source_to_dest.release()
593 logging.info("Waiting for destination information to become available")
594 mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
596 logging.info("Starting remote export on source cluster")
597 self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
598 expinfo["x509_key_name"], mrt.dest_impinfo)
600 logging.info("Export successful")
603 def _CheckInstance(cl, name):
604 """Checks whether the instance exists on the source cluster.
606 @type cl: L{rapi.client.GanetiRapiClient}
607 @param cl: RAPI client
609 @param name: Instance name
614 except rapi.client.GanetiApiError, err:
615 if err.code == rapi.client.HTTP_NOT_FOUND:
616 raise Error("Instance %s not found (%s)" % (name, str(err)))
620 def _GetInstanceInfo(cl, poll_job_fn, name):
621 """Retrieves detailed instance information from source cluster.
623 @type cl: L{rapi.client.GanetiRapiClient}
624 @param cl: RAPI client
625 @type poll_job_fn: callable
626 @param poll_job_fn: Function to poll for job result
628 @param name: Instance name
631 job_id = cl.GetInstanceInfo(name, static=True)
632 result = poll_job_fn(cl, job_id)
633 assert len(result[0].keys()) == 1
634 return result[0][result[0].keys()[0]]
637 def _PrepareExport(cl, poll_job_fn, name):
638 """Prepares export on source cluster.
640 @type cl: L{rapi.client.GanetiRapiClient}
641 @param cl: RAPI client
642 @type poll_job_fn: callable
643 @param poll_job_fn: Function to poll for job result
645 @param name: Instance name
648 job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
649 return poll_job_fn(cl, job_id)[0]
652 def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
653 """Exports instance from source cluster.
655 @type cl: L{rapi.client.GanetiRapiClient}
656 @param cl: RAPI client
657 @type poll_job_fn: callable
658 @param poll_job_fn: Function to poll for job result
660 @param name: Instance name
661 @param x509_key_name: Source X509 key
662 @param impinfo: Import information from destination cluster
665 job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
666 impinfo["disks"], shutdown=True,
667 remove_instance=True,
668 x509_key_name=x509_key_name,
669 destination_x509_ca=impinfo["x509_ca"])
670 (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
672 if not (fin_resu and compat.all(dresults)):
673 raise Error("Export failed for disks %s" %
674 utils.CommaJoin(str(idx) for idx, result
675 in enumerate(dresults) if not result))
678 class MoveSourceWorker(workerpool.BaseWorker):
679 def RunTask(self, rapi_factory, move): # pylint: disable=W0221
680 """Executes an instance move.
682 @type rapi_factory: L{RapiClientFactory}
683 @param rapi_factory: RAPI client factory
684 @type move: L{InstanceMove}
685 @param move: Instance move information
689 logging.info("Preparing to move %s from cluster %s to %s as %s",
690 move.src_instance_name, rapi_factory.src_cluster_name,
691 rapi_factory.dest_cluster_name, move.dest_instance_name)
693 mrt = MoveRuntime(move)
695 logging.debug("Starting destination thread")
696 dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
697 target=mrt.HandleErrors,
698 args=("dest", MoveDestExecutor,
699 rapi_factory.GetDestClient(),
703 mrt.HandleErrors("src", MoveSourceExecutor,
704 rapi_factory.GetSourceClient(), mrt)
708 if mrt.src_error_message or mrt.dest_error_message:
709 move.error_message = ("Source error: %s, destination error: %s" %
710 (mrt.src_error_message, mrt.dest_error_message))
712 move.error_message = None
713 except Exception, err: # pylint: disable=W0703
714 logging.exception("Caught unhandled exception")
715 move.error_message = str(err)
def CheckRapiSetup(rapi_factory):
  """Checks the RAPI setup by retrieving the version.

  A client is created for the source and then for the destination cluster;
  the version reported by each is written to the log.

  @type rapi_factory: L{RapiClientFactory}
  @param rapi_factory: RAPI client factory

  """
  # Source cluster first
  source = rapi_factory.GetSourceClient()
  logging.info("Connecting to source RAPI server")
  source_version = source.GetVersion()
  logging.info("Source cluster RAPI version: %s", source_version)

  # Then the destination cluster
  destination = rapi_factory.GetDestClient()
  logging.info("Connecting to destination RAPI server")
  destination_version = destination.GetVersion()
  logging.info("Destination cluster RAPI version: %s", destination_version)
735 """Parses options passed to program.
738 program = os.path.basename(sys.argv[0])
740 parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
741 " <source-cluster> <dest-cluster>"
744 parser.add_option(cli.DEBUG_OPT)
745 parser.add_option(cli.VERBOSE_OPT)
746 parser.add_option(cli.IALLOCATOR_OPT)
747 parser.add_option(cli.BACKEND_OPT)
748 parser.add_option(cli.HVOPTS_OPT)
749 parser.add_option(cli.OSPARAMS_OPT)
750 parser.add_option(cli.NET_OPT)
751 parser.add_option(SRC_RAPI_PORT_OPT)
752 parser.add_option(SRC_CA_FILE_OPT)
753 parser.add_option(SRC_USERNAME_OPT)
754 parser.add_option(SRC_PASSWORD_FILE_OPT)
755 parser.add_option(DEST_RAPI_PORT_OPT)
756 parser.add_option(DEST_CA_FILE_OPT)
757 parser.add_option(DEST_USERNAME_OPT)
758 parser.add_option(DEST_PASSWORD_FILE_OPT)
759 parser.add_option(DEST_INSTANCE_NAME_OPT)
760 parser.add_option(DEST_PRIMARY_NODE_OPT)
761 parser.add_option(DEST_SECONDARY_NODE_OPT)
762 parser.add_option(PARALLEL_OPT)
764 (options, args) = parser.parse_args()
766 return (parser, options, args)
769 def CheckOptions(parser, options, args):
770 """Checks options and arguments for validity.
774 parser.error("Not enough arguments")
776 src_cluster_name = args.pop(0)
777 dest_cluster_name = args.pop(0)
778 instance_names = args
780 assert len(instance_names) > 0
782 # TODO: Remove once using system default paths for SSL certificate
783 # verification is implemented
784 if not options.src_ca_file:
785 parser.error("Missing source cluster CA file")
787 if options.parallel < 1:
788 parser.error("Number of simultaneous moves must be >= 1")
790 if not (bool(options.iallocator) ^
791 bool(options.dest_primary_node or options.dest_secondary_node)):
792 parser.error("Destination node and iallocator options exclude each other")
794 if len(instance_names) == 1:
795 # Moving one instance only
796 if not (options.iallocator or
797 options.dest_primary_node or
798 options.dest_secondary_node):
799 parser.error("An iallocator or the destination node is required")
802 utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)
805 utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)
808 options.nics = cli.ParseNicOption(options.nics)
810 # Moving more than one instance
811 if (options.dest_instance_name or options.dest_primary_node or
812 options.dest_secondary_node or options.hvparams or
813 options.beparams or options.osparams or options.nics):
814 parser.error("The options --dest-instance-name, --dest-primary-node,"
815 " --dest-secondary-node, --hypervisor-parameters,"
816 " --backend-parameters, --os-parameters and --net can"
817 " only be used when moving exactly one instance")
819 if not options.iallocator:
820 parser.error("An iallocator must be specified for moving more than one"
823 return (src_cluster_name, dest_cluster_name, instance_names)
831 (parser, options, args) = ParseOptions()
833 utils.SetupToolLogging(options.debug, options.verbose, threadname=True)
835 (src_cluster_name, dest_cluster_name, instance_names) = \
836 CheckOptions(parser, options, args)
838 logging.info("Source cluster: %s", src_cluster_name)
839 logging.info("Destination cluster: %s", dest_cluster_name)
840 logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
842 rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
844 CheckRapiSetup(rapi_factory)
846 assert (len(instance_names) == 1 or
847 not (options.dest_primary_node or options.dest_secondary_node))
848 assert len(instance_names) == 1 or options.iallocator
849 assert (len(instance_names) > 1 or options.iallocator or
850 options.dest_primary_node or options.dest_secondary_node)
851 assert (len(instance_names) == 1 or
852 not (options.hvparams or options.beparams or options.osparams or
855 # Prepare list of instance moves
857 for src_instance_name in instance_names:
858 if options.dest_instance_name:
859 assert len(instance_names) == 1
861 dest_instance_name = options.dest_instance_name
863 dest_instance_name = src_instance_name
865 moves.append(InstanceMove(src_instance_name, dest_instance_name,
866 options.dest_primary_node,
867 options.dest_secondary_node,
868 options.iallocator, options.hvparams,
869 options.beparams, options.osparams,
872 assert len(moves) == len(instance_names)
875 wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
877 # Add instance moves to workerpool
879 wp.AddTask((rapi_factory, move))
881 # Wait for all moves to finish
885 wp.TerminateWorkers()
887 # There should be no threads running at this point, hence not using locks
890 logging.info("Instance move results:")
893 if move.dest_instance_name == move.src_instance_name:
894 name = move.src_instance_name
896 name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
898 if move.error_message:
899 msg = "Failed (%s)" % move.error_message
903 logging.info("%s: %s", name, msg)
905 if compat.any(move.error_message for move in moves):
906 sys.exit(constants.EXIT_FAILURE)
908 sys.exit(constants.EXIT_SUCCESS)
911 if __name__ == "__main__":