4 # Copyright (C) 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Tool to move instances from one cluster to another.
25 # pylint: disable=C0103
26 # C0103: Invalid name move-instance
import logging
import optparse
import os
import sys
import threading
import time

from ganeti import cli
from ganeti import constants
from ganeti import utils
from ganeti import workerpool
from ganeti import objects
from ganeti import compat
from ganeti import rapi

import ganeti.rapi.client # pylint: disable=W0611
import ganeti.rapi.client_utils
from ganeti.rapi.client import UsesRapiClient
# Command line options specific to this tool. Options for the source cluster
# come first; several destination options fall back to the source value when
# they are not given (see RapiClientFactory).
SRC_RAPI_PORT_OPT = \
  cli.cli_option("--src-rapi-port", action="store", type="int",
                 dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
                 help=("Source cluster RAPI port (defaults to %s)" %
                       constants.DEFAULT_RAPI_PORT))

SRC_CA_FILE_OPT = \
  cli.cli_option("--src-ca-file", action="store", type="string",
                 dest="src_ca_file",
                 help=("File containing source cluster Certificate"
                       " Authority (CA) in PEM format"))

SRC_USERNAME_OPT = \
  cli.cli_option("--src-username", action="store", type="string",
                 dest="src_username", default=None,
                 help="Source cluster username")

SRC_PASSWORD_FILE_OPT = \
  cli.cli_option("--src-password-file", action="store", type="string",
                 dest="src_password_file",
                 help="File containing source cluster password")

# The default must be None (not the default RAPI port): the factory only
# falls back to the source cluster port when this option is unset, which is
# what the help text promises.
DEST_RAPI_PORT_OPT = \
  cli.cli_option("--dest-rapi-port", action="store", type="int",
                 dest="dest_rapi_port", default=None,
                 help=("Destination cluster RAPI port (defaults to source"
                       " cluster RAPI port)"))

# NOTE(review): the tail of this help text was not visible; the wording
# "cluster CA" is assumed from the fallback to the source CA -- confirm.
DEST_CA_FILE_OPT = \
  cli.cli_option("--dest-ca-file", action="store", type="string",
                 dest="dest_ca_file",
                 help=("File containing destination cluster Certificate"
                       " Authority (CA) in PEM format (defaults to source"
                       " cluster CA)"))

DEST_USERNAME_OPT = \
  cli.cli_option("--dest-username", action="store", type="string",
                 dest="dest_username", default=None,
                 help=("Destination cluster username (defaults to"
                       " source cluster username)"))

DEST_PASSWORD_FILE_OPT = \
  cli.cli_option("--dest-password-file", action="store", type="string",
                 dest="dest_password_file",
                 help=("File containing destination cluster password"
                       " (defaults to source cluster password)"))

DEST_INSTANCE_NAME_OPT = \
  cli.cli_option("--dest-instance-name", action="store", type="string",
                 dest="dest_instance_name",
                 help=("Instance name on destination cluster (only"
                       " when moving exactly one instance)"))

DEST_PRIMARY_NODE_OPT = \
  cli.cli_option("--dest-primary-node", action="store", type="string",
                 dest="dest_primary_node",
                 help=("Primary node on destination cluster (only"
                       " when moving exactly one instance)"))

DEST_SECONDARY_NODE_OPT = \
  cli.cli_option("--dest-secondary-node", action="store", type="string",
                 dest="dest_secondary_node",
                 help=("Secondary node on destination cluster (only"
                       " when moving exactly one instance)"))

PARALLEL_OPT = \
  cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
                 dest="parallel", metavar="<number>",
                 help="Number of instances to be moved simultaneously")
class Error(Exception):
  """Generic exception for errors raised by this tool.

  """


class Abort(Error):
  """Special exception for aborting import/export.

  Raised in one thread of a move once the other thread has recorded an
  error, so that both sides stop.

  """
class RapiClientFactory(object):
  """Factory class for creating RAPI clients.

  @ivar src_cluster_name: Source cluster name
  @ivar dest_cluster_name: Destination cluster name
  @ivar GetSourceClient: Callable returning new client for source cluster
  @ivar GetDestClient: Callable returning new client for destination cluster

  """
  def __init__(self, options, src_cluster_name, dest_cluster_name):
    """Initializes this class.

    Resolves CA file, port, username and password for both clusters once;
    every destination setting that was not given falls back to the value
    used for the source cluster.

    @param options: Program options
    @type src_cluster_name: string
    @param src_cluster_name: Source cluster name
    @type dest_cluster_name: string
    @param dest_cluster_name: Destination cluster name

    """
    self.src_cluster_name = src_cluster_name
    self.dest_cluster_name = dest_cluster_name

    # TODO: Implement timeouts for RAPI connections
    # TODO: Support for using system default paths for verifying SSL certificate
    logging.debug("Using '%s' as source CA", options.src_ca_file)
    src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)

    if options.dest_ca_file:
      logging.debug("Using '%s' as destination CA", options.dest_ca_file)
      dest_curl_config = \
        rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
    else:
      logging.debug("Using source CA for destination")
      dest_curl_config = src_curl_config

    logging.debug("Source RAPI server is %s:%s",
                  src_cluster_name, options.src_rapi_port)
    logging.debug("Source username is '%s'", options.src_username)

    if options.src_username is None:
      # NOTE(review): fallback value was not visible in the extract; an empty
      # username is assumed -- confirm.
      src_username = ""
    else:
      src_username = options.src_username

    if options.src_password_file:
      logging.debug("Reading '%s' for source password",
                    options.src_password_file)
      # NOTE(review): the second argument was not visible in the extract;
      # strict=True is assumed -- confirm against utils.ReadOneLineFile.
      src_password = utils.ReadOneLineFile(options.src_password_file,
                                           strict=True)
    else:
      logging.debug("Source has no password")
      src_password = None

    self.GetSourceClient = lambda: \
      rapi.client.GanetiRapiClient(src_cluster_name,
                                   port=options.src_rapi_port,
                                   curl_config_fn=src_curl_config,
                                   username=src_username,
                                   password=src_password)

    if options.dest_rapi_port:
      dest_rapi_port = options.dest_rapi_port
    else:
      dest_rapi_port = options.src_rapi_port

    if options.dest_username is None:
      dest_username = src_username
    else:
      dest_username = options.dest_username

    logging.debug("Destination RAPI server is %s:%s",
                  dest_cluster_name, dest_rapi_port)
    logging.debug("Destination username is '%s'", dest_username)

    if options.dest_password_file:
      logging.debug("Reading '%s' for destination password",
                    options.dest_password_file)
      # NOTE(review): second argument assumed, see source password above.
      dest_password = utils.ReadOneLineFile(options.dest_password_file,
                                            strict=True)
    else:
      logging.debug("Using source password for destination")
      dest_password = src_password

    self.GetDestClient = lambda: \
      rapi.client.GanetiRapiClient(dest_cluster_name,
                                   port=dest_rapi_port,
                                   curl_config_fn=dest_curl_config,
                                   username=dest_username,
                                   password=dest_password)
class MoveJobPollReportCb(cli.JobPollReportCbBase):
  """Job polling callbacks used while a move is in progress.

  """
  def __init__(self, abort_check_fn, remote_import_fn):
    """Initializes this class.

    @type abort_check_fn: callable
    @param abort_check_fn: Function to check whether move is aborted
    @type remote_import_fn: callable or None
    @param remote_import_fn: Callback for reporting received remote import
                             information

    """
    cli.JobPollReportCbBase.__init__(self)
    self._abort_check_fn = abort_check_fn
    self._remote_import_fn = remote_import_fn

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    Ordinary messages are written to the log; remote import information is
    passed to the callback given at construction time instead.

    """
    if log_type != constants.ELOG_REMOTE_IMPORT:
      when = time.ctime(utils.MergeTime(timestamp))
      logging.info("[%s] %s", when, cli.FormatLogMessage(log_type, log_msg))
      return

    logging.debug("Received remote import information")

    # Only the destination side registers a callback for this message type
    if not self._remote_import_fn:
      raise RuntimeError("Received unexpected remote import information")

    assert "x509_ca" in log_msg
    assert "disks" in log_msg

    self._remote_import_fn(log_msg)

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    Used as a periodic opportunity to notice an abort requested by the other
    thread; the abort exception is logged and passed on to the caller.

    """
    try:
      # Check whether we were told to abort by the other thread
      self._abort_check_fn()
    except Abort:
      logging.warning("Aborting despite job %s still running", job_id)
      raise
class InstanceMove(object):
  """Status class for instance moves.

  Carries the parameters of one move and, once it has run, its outcome in
  C{error_message} (C{None} as long as no error was recorded).

  """
  def __init__(self, src_instance_name, dest_instance_name,
               dest_pnode, dest_snode, dest_iallocator,
               hvparams, beparams, osparams, nics):
    """Initializes this class.

    @type src_instance_name: string
    @param src_instance_name: Instance name on source cluster
    @type dest_instance_name: string
    @param dest_instance_name: Instance name on destination cluster
    @type dest_pnode: string or None
    @param dest_pnode: Name of primary node on destination cluster
    @type dest_snode: string or None
    @param dest_snode: Name of secondary node on destination cluster
    @type dest_iallocator: string or None
    @param dest_iallocator: Name of iallocator to use
    @type hvparams: dict or None
    @param hvparams: Hypervisor parameters to override
    @type beparams: dict or None
    @param beparams: Backend parameters to override
    @type osparams: dict or None
    @param osparams: OS parameters to override
    @type nics: list of dicts or None
    @param nics: NICs to override

    """
    self.src_instance_name = src_instance_name
    self.dest_instance_name = dest_instance_name
    self.dest_pnode = dest_pnode
    self.dest_snode = dest_snode
    self.dest_iallocator = dest_iallocator
    self.hvparams = hvparams
    self.beparams = beparams
    self.osparams = osparams
    # Read back as "move.nics" when the instance is created on the destination
    self.nics = nics

    # Filled in by the worker once the move has finished
    self.error_message = None
class MoveRuntime(object):
  """Class to keep track of instance move.

  Holds the state shared by the source and the destination thread of one
  move, plus the lock and the two conditions used to hand data over.

  """
  def __init__(self, move):
    """Initializes this class.

    @type move: L{InstanceMove}
    @param move: Instance move information

    """
    self.move = move

    # Thread synchronization
    self.lock = threading.Lock()
    self.source_to_dest = threading.Condition(self.lock)
    self.dest_to_source = threading.Condition(self.lock)

    # Source information
    self.src_error_message = None
    self.src_expinfo = None
    self.src_instinfo = None

    # Destination information
    self.dest_error_message = None
    self.dest_impinfo = None

  def HandleErrors(self, prefix, fn, *args):
    """Wrapper to catch errors and abort threads.

    The outcome is stored in C{<prefix>_error_message} (C{None} on success)
    and both conditions are notified so that a waiting peer re-checks.

    @type prefix: string
    @param prefix: Variable name prefix ("src" or "dest")
    @type fn: callable
    @param fn: Function to call with C{args}

    """
    assert prefix in ("dest", "src")

    try:
      # Call inner function
      fn(*args)

      errmsg = None
    except Abort:
      # NOTE(review): message text was not visible in the extract -- confirm
      errmsg = "Aborted"
    except Exception as err: # pylint: disable=W0703
      logging.exception("Caught unhandled exception")
      errmsg = str(err)

    setattr(self, "%s_error_message" % prefix, errmsg)

    # Both conditions share self.lock, so holding it allows notifying both
    with self.lock:
      self.source_to_dest.notify_all()
      self.dest_to_source.notify_all()

  def CheckAbort(self):
    """Check whether thread should be aborted.

    @raise Abort: When thread should be aborted

    """
    if not (self.src_error_message is None and
            self.dest_error_message is None):
      logging.info("Aborting")
      raise Abort()

  def Wait(self, cond, check_fn):
    """Waits for a condition to become true.

    @type cond: threading.Condition
    @param cond: Threading condition
    @type check_fn: callable
    @param check_fn: Function called with this object; waiting continues for
                     as long as it returns a true value
    @raise Abort: When an error was recorded while waiting

    """
    with cond:
      while check_fn(self):
        # Stop waiting if either side has failed in the meantime
        self.CheckAbort()
        cond.wait()

  def PollJob(self, cl, job_id, remote_import_fn=None):
    """Wrapper for polling a job.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @param job_id: Job ID
    @type remote_import_fn: callable or None
    @param remote_import_fn: Callback for reporting received remote import
                             information
    @return: Whatever C{rapi.client_utils.PollJob} returns for the job

    """
    return rapi.client_utils.PollJob(cl, job_id,
                                     MoveJobPollReportCb(self.CheckAbort,
                                                         remote_import_fn))
class MoveDestExecutor(object):
  """Runs the destination half of an instance move when instantiated.

  """
  def __init__(self, dest_client, mrt):
    """Destination side of an instance move.

    Waits until the source thread has published the instance and export
    information, then creates the instance in remote-import mode and polls
    the job, handing the import information back to the source thread.

    @type dest_client: L{rapi.client.GanetiRapiClient}
    @param dest_client: RAPI client
    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information

    """
    logging.debug("Waiting for instance information to become available")
    mrt.Wait(mrt.source_to_dest,
             lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)

    logging.info("Creating instance %s in remote-import mode",
                 mrt.move.dest_instance_name)
    # Override order must match _CreateInstance: hvparams, beparams, osparams
    job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
                                  mrt.move.dest_pnode, mrt.move.dest_snode,
                                  mrt.move.dest_iallocator,
                                  mrt.src_instinfo, mrt.src_expinfo,
                                  mrt.move.hvparams, mrt.move.beparams,
                                  mrt.move.osparams, mrt.move.nics)
    mrt.PollJob(dest_client, job_id,
                remote_import_fn=compat.partial(self._SetImportInfo, mrt))

    logging.info("Import successful")

  @staticmethod
  def _SetImportInfo(mrt, impinfo):
    """Sets the remote import information and notifies source thread.

    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information
    @param impinfo: Remote import information

    """
    with mrt.dest_to_source:
      mrt.dest_impinfo = impinfo
      mrt.dest_to_source.notify_all()

  @staticmethod
  def _CreateInstance(cl, name, pnode, snode, iallocator, instance, expinfo,
                      override_hvparams, override_beparams, override_osparams,
                      override_nics):
    """Starts the instance creation in remote import mode.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type name: string
    @param name: Instance name
    @type pnode: string or None
    @param pnode: Name of primary node on destination cluster
    @type snode: string or None
    @param snode: Name of secondary node on destination cluster
    @type iallocator: string or None
    @param iallocator: Name of iallocator to use
    @type instance: dict
    @param instance: Instance details from source cluster
    @type expinfo: dict
    @param expinfo: Prepared export information from source cluster
    @type override_hvparams: dict or None
    @param override_hvparams: Hypervisor parameters to override
    @type override_beparams: dict or None
    @param override_beparams: Backend parameters to override
    @type override_osparams: dict or None
    @param override_osparams: OS parameters to override
    @type override_nics: list of dicts or None
    @param override_nics: NICs to override
    @raise Error: When more NIC overrides than existing NICs are given
    @return: Job ID

    """
    disk_template = instance["disk_template"]

    disks = [{
      constants.IDISK_SIZE: i["size"],
      constants.IDISK_MODE: i["mode"],
      } for i in instance["disks"]]

    nics = [{
      constants.INIC_IP: ip,
      constants.INIC_MAC: mac,
      constants.INIC_MODE: mode,
      constants.INIC_LINK: link,
      constants.INIC_NETWORK: network
      } for ip, mac, mode, link, network, _ in instance["nics"]]

    # None is a documented value; treat it like "no overrides"
    nic_overrides = override_nics or []

    if len(nic_overrides) > len(nics):
      raise Error("Can not create new NICs")

    # Overrides apply by position to the first NICs of the instance
    for idx, override in enumerate(nic_overrides):
      nics[idx] = objects.FillDict(nics[idx], override)

    # TODO: Should this be the actual up/down status? (run_state)
    start = (instance["config_state"] == "up")

    assert len(disks) == len(instance["disks"])
    assert len(nics) == len(instance["nics"])

    inst_beparams = instance["be_instance"]
    if not inst_beparams:
      inst_beparams = {}

    inst_hvparams = instance["hv_instance"]
    if not inst_hvparams:
      inst_hvparams = {}

    inst_osparams = instance["os_instance"]
    if not inst_osparams:
      inst_osparams = {}

    # NOTE(review): the keyword arguments "os" through "ip_check" were not
    # visible in the extract and are reconstructed -- confirm against the
    # RAPI client's CreateInstance before relying on them.
    return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
                             name, disk_template, disks, nics,
                             os=instance["os"],
                             pnode=pnode,
                             snode=snode,
                             start=start,
                             ip_check=False,
                             iallocator=iallocator,
                             hypervisor=instance["hypervisor"],
                             source_handshake=expinfo["handshake"],
                             source_x509_ca=expinfo["x509_ca"],
                             source_instance_name=instance["name"],
                             beparams=objects.FillDict(inst_beparams,
                                                       override_beparams),
                             hvparams=objects.FillDict(inst_hvparams,
                                                       override_hvparams),
                             osparams=objects.FillDict(inst_osparams,
                                                       override_osparams))
class MoveSourceExecutor(object):
  """Runs the source half of an instance move when instantiated.

  """
  def __init__(self, src_client, mrt):
    """Source side of an instance move.

    Collects instance and export information, publishes it for the
    destination thread, waits for the import information and then starts the
    remote export.

    @type src_client: L{rapi.client.GanetiRapiClient}
    @param src_client: RAPI client
    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information

    """
    logging.info("Checking whether instance exists")
    self._CheckInstance(src_client, mrt.move.src_instance_name)

    logging.info("Retrieving instance information from source cluster")
    instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
                                     mrt.move.src_instance_name)

    logging.info("Preparing export on source cluster")
    expinfo = self._PrepareExport(src_client, mrt.PollJob,
                                  mrt.move.src_instance_name)
    assert "handshake" in expinfo
    assert "x509_key_name" in expinfo
    assert "x509_ca" in expinfo

    # Hand information to destination thread
    with mrt.source_to_dest:
      mrt.src_instinfo = instinfo
      mrt.src_expinfo = expinfo
      mrt.source_to_dest.notify_all()

    logging.info("Waiting for destination information to become available")
    mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)

    logging.info("Starting remote export on source cluster")
    self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
                         expinfo["x509_key_name"], mrt.dest_impinfo)

    logging.info("Export successful")

  @staticmethod
  def _CheckInstance(cl, name):
    """Checks whether the instance exists on the source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type name: string
    @param name: Instance name
    @raise Error: When the RAPI server reports the instance as not found

    """
    try:
      # NOTE(review): the request itself was not visible in the extract;
      # GetInstance is assumed -- confirm.
      cl.GetInstance(name)
    except rapi.client.GanetiApiError as err:
      if err.code == rapi.client.HTTP_NOT_FOUND:
        raise Error("Instance %s not found (%s)" % (name, str(err)))
      # Any other API error is passed on unchanged
      raise

  @staticmethod
  def _GetInstanceInfo(cl, poll_job_fn, name):
    """Retrieves detailed instance information from source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name
    @return: The single value of the dictionary in the first job result

    """
    job_id = cl.GetInstanceInfo(name, static=True)
    result = poll_job_fn(cl, job_id)
    info_by_name = result[0]
    assert len(info_by_name) == 1
    # list() keeps this working where dict views cannot be indexed
    return list(info_by_name.values())[0]

  @staticmethod
  def _PrepareExport(cl, poll_job_fn, name):
    """Prepares export on source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name
    @return: First element of the job result

    """
    job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
    return poll_job_fn(cl, job_id)[0]

  @staticmethod
  def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
    """Exports instance from source cluster.

    The export shuts the instance down and removes it from the source
    cluster (C{shutdown=True}, C{remove_instance=True}).

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name
    @param x509_key_name: Source X509 key
    @param impinfo: Import information from destination cluster
    @raise Error: When the overall result or any per-disk result is false

    """
    job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
                               impinfo["disks"], shutdown=True,
                               remove_instance=True,
                               x509_key_name=x509_key_name,
                               destination_x509_ca=impinfo["x509_ca"])
    (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]

    if not (fin_resu and compat.all(dresults)):
      raise Error("Export failed for disks %s" %
                  utils.CommaJoin(str(idx) for idx, result
                                  in enumerate(dresults) if not result))
class MoveSourceWorker(workerpool.BaseWorker):
  """Worker thread performing one complete instance move per task.

  """
  def RunTask(self, rapi_factory, move): # pylint: disable=W0221
    """Executes an instance move.

    The destination side runs in a separate thread while the source side
    runs in this worker; the combined outcome ends up in
    C{move.error_message}.

    @type rapi_factory: L{RapiClientFactory}
    @param rapi_factory: RAPI client factory
    @type move: L{InstanceMove}
    @param move: Instance move information

    """
    try:
      logging.info("Preparing to move %s from cluster %s to %s as %s",
                   move.src_instance_name, rapi_factory.src_cluster_name,
                   rapi_factory.dest_cluster_name, move.dest_instance_name)

      runtime = MoveRuntime(move)

      logging.debug("Starting destination thread")
      thread_name = "DestFor%s" % self.getName()
      thread_args = ("dest", MoveDestExecutor,
                     rapi_factory.GetDestClient(), runtime)
      dest_worker = threading.Thread(name=thread_name,
                                     target=runtime.HandleErrors,
                                     args=thread_args)
      dest_worker.start()
      try:
        runtime.HandleErrors("src", MoveSourceExecutor,
                             rapi_factory.GetSourceClient(), runtime)
      finally:
        # Never leave the destination thread running behind our back
        dest_worker.join()

      failed = runtime.src_error_message or runtime.dest_error_message
      if not failed:
        move.error_message = None
      else:
        move.error_message = ("Source error: %s, destination error: %s" %
                              (runtime.src_error_message,
                               runtime.dest_error_message))
    except Exception as err: # pylint: disable=W0703
      logging.exception("Caught unhandled exception")
      move.error_message = str(err)
def CheckRapiSetup(rapi_factory):
  """Checks the RAPI setup by retrieving the version.

  One client per cluster is created and asked for its version, source
  cluster first; the versions are only logged.

  @type rapi_factory: L{RapiClientFactory}
  @param rapi_factory: RAPI client factory

  """
  source = rapi_factory.GetSourceClient()
  logging.info("Connecting to source RAPI server")
  source_version = source.GetVersion()
  logging.info("Source cluster RAPI version: %s", source_version)

  destination = rapi_factory.GetDestClient()
  logging.info("Connecting to destination RAPI server")
  destination_version = destination.GetVersion()
  logging.info("Destination cluster RAPI version: %s", destination_version)
723 """Parses options passed to program.
726 program = os.path.basename(sys.argv[0])
728 parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
729 " <source-cluster> <dest-cluster>"
732 parser.add_option(cli.DEBUG_OPT)
733 parser.add_option(cli.VERBOSE_OPT)
734 parser.add_option(cli.IALLOCATOR_OPT)
735 parser.add_option(cli.BACKEND_OPT)
736 parser.add_option(cli.HVOPTS_OPT)
737 parser.add_option(cli.OSPARAMS_OPT)
738 parser.add_option(cli.NET_OPT)
739 parser.add_option(SRC_RAPI_PORT_OPT)
740 parser.add_option(SRC_CA_FILE_OPT)
741 parser.add_option(SRC_USERNAME_OPT)
742 parser.add_option(SRC_PASSWORD_FILE_OPT)
743 parser.add_option(DEST_RAPI_PORT_OPT)
744 parser.add_option(DEST_CA_FILE_OPT)
745 parser.add_option(DEST_USERNAME_OPT)
746 parser.add_option(DEST_PASSWORD_FILE_OPT)
747 parser.add_option(DEST_INSTANCE_NAME_OPT)
748 parser.add_option(DEST_PRIMARY_NODE_OPT)
749 parser.add_option(DEST_SECONDARY_NODE_OPT)
750 parser.add_option(PARALLEL_OPT)
752 (options, args) = parser.parse_args()
754 return (parser, options, args)
def CheckOptions(parser, options, args):
  """Checks options and arguments for validity.

  Problems are reported through C{parser.error}. For a single instance the
  parameter overrides given in C{options} are type-checked and the NIC
  option is parsed in place.

  @param parser: Option parser, used for reporting errors
  @param options: Parsed program options
  @type args: list
  @param args: Positional arguments (source cluster, destination cluster,
               instance names); the two cluster names are removed from it
  @rtype: tuple
  @return: Source cluster name, destination cluster name, instance names

  """
  # Two cluster names and at least one instance name are needed
  if len(args) < 3:
    parser.error("Not enough arguments")

  src_cluster_name = args.pop(0)
  dest_cluster_name = args.pop(0)
  instance_names = args

  assert len(instance_names) > 0

  # TODO: Remove once using system default paths for SSL certificate
  # verification is implemented
  if not options.src_ca_file:
    parser.error("Missing source cluster CA file")

  if options.parallel < 1:
    parser.error("Number of simultaneous moves must be >= 1")

  # Only the combination of both is a conflict; giving neither is reported
  # by the more specific checks below.
  if (options.iallocator and
      (options.dest_primary_node or options.dest_secondary_node)):
    parser.error("Destination node and iallocator options exclude each other")

  if len(instance_names) == 1:
    # Moving one instance only
    if not (options.iallocator or
            options.dest_primary_node or
            options.dest_secondary_node):
      parser.error("An iallocator or the destination node is required")

    if options.hvparams:
      utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)

    if options.beparams:
      utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)

    if options.nics:
      options.nics = cli.ParseNicOption(options.nics)
  else:
    # Moving more than one instance
    if (options.dest_instance_name or options.dest_primary_node or
        options.dest_secondary_node or options.hvparams or
        options.beparams or options.osparams or options.nics):
      parser.error("The options --dest-instance-name, --dest-primary-node,"
                   " --dest-secondary-node, --hypervisor-parameters,"
                   " --backend-parameters, --os-parameters and --net can"
                   " only be used when moving exactly one instance")

    if not options.iallocator:
      parser.error("An iallocator must be specified for moving more than one"
                   " instance")

  return (src_cluster_name, dest_cluster_name, instance_names)
819 (parser, options, args) = ParseOptions()
821 utils.SetupToolLogging(options.debug, options.verbose, threadname=True)
823 (src_cluster_name, dest_cluster_name, instance_names) = \
824 CheckOptions(parser, options, args)
826 logging.info("Source cluster: %s", src_cluster_name)
827 logging.info("Destination cluster: %s", dest_cluster_name)
828 logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
830 rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
832 CheckRapiSetup(rapi_factory)
834 assert (len(instance_names) == 1 or
835 not (options.dest_primary_node or options.dest_secondary_node))
836 assert len(instance_names) == 1 or options.iallocator
837 assert (len(instance_names) > 1 or options.iallocator or
838 options.dest_primary_node or options.dest_secondary_node)
839 assert (len(instance_names) == 1 or
840 not (options.hvparams or options.beparams or options.osparams or
843 # Prepare list of instance moves
845 for src_instance_name in instance_names:
846 if options.dest_instance_name:
847 assert len(instance_names) == 1
849 dest_instance_name = options.dest_instance_name
851 dest_instance_name = src_instance_name
853 moves.append(InstanceMove(src_instance_name, dest_instance_name,
854 options.dest_primary_node,
855 options.dest_secondary_node,
856 options.iallocator, options.hvparams,
857 options.beparams, options.osparams,
860 assert len(moves) == len(instance_names)
863 wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
865 # Add instance moves to workerpool
867 wp.AddTask((rapi_factory, move))
869 # Wait for all moves to finish
873 wp.TerminateWorkers()
875 # There should be no threads running at this point, hence not using locks
878 logging.info("Instance move results:")
881 if move.dest_instance_name == move.src_instance_name:
882 name = move.src_instance_name
884 name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
886 if move.error_message:
887 msg = "Failed (%s)" % move.error_message
891 logging.info("%s: %s", name, msg)
893 if compat.any(move.error_message for move in moves):
894 sys.exit(constants.EXIT_FAILURE)
896 sys.exit(constants.EXIT_SUCCESS)
899 if __name__ == "__main__":