4 # Copyright (C) 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Tool to move instances from one cluster to another.
25 # pylint: disable=C0103
26 # C0103: Invalid name move-instance
35 from ganeti import cli
36 from ganeti import constants
37 from ganeti import utils
38 from ganeti import workerpool
39 from ganeti import objects
40 from ganeti import compat
41 from ganeti import rapi
43 import ganeti.rapi.client # pylint: disable=W0611
44 import ganeti.rapi.client_utils
45 from ganeti.rapi.client import UsesRapiClient
49 cli.cli_option("--src-rapi-port", action="store", type="int",
50 dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
51 help=("Source cluster RAPI port (defaults to %s)" %
52 constants.DEFAULT_RAPI_PORT))
55 cli.cli_option("--src-ca-file", action="store", type="string",
57 help=("File containing source cluster Certificate"
58 " Authority (CA) in PEM format"))
61 cli.cli_option("--src-username", action="store", type="string",
62 dest="src_username", default=None,
63 help="Source cluster username")
65 SRC_PASSWORD_FILE_OPT = \
66 cli.cli_option("--src-password-file", action="store", type="string",
67 dest="src_password_file",
68 help="File containing source cluster password")
70 DEST_RAPI_PORT_OPT = \
71 cli.cli_option("--dest-rapi-port", action="store", type="int",
72 dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT,
73 help=("Destination cluster RAPI port (defaults to source"
74 " cluster RAPI port)"))
77 cli.cli_option("--dest-ca-file", action="store", type="string",
79 help=("File containing destination cluster Certificate"
80 " Authority (CA) in PEM format (defaults to source"
84 cli.cli_option("--dest-username", action="store", type="string",
85 dest="dest_username", default=None,
86 help=("Destination cluster username (defaults to"
87 " source cluster username)"))
89 DEST_PASSWORD_FILE_OPT = \
90 cli.cli_option("--dest-password-file", action="store", type="string",
91 dest="dest_password_file",
92 help=("File containing destination cluster password"
93 " (defaults to source cluster password)"))
95 DEST_INSTANCE_NAME_OPT = \
96 cli.cli_option("--dest-instance-name", action="store", type="string",
97 dest="dest_instance_name",
98 help=("Instance name on destination cluster (only"
99 " when moving exactly one instance)"))
101 DEST_PRIMARY_NODE_OPT = \
102 cli.cli_option("--dest-primary-node", action="store", type="string",
103 dest="dest_primary_node",
104 help=("Primary node on destination cluster (only"
105 " when moving exactly one instance)"))
107 DEST_SECONDARY_NODE_OPT = \
108 cli.cli_option("--dest-secondary-node", action="store", type="string",
109 dest="dest_secondary_node",
110 help=("Secondary node on destination cluster (only"
111 " when moving exactly one instance)"))
114 cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
115 dest="parallel", metavar="<number>",
116 help="Number of instances to be moved simultaneously")
119 class Error(Exception):
126 """Special exception for aborting import/export.
131 class RapiClientFactory:
132 """Factory class for creating RAPI clients.
134 @ivar src_cluster_name: Source cluster name
135 @ivar dest_cluster_name: Destination cluster name
136 @ivar GetSourceClient: Callable returning new client for source cluster
137 @ivar GetDestClient: Callable returning new client for destination cluster
140 def __init__(self, options, src_cluster_name, dest_cluster_name):
141 """Initializes this class.
143 @param options: Program options
144 @type src_cluster_name: string
145 @param src_cluster_name: Source cluster name
146 @type dest_cluster_name: string
147 @param dest_cluster_name: Destination cluster name
150 self.src_cluster_name = src_cluster_name
151 self.dest_cluster_name = dest_cluster_name
153 # TODO: Implement timeouts for RAPI connections
154 # TODO: Support for using system default paths for verifying SSL certificate
155 logging.debug("Using '%s' as source CA", options.src_ca_file)
156 src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)
158 if options.dest_ca_file:
159 logging.debug("Using '%s' as destination CA", options.dest_ca_file)
161 rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
163 logging.debug("Using source CA for destination")
164 dest_curl_config = src_curl_config
166 logging.debug("Source RAPI server is %s:%s",
167 src_cluster_name, options.src_rapi_port)
168 logging.debug("Source username is '%s'", options.src_username)
170 if options.src_username is None:
173 src_username = options.src_username
175 if options.src_password_file:
176 logging.debug("Reading '%s' for source password",
177 options.src_password_file)
178 src_password = utils.ReadOneLineFile(options.src_password_file,
181 logging.debug("Source has no password")
184 self.GetSourceClient = lambda: \
185 rapi.client.GanetiRapiClient(src_cluster_name,
186 port=options.src_rapi_port,
187 curl_config_fn=src_curl_config,
188 username=src_username,
189 password=src_password)
191 if options.dest_rapi_port:
192 dest_rapi_port = options.dest_rapi_port
194 dest_rapi_port = options.src_rapi_port
196 if options.dest_username is None:
197 dest_username = src_username
199 dest_username = options.dest_username
201 logging.debug("Destination RAPI server is %s:%s",
202 dest_cluster_name, dest_rapi_port)
203 logging.debug("Destination username is '%s'", dest_username)
205 if options.dest_password_file:
206 logging.debug("Reading '%s' for destination password",
207 options.dest_password_file)
208 dest_password = utils.ReadOneLineFile(options.dest_password_file,
211 logging.debug("Using source password for destination")
212 dest_password = src_password
214 self.GetDestClient = lambda: \
215 rapi.client.GanetiRapiClient(dest_cluster_name,
217 curl_config_fn=dest_curl_config,
218 username=dest_username,
219 password=dest_password)
222 class MoveJobPollReportCb(cli.JobPollReportCbBase):
223 def __init__(self, abort_check_fn, remote_import_fn):
224 """Initializes this class.
226 @type abort_check_fn: callable
227 @param abort_check_fn: Function to check whether move is aborted
228 @type remote_import_fn: callable or None
229 @param remote_import_fn: Callback for reporting received remote import
233 cli.JobPollReportCbBase.__init__(self)
234 self._abort_check_fn = abort_check_fn
235 self._remote_import_fn = remote_import_fn
237 def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
238 """Handles a log message.
241 if log_type == constants.ELOG_REMOTE_IMPORT:
242 logging.debug("Received remote import information")
244 if not self._remote_import_fn:
245 raise RuntimeError("Received unexpected remote import information")
247 assert "x509_ca" in log_msg
248 assert "disks" in log_msg
250 self._remote_import_fn(log_msg)
254 logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
255 cli.FormatLogMessage(log_type, log_msg))
257 def ReportNotChanged(self, job_id, status):
258 """Called if a job hasn't changed in a while.
262 # Check whether we were told to abort by the other thread
263 self._abort_check_fn()
265 logging.warning("Aborting despite job %s still running", job_id)
269 class InstanceMove(object):
270 """Status class for instance moves.
273 def __init__(self, src_instance_name, dest_instance_name,
274 dest_pnode, dest_snode, dest_iallocator,
275 hvparams, beparams, osparams, nics):
276 """Initializes this class.
278 @type src_instance_name: string
279 @param src_instance_name: Instance name on source cluster
280 @type dest_instance_name: string
281 @param dest_instance_name: Instance name on destination cluster
282 @type dest_pnode: string or None
283 @param dest_pnode: Name of primary node on destination cluster
284 @type dest_snode: string or None
285 @param dest_snode: Name of secondary node on destination cluster
286 @type dest_iallocator: string or None
287 @param dest_iallocator: Name of iallocator to use
288 @type hvparams: dict or None
289 @param hvparams: Hypervisor parameters to override
290 @type beparams: dict or None
291 @param beparams: Backend parameters to override
292 @type osparams: dict or None
293 @param osparams: OS parameters to override
294 @type nics: dict or None
295 @param nics: NICs to override
298 self.src_instance_name = src_instance_name
299 self.dest_instance_name = dest_instance_name
300 self.dest_pnode = dest_pnode
301 self.dest_snode = dest_snode
302 self.dest_iallocator = dest_iallocator
303 self.hvparams = hvparams
304 self.beparams = beparams
305 self.osparams = osparams
308 self.error_message = None
311 class MoveRuntime(object):
312 """Class to keep track of instance move.
315 def __init__(self, move):
316 """Initializes this class.
318 @type move: L{InstanceMove}
323 # Thread synchronization
324 self.lock = threading.Lock()
325 self.source_to_dest = threading.Condition(self.lock)
326 self.dest_to_source = threading.Condition(self.lock)
329 self.src_error_message = None
330 self.src_expinfo = None
331 self.src_instinfo = None
333 # Destination information
334 self.dest_error_message = None
335 self.dest_impinfo = None
337 def HandleErrors(self, prefix, fn, *args):
338 """Wrapper to catch errors and abort threads.
341 @param prefix: Variable name prefix ("src" or "dest")
346 assert prefix in ("dest", "src")
349 # Call inner function
355 except Exception, err:
356 logging.exception("Caught unhandled exception")
359 setattr(self, "%s_error_message" % prefix, errmsg)
363 self.source_to_dest.notifyAll()
364 self.dest_to_source.notifyAll()
368 def CheckAbort(self):
369 """Check whether thread should be aborted.
371 @raise Abort: When thread should be aborted
374 if not (self.src_error_message is None and
375 self.dest_error_message is None):
376 logging.info("Aborting")
379 def Wait(self, cond, check_fn):
380 """Waits for a condition to become true.
382 @type cond: threading.Condition
383 @param cond: Threading condition
384 @type check_fn: callable
385 @param check_fn: Function to check whether condition is true
390 while check_fn(self):
396 def PollJob(self, cl, job_id, remote_import_fn=None):
397 """Wrapper for polling a job.
399 @type cl: L{rapi.client.GanetiRapiClient}
400 @param cl: RAPI client
402 @param job_id: Job ID
403 @type remote_import_fn: callable or None
404 @param remote_import_fn: Callback for reporting received remote import
408 return rapi.client_utils.PollJob(cl, job_id,
409 MoveJobPollReportCb(self.CheckAbort,
413 class MoveDestExecutor(object):
414 def __init__(self, dest_client, mrt):
415 """Destination side of an instance move.
417 @type dest_client: L{rapi.client.GanetiRapiClient}
418 @param dest_client: RAPI client
419 @type mrt: L{MoveRuntime}
420 @param mrt: Instance move runtime information
423 logging.debug("Waiting for instance information to become available")
424 mrt.Wait(mrt.source_to_dest,
425 lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
427 logging.info("Creating instance %s in remote-import mode",
428 mrt.move.dest_instance_name)
429 job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
430 mrt.move.dest_pnode, mrt.move.dest_snode,
431 mrt.move.dest_iallocator,
432 mrt.src_instinfo, mrt.src_expinfo,
433 mrt.move.hvparams, mrt.move.beparams,
434 mrt.move.beparams, mrt.move.nics)
435 mrt.PollJob(dest_client, job_id,
436 remote_import_fn=compat.partial(self._SetImportInfo, mrt))
438 logging.info("Import successful")
441 def _SetImportInfo(mrt, impinfo):
442 """Sets the remote import information and notifies source thread.
444 @type mrt: L{MoveRuntime}
445 @param mrt: Instance move runtime information
446 @param impinfo: Remote import information
449 mrt.dest_to_source.acquire()
451 mrt.dest_impinfo = impinfo
452 mrt.dest_to_source.notifyAll()
454 mrt.dest_to_source.release()
457 def _CreateInstance(cl, name, pnode, snode, iallocator, instance, expinfo,
458 override_hvparams, override_beparams, override_osparams,
460 """Starts the instance creation in remote import mode.
462 @type cl: L{rapi.client.GanetiRapiClient}
463 @param cl: RAPI client
465 @param name: Instance name
466 @type pnode: string or None
467 @param pnode: Name of primary node on destination cluster
468 @type snode: string or None
469 @param snode: Name of secondary node on destination cluster
470 @type iallocator: string or None
471 @param iallocator: Name of iallocator to use
473 @param instance: Instance details from source cluster
475 @param expinfo: Prepared export information from source cluster
476 @type override_hvparams: dict or None
477 @param override_hvparams: Hypervisor parameters to override
478 @type override_beparams: dict or None
479 @param override_beparams: Backend parameters to override
480 @type override_osparams: dict or None
481 @param override_osparams: OS parameters to override
482 @type override_nics: dict or None
483 @param override_nics: NICs to override
487 disk_template = instance["disk_template"]
490 constants.IDISK_SIZE: i["size"],
491 constants.IDISK_MODE: i["mode"],
492 constants.IDISK_NAME: str(i.get("name")),
493 } for i in instance["disks"]]
496 constants.INIC_IP: ip,
497 constants.INIC_MAC: mac,
498 constants.INIC_MODE: mode,
499 constants.INIC_LINK: link,
500 constants.INIC_NETWORK: network,
501 constants.INIC_NAME: nic_name
502 } for nic_name, _, ip, mac, mode, link, network, _ in instance["nics"]]
504 if len(override_nics) > len(nics):
505 raise Error("Can not create new NICs")
508 assert len(override_nics) <= len(nics)
509 for idx, (nic, override) in enumerate(zip(nics, override_nics)):
510 nics[idx] = objects.FillDict(nic, override)
512 # TODO: Should this be the actual up/down status? (run_state)
513 start = (instance["config_state"] == "up")
515 assert len(disks) == len(instance["disks"])
516 assert len(nics) == len(instance["nics"])
518 inst_beparams = instance["be_instance"]
519 if not inst_beparams:
522 inst_hvparams = instance["hv_instance"]
523 if not inst_hvparams:
526 inst_osparams = instance["os_instance"]
527 if not inst_osparams:
530 return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
531 name, disk_template, disks, nics,
537 iallocator=iallocator,
538 hypervisor=instance["hypervisor"],
539 source_handshake=expinfo["handshake"],
540 source_x509_ca=expinfo["x509_ca"],
541 source_instance_name=instance["name"],
542 beparams=objects.FillDict(inst_beparams,
544 hvparams=objects.FillDict(inst_hvparams,
546 osparams=objects.FillDict(inst_osparams,
550 class MoveSourceExecutor(object):
551 def __init__(self, src_client, mrt):
552 """Source side of an instance move.
554 @type src_client: L{rapi.client.GanetiRapiClient}
555 @param src_client: RAPI client
556 @type mrt: L{MoveRuntime}
557 @param mrt: Instance move runtime information
560 logging.info("Checking whether instance exists")
561 self._CheckInstance(src_client, mrt.move.src_instance_name)
563 logging.info("Retrieving instance information from source cluster")
564 instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
565 mrt.move.src_instance_name)
566 if instinfo["disk_template"] == constants.DT_FILE:
567 raise Error("Inter-cluster move of file-based instances is not"
570 logging.info("Preparing export on source cluster")
571 expinfo = self._PrepareExport(src_client, mrt.PollJob,
572 mrt.move.src_instance_name)
573 assert "handshake" in expinfo
574 assert "x509_key_name" in expinfo
575 assert "x509_ca" in expinfo
577 # Hand information to destination thread
578 mrt.source_to_dest.acquire()
580 mrt.src_instinfo = instinfo
581 mrt.src_expinfo = expinfo
582 mrt.source_to_dest.notifyAll()
584 mrt.source_to_dest.release()
586 logging.info("Waiting for destination information to become available")
587 mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
589 logging.info("Starting remote export on source cluster")
590 self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
591 expinfo["x509_key_name"], mrt.dest_impinfo)
593 logging.info("Export successful")
596 def _CheckInstance(cl, name):
597 """Checks whether the instance exists on the source cluster.
599 @type cl: L{rapi.client.GanetiRapiClient}
600 @param cl: RAPI client
602 @param name: Instance name
607 except rapi.client.GanetiApiError, err:
608 if err.code == rapi.client.HTTP_NOT_FOUND:
609 raise Error("Instance %s not found (%s)" % (name, str(err)))
613 def _GetInstanceInfo(cl, poll_job_fn, name):
614 """Retrieves detailed instance information from source cluster.
616 @type cl: L{rapi.client.GanetiRapiClient}
617 @param cl: RAPI client
618 @type poll_job_fn: callable
619 @param poll_job_fn: Function to poll for job result
621 @param name: Instance name
624 job_id = cl.GetInstanceInfo(name, static=True)
625 result = poll_job_fn(cl, job_id)
626 assert len(result[0].keys()) == 1
627 return result[0][result[0].keys()[0]]
630 def _PrepareExport(cl, poll_job_fn, name):
631 """Prepares export on source cluster.
633 @type cl: L{rapi.client.GanetiRapiClient}
634 @param cl: RAPI client
635 @type poll_job_fn: callable
636 @param poll_job_fn: Function to poll for job result
638 @param name: Instance name
641 job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
642 return poll_job_fn(cl, job_id)[0]
645 def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
646 """Exports instance from source cluster.
648 @type cl: L{rapi.client.GanetiRapiClient}
649 @param cl: RAPI client
650 @type poll_job_fn: callable
651 @param poll_job_fn: Function to poll for job result
653 @param name: Instance name
654 @param x509_key_name: Source X509 key
655 @param impinfo: Import information from destination cluster
658 job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
659 impinfo["disks"], shutdown=True,
660 remove_instance=True,
661 x509_key_name=x509_key_name,
662 destination_x509_ca=impinfo["x509_ca"])
663 (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
665 if not (fin_resu and compat.all(dresults)):
666 raise Error("Export failed for disks %s" %
667 utils.CommaJoin(str(idx) for idx, result
668 in enumerate(dresults) if not result))
671 class MoveSourceWorker(workerpool.BaseWorker):
672 def RunTask(self, rapi_factory, move): # pylint: disable=W0221
673 """Executes an instance move.
675 @type rapi_factory: L{RapiClientFactory}
676 @param rapi_factory: RAPI client factory
677 @type move: L{InstanceMove}
678 @param move: Instance move information
682 logging.info("Preparing to move %s from cluster %s to %s as %s",
683 move.src_instance_name, rapi_factory.src_cluster_name,
684 rapi_factory.dest_cluster_name, move.dest_instance_name)
686 mrt = MoveRuntime(move)
688 logging.debug("Starting destination thread")
689 dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
690 target=mrt.HandleErrors,
691 args=("dest", MoveDestExecutor,
692 rapi_factory.GetDestClient(),
696 mrt.HandleErrors("src", MoveSourceExecutor,
697 rapi_factory.GetSourceClient(), mrt)
701 if mrt.src_error_message or mrt.dest_error_message:
702 move.error_message = ("Source error: %s, destination error: %s" %
703 (mrt.src_error_message, mrt.dest_error_message))
705 move.error_message = None
706 except Exception, err: # pylint: disable=W0703
707 logging.exception("Caught unhandled exception")
708 move.error_message = str(err)
711 def CheckRapiSetup(rapi_factory):
712 """Checks the RAPI setup by retrieving the version.
714 @type rapi_factory: L{RapiClientFactory}
715 @param rapi_factory: RAPI client factory
718 src_client = rapi_factory.GetSourceClient()
719 logging.info("Connecting to source RAPI server")
720 logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
722 dest_client = rapi_factory.GetDestClient()
723 logging.info("Connecting to destination RAPI server")
724 logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())
728 """Parses options passed to program.
731 program = os.path.basename(sys.argv[0])
733 parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
734 " <source-cluster> <dest-cluster>"
737 parser.add_option(cli.DEBUG_OPT)
738 parser.add_option(cli.VERBOSE_OPT)
739 parser.add_option(cli.IALLOCATOR_OPT)
740 parser.add_option(cli.BACKEND_OPT)
741 parser.add_option(cli.HVOPTS_OPT)
742 parser.add_option(cli.OSPARAMS_OPT)
743 parser.add_option(cli.NET_OPT)
744 parser.add_option(SRC_RAPI_PORT_OPT)
745 parser.add_option(SRC_CA_FILE_OPT)
746 parser.add_option(SRC_USERNAME_OPT)
747 parser.add_option(SRC_PASSWORD_FILE_OPT)
748 parser.add_option(DEST_RAPI_PORT_OPT)
749 parser.add_option(DEST_CA_FILE_OPT)
750 parser.add_option(DEST_USERNAME_OPT)
751 parser.add_option(DEST_PASSWORD_FILE_OPT)
752 parser.add_option(DEST_INSTANCE_NAME_OPT)
753 parser.add_option(DEST_PRIMARY_NODE_OPT)
754 parser.add_option(DEST_SECONDARY_NODE_OPT)
755 parser.add_option(PARALLEL_OPT)
757 (options, args) = parser.parse_args()
759 return (parser, options, args)
762 def CheckOptions(parser, options, args):
763 """Checks options and arguments for validity.
767 parser.error("Not enough arguments")
769 src_cluster_name = args.pop(0)
770 dest_cluster_name = args.pop(0)
771 instance_names = args
773 assert len(instance_names) > 0
775 # TODO: Remove once using system default paths for SSL certificate
776 # verification is implemented
777 if not options.src_ca_file:
778 parser.error("Missing source cluster CA file")
780 if options.parallel < 1:
781 parser.error("Number of simultaneous moves must be >= 1")
783 if not (bool(options.iallocator) ^
784 bool(options.dest_primary_node or options.dest_secondary_node)):
785 parser.error("Destination node and iallocator options exclude each other")
787 if len(instance_names) == 1:
788 # Moving one instance only
789 if not (options.iallocator or
790 options.dest_primary_node or
791 options.dest_secondary_node):
792 parser.error("An iallocator or the destination node is required")
795 utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)
798 utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)
801 options.nics = cli.ParseNicOption(options.nics)
803 # Moving more than one instance
804 if (options.dest_instance_name or options.dest_primary_node or
805 options.dest_secondary_node or options.hvparams or
806 options.beparams or options.osparams or options.nics):
807 parser.error("The options --dest-instance-name, --dest-primary-node,"
808 " --dest-secondary-node, --hypervisor-parameters,"
809 " --backend-parameters, --os-parameters and --net can"
810 " only be used when moving exactly one instance")
812 if not options.iallocator:
813 parser.error("An iallocator must be specified for moving more than one"
816 return (src_cluster_name, dest_cluster_name, instance_names)
824 (parser, options, args) = ParseOptions()
826 utils.SetupToolLogging(options.debug, options.verbose, threadname=True)
828 (src_cluster_name, dest_cluster_name, instance_names) = \
829 CheckOptions(parser, options, args)
831 logging.info("Source cluster: %s", src_cluster_name)
832 logging.info("Destination cluster: %s", dest_cluster_name)
833 logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
835 rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
837 CheckRapiSetup(rapi_factory)
839 assert (len(instance_names) == 1 or
840 not (options.dest_primary_node or options.dest_secondary_node))
841 assert len(instance_names) == 1 or options.iallocator
842 assert (len(instance_names) > 1 or options.iallocator or
843 options.dest_primary_node or options.dest_secondary_node)
844 assert (len(instance_names) == 1 or
845 not (options.hvparams or options.beparams or options.osparams or
848 # Prepare list of instance moves
850 for src_instance_name in instance_names:
851 if options.dest_instance_name:
852 assert len(instance_names) == 1
854 dest_instance_name = options.dest_instance_name
856 dest_instance_name = src_instance_name
858 moves.append(InstanceMove(src_instance_name, dest_instance_name,
859 options.dest_primary_node,
860 options.dest_secondary_node,
861 options.iallocator, options.hvparams,
862 options.beparams, options.osparams,
865 assert len(moves) == len(instance_names)
868 wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
870 # Add instance moves to workerpool
872 wp.AddTask((rapi_factory, move))
874 # Wait for all moves to finish
878 wp.TerminateWorkers()
880 # There should be no threads running at this point, hence not using locks
883 logging.info("Instance move results:")
886 if move.dest_instance_name == move.src_instance_name:
887 name = move.src_instance_name
889 name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
891 if move.error_message:
892 msg = "Failed (%s)" % move.error_message
896 logging.info("%s: %s", name, msg)
898 if compat.any(move.error_message for move in moves):
899 sys.exit(constants.EXIT_FAILURE)
901 sys.exit(constants.EXIT_SUCCESS)
904 if __name__ == "__main__":