4 # Copyright (C) 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Tool to move instances from one cluster to another.
25 # pylint: disable=C0103
26 # C0103: Invalid name move-instance
35 from ganeti import cli
36 from ganeti import constants
37 from ganeti import utils
38 from ganeti import workerpool
39 from ganeti import objects
40 from ganeti import compat
41 from ganeti import rapi
43 import ganeti.rapi.client # pylint: disable=W0611
44 import ganeti.rapi.client_utils
45 from ganeti.rapi.client import UsesRapiClient
# Command line options describing how to reach the source cluster

SRC_RAPI_PORT_OPT = \
  cli.cli_option("--src-rapi-port", action="store", type="int",
                 dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
                 help=("Source cluster RAPI port (defaults to %s)" %
                       constants.DEFAULT_RAPI_PORT))

SRC_CA_FILE_OPT = \
  cli.cli_option("--src-ca-file", action="store", type="string",
                 dest="src_ca_file",
                 help=("File containing source cluster Certificate"
                       " Authority (CA) in PEM format"))

SRC_USERNAME_OPT = \
  cli.cli_option("--src-username", action="store", type="string",
                 dest="src_username", default=None,
                 help="Source cluster username")

SRC_PASSWORD_FILE_OPT = \
  cli.cli_option("--src-password-file", action="store", type="string",
                 dest="src_password_file",
                 help="File containing source cluster password")

# Command line options describing how to reach the destination cluster

# The default is None on purpose: RapiClientFactory only falls back to the
# source cluster's port when this option is unset. A concrete default here
# would make that fallback unreachable and contradict the help text.
DEST_RAPI_PORT_OPT = \
  cli.cli_option("--dest-rapi-port", action="store", type="int",
                 dest="dest_rapi_port", default=None,
                 help=("Destination cluster RAPI port (defaults to source"
                       " cluster RAPI port)"))

DEST_CA_FILE_OPT = \
  cli.cli_option("--dest-ca-file", action="store", type="string",
                 dest="dest_ca_file",
                 help=("File containing destination cluster Certificate"
                       " Authority (CA) in PEM format (defaults to source"
                       " cluster CA)"))

DEST_USERNAME_OPT = \
  cli.cli_option("--dest-username", action="store", type="string",
                 dest="dest_username", default=None,
                 help=("Destination cluster username (defaults to"
                       " source cluster username)"))

DEST_PASSWORD_FILE_OPT = \
  cli.cli_option("--dest-password-file", action="store", type="string",
                 dest="dest_password_file",
                 help=("File containing destination cluster password"
                       " (defaults to source cluster password)"))

# Options which only make sense when exactly one instance is moved

DEST_INSTANCE_NAME_OPT = \
  cli.cli_option("--dest-instance-name", action="store", type="string",
                 dest="dest_instance_name",
                 help=("Instance name on destination cluster (only"
                       " when moving exactly one instance)"))

DEST_PRIMARY_NODE_OPT = \
  cli.cli_option("--dest-primary-node", action="store", type="string",
                 dest="dest_primary_node",
                 help=("Primary node on destination cluster (only"
                       " when moving exactly one instance)"))

DEST_SECONDARY_NODE_OPT = \
  cli.cli_option("--dest-secondary-node", action="store", type="string",
                 dest="dest_secondary_node",
                 help=("Secondary node on destination cluster (only"
                       " when moving exactly one instance)"))

PARALLEL_OPT = \
  cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
                 dest="parallel", metavar="<number>",
                 help="Number of instances to be moved simultaneously")
class Error(Exception):
  """Base class for errors raised by this tool.

  """


class Abort(Error):
  """Special exception for aborting import/export.

  """
class RapiClientFactory:
  """Creates RAPI clients for the source and the destination cluster.

  Connection settings are resolved once in the constructor; each of the two
  C{Get*Client} callables builds a new client whenever it is called.

  @ivar src_cluster_name: Source cluster name
  @ivar dest_cluster_name: Destination cluster name
  @ivar GetSourceClient: Callable returning new client for source cluster
  @ivar GetDestClient: Callable returning new client for destination cluster

  """
  def __init__(self, options, src_cluster_name, dest_cluster_name):
    """Resolves the connection settings and prepares the client callables.

    Destination settings which were not given (CA file, RAPI port, username
    and password file) are taken over from the source cluster.

    @param options: Program options
    @type src_cluster_name: string
    @param src_cluster_name: Source cluster name
    @type dest_cluster_name: string
    @param dest_cluster_name: Destination cluster name

    """
    self.src_cluster_name = src_cluster_name
    self.dest_cluster_name = dest_cluster_name

    # TODO: Implement timeouts for RAPI connections
    # TODO: Support for using system default paths for verifying SSL certificate
    logging.debug("Using '%s' as source CA", options.src_ca_file)
    src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)

    if not options.dest_ca_file:
      logging.debug("Using source CA for destination")
      dest_curl_config = src_curl_config
    else:
      logging.debug("Using '%s' as destination CA", options.dest_ca_file)
      dest_curl_config = \
        rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)

    logging.debug("Source RAPI server is %s:%s",
                  src_cluster_name, options.src_rapi_port)
    logging.debug("Source username is '%s'", options.src_username)

    # A missing username is passed to the client as an empty string
    src_username = options.src_username
    if src_username is None:
      src_username = ""

    src_password = None
    if options.src_password_file:
      logging.debug("Reading '%s' for source password",
                    options.src_password_file)
      src_password = utils.ReadOneLineFile(options.src_password_file,
                                           strict=True)
    else:
      logging.debug("Source has no password")

    def _NewSourceClient():
      return rapi.client.GanetiRapiClient(src_cluster_name,
                                          port=options.src_rapi_port,
                                          curl_config_fn=src_curl_config,
                                          username=src_username,
                                          password=src_password)

    self.GetSourceClient = _NewSourceClient

    # Anything not given for the destination is inherited from the source
    dest_rapi_port = options.dest_rapi_port
    if not dest_rapi_port:
      dest_rapi_port = options.src_rapi_port

    dest_username = options.dest_username
    if dest_username is None:
      dest_username = src_username

    logging.debug("Destination RAPI server is %s:%s",
                  dest_cluster_name, dest_rapi_port)
    logging.debug("Destination username is '%s'", dest_username)

    if options.dest_password_file:
      logging.debug("Reading '%s' for destination password",
                    options.dest_password_file)
      dest_password = utils.ReadOneLineFile(options.dest_password_file,
                                            strict=True)
    else:
      logging.debug("Using source password for destination")
      dest_password = src_password

    def _NewDestClient():
      return rapi.client.GanetiRapiClient(dest_cluster_name,
                                          port=dest_rapi_port,
                                          curl_config_fn=dest_curl_config,
                                          username=dest_username,
                                          password=dest_password)

    self.GetDestClient = _NewDestClient
class MoveJobPollReportCb(cli.JobPollReportCbBase):
  """Job poll callbacks which log job messages and honour abort requests.

  """
  def __init__(self, abort_check_fn, remote_import_fn):
    """Stores the callbacks used while polling a job.

    @type abort_check_fn: callable
    @param abort_check_fn: Function to check whether move is aborted
    @type remote_import_fn: callable or None
    @param remote_import_fn: Callback for reporting received remote import
                             information

    """
    cli.JobPollReportCbBase.__init__(self)
    self._remote_import_fn = remote_import_fn
    self._abort_check_fn = abort_check_fn

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    Remote import information is handed to the registered callback; any
    other message is written to the log.

    """
    if log_type != constants.ELOG_REMOTE_IMPORT:
      logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
                   cli.FormatLogMessage(log_type, log_msg))
      return

    logging.debug("Received remote import information")

    if not self._remote_import_fn:
      raise RuntimeError("Received unexpected remote import information")

    assert "x509_ca" in log_msg
    assert "disks" in log_msg

    self._remote_import_fn(log_msg)

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    Gives the abort check a chance to run; its L{Abort} exception is logged
    and passed on to the caller.

    """
    try:
      # Check whether we were told to abort by the other thread
      self._abort_check_fn()
    except Abort:
      logging.warning("Aborting despite job %s still running", job_id)
      raise
class InstanceMove(object):
  """Description and outcome of a single instance move.

  """
  def __init__(self, src_instance_name, dest_instance_name,
               dest_pnode, dest_snode, dest_iallocator,
               hvparams, beparams, osparams, nics):
    """Records the parameters of the move; no error is set initially.

    @type src_instance_name: string
    @param src_instance_name: Instance name on source cluster
    @type dest_instance_name: string
    @param dest_instance_name: Instance name on destination cluster
    @type dest_pnode: string or None
    @param dest_pnode: Name of primary node on destination cluster
    @type dest_snode: string or None
    @param dest_snode: Name of secondary node on destination cluster
    @type dest_iallocator: string or None
    @param dest_iallocator: Name of iallocator to use
    @type hvparams: dict or None
    @param hvparams: Hypervisor parameters to override
    @type beparams: dict or None
    @param beparams: Backend parameters to override
    @type osparams: dict or None
    @param osparams: OS parameters to override
    @type nics: dict or None
    @param nics: NICs to override

    """
    # Naming on both clusters
    self.src_instance_name = src_instance_name
    self.dest_instance_name = dest_instance_name

    # Placement on the destination cluster
    self.dest_pnode = dest_pnode
    self.dest_snode = dest_snode
    self.dest_iallocator = dest_iallocator

    # Parameter overrides applied on top of the source instance's settings
    self.hvparams = hvparams
    self.beparams = beparams
    self.osparams = osparams
    self.nics = nics

    # Stays None unless the move fails
    self.error_message = None
class MoveRuntime(object):
  """State shared by the source and destination side of one instance move.

  """
  def __init__(self, move):
    """Creates the synchronization primitives and empty result slots.

    @type move: L{InstanceMove}
    @param move: Instance move information

    """
    self.move = move

    # Thread synchronization; both conditions are built on the same lock
    self.lock = threading.Lock()
    self.source_to_dest = threading.Condition(self.lock)
    self.dest_to_source = threading.Condition(self.lock)

    # Source information
    self.src_error_message = None
    self.src_expinfo = None
    self.src_instinfo = None

    # Destination information
    self.dest_error_message = None
    self.dest_impinfo = None

  def HandleErrors(self, prefix, fn, *args):
    """Runs a function, records its outcome and wakes up all waiters.

    The outcome is stored in the C{<prefix>_error_message} attribute: C{None}
    on success, C{"Aborted"} for L{Abort} and the exception text for any
    other exception.

    @type prefix: string
    @param prefix: Variable name prefix ("src" or "dest")
    @type fn: callable
    @param fn: Function to call with C{args}

    """
    assert prefix in ("dest", "src")

    try:
      # Call inner function
      fn(*args)
    except Abort:
      outcome = "Aborted"
    except Exception: # pylint: disable=W0703
      logging.exception("Caught unhandled exception")
      outcome = str(sys.exc_info()[1])
    else:
      outcome = None

    setattr(self, "%s_error_message" % prefix, outcome)

    # Wake up whoever is waiting on either condition so the new state is seen
    self.lock.acquire()
    try:
      self.source_to_dest.notifyAll()
      self.dest_to_source.notifyAll()
    finally:
      self.lock.release()

  def CheckAbort(self):
    """Check whether thread should be aborted.

    @raise Abort: When thread should be aborted

    """
    if (self.src_error_message is not None or
        self.dest_error_message is not None):
      logging.info("Aborting")
      raise Abort()

  def Wait(self, cond, check_fn):
    """Blocks for as long as C{check_fn} returns a true value.

    Before each wait on the condition L{CheckAbort} is called, hence a
    recorded error ends the wait with L{Abort}.

    @type cond: threading.Condition
    @param cond: Threading condition
    @type check_fn: callable
    @param check_fn: Called with this object; a true value means "keep
                     waiting"

    """
    cond.acquire()
    try:
      while True:
        if not check_fn(self):
          break
        self.CheckAbort()
        cond.wait()
    finally:
      cond.release()

  def PollJob(self, cl, job_id, remote_import_fn=None):
    """Wrapper for polling a job.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @param job_id: Job ID
    @type remote_import_fn: callable or None
    @param remote_import_fn: Callback for reporting received remote import
                             information
    @return: Whatever C{rapi.client_utils.PollJob} returns

    """
    report_cb = MoveJobPollReportCb(self.CheckAbort, remote_import_fn)
    return rapi.client_utils.PollJob(cl, job_id, report_cb)
class MoveDestExecutor(object):
  """Destination side of an instance move.

  All the work is done in the constructor.

  """
  def __init__(self, dest_client, mrt):
    """Destination side of an instance move.

    Waits for the source side to publish instance and export information,
    starts the instance creation in remote-import mode and polls the
    resulting job.

    @type dest_client: L{rapi.client.GanetiRapiClient}
    @param dest_client: RAPI client
    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information

    """
    logging.debug("Waiting for instance information to become available")
    mrt.Wait(mrt.source_to_dest,
             lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)

    logging.info("Creating instance %s in remote-import mode",
                 mrt.move.dest_instance_name)
    # The three override dictionaries must be passed in the order hypervisor,
    # backend, OS. Passing the backend parameters twice would silently drop
    # the OS parameter overrides.
    job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
                                  mrt.move.dest_pnode, mrt.move.dest_snode,
                                  mrt.move.dest_iallocator,
                                  mrt.src_instinfo, mrt.src_expinfo,
                                  mrt.move.hvparams, mrt.move.beparams,
                                  mrt.move.osparams, mrt.move.nics)
    mrt.PollJob(dest_client, job_id,
                remote_import_fn=(lambda impinfo:
                                  self._SetImportInfo(mrt, impinfo)))

    logging.info("Import successful")

  @staticmethod
  def _SetImportInfo(mrt, impinfo):
    """Sets the remote import information and notifies source thread.

    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information
    @param impinfo: Remote import information

    """
    mrt.dest_to_source.acquire()
    try:
      mrt.dest_impinfo = impinfo
      mrt.dest_to_source.notifyAll()
    finally:
      mrt.dest_to_source.release()

  @staticmethod
  def _CreateInstance(cl, name, pnode, snode, iallocator, instance, expinfo,
                      override_hvparams, override_beparams, override_osparams,
                      override_nics):
    """Starts the instance creation in remote import mode.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type name: string
    @param name: Instance name
    @type pnode: string or None
    @param pnode: Name of primary node on destination cluster
    @type snode: string or None
    @param snode: Name of secondary node on destination cluster
    @type iallocator: string or None
    @param iallocator: Name of iallocator to use
    @type instance: dict
    @param instance: Instance details from source cluster
    @type expinfo: dict
    @param expinfo: Prepared export information from source cluster
    @type override_hvparams: dict or None
    @param override_hvparams: Hypervisor parameters to override
    @type override_beparams: dict or None
    @param override_beparams: Backend parameters to override
    @type override_osparams: dict or None
    @param override_osparams: OS parameters to override
    @type override_nics: dict or None
    @param override_nics: NICs to override
    @raise Error: When more NIC overrides than existing NICs are given
    @return: Result of the client's C{CreateInstance} call (used as job ID
             by the caller)

    """
    disk_template = instance["disk_template"]

    disks = [{
      constants.IDISK_SIZE: i["size"],
      constants.IDISK_MODE: i["mode"],
      } for i in instance["disks"]]

    nics = [{
      constants.INIC_IP: ip,
      constants.INIC_MAC: mac,
      constants.INIC_MODE: mode,
      constants.INIC_LINK: link,
      constants.INIC_NETWORK: network
      } for ip, mac, mode, link, network, _ in instance["nics"]]

    # The override list is documented as optional, hence only look at its
    # length once it is known to be non-empty
    if override_nics:
      if len(override_nics) > len(nics):
        raise Error("Can not create new NICs")

      # Overrides are applied by position to the NICs of the source instance
      for idx, (nic, override) in enumerate(zip(nics, override_nics)):
        nics[idx] = objects.FillDict(nic, override)

    # TODO: Should this be the actual up/down status? (run_state)
    start = (instance["config_state"] == "up")

    assert len(disks) == len(instance["disks"])
    assert len(nics) == len(instance["nics"])

    # Missing per-instance parameters are treated as empty dictionaries
    inst_beparams = instance["be_instance"]
    if not inst_beparams:
      inst_beparams = {}

    inst_hvparams = instance["hv_instance"]
    if not inst_hvparams:
      inst_hvparams = {}

    inst_osparams = instance["os_instance"]
    if not inst_osparams:
      inst_osparams = {}

    return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
                             name, disk_template, disks, nics,
                             os=instance["os"],
                             pnode=pnode,
                             snode=snode,
                             start=start,
                             ip_check=False,
                             iallocator=iallocator,
                             hypervisor=instance["hypervisor"],
                             source_handshake=expinfo["handshake"],
                             source_x509_ca=expinfo["x509_ca"],
                             source_instance_name=instance["name"],
                             beparams=objects.FillDict(inst_beparams,
                                                       override_beparams),
                             hvparams=objects.FillDict(inst_hvparams,
                                                       override_hvparams),
                             osparams=objects.FillDict(inst_osparams,
                                                       override_osparams))
class MoveSourceExecutor(object):
  """Source side of an instance move.

  All the work is done in the constructor.

  """
  def __init__(self, src_client, mrt):
    """Source side of an instance move.

    Checks the instance, prepares the export, hands the collected
    information to the destination thread, waits for the import information
    and finally runs the remote export.

    @type src_client: L{rapi.client.GanetiRapiClient}
    @param src_client: RAPI client
    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information

    """
    instance_name = mrt.move.src_instance_name

    logging.info("Checking whether instance exists")
    self._CheckInstance(src_client, instance_name)

    logging.info("Retrieving instance information from source cluster")
    instinfo = self._GetInstanceInfo(src_client, mrt.PollJob, instance_name)
    if instinfo["disk_template"] == constants.DT_FILE:
      raise Error("Inter-cluster move of file-based instances is not"
                  " supported.")

    logging.info("Preparing export on source cluster")
    expinfo = self._PrepareExport(src_client, mrt.PollJob, instance_name)
    assert "handshake" in expinfo
    assert "x509_key_name" in expinfo
    assert "x509_ca" in expinfo

    # Hand information to destination thread
    handover = mrt.source_to_dest
    handover.acquire()
    try:
      mrt.src_instinfo = instinfo
      mrt.src_expinfo = expinfo
      handover.notifyAll()
    finally:
      handover.release()

    logging.info("Waiting for destination information to become available")
    mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)

    logging.info("Starting remote export on source cluster")
    self._ExportInstance(src_client, mrt.PollJob, instance_name,
                         expinfo["x509_key_name"], mrt.dest_impinfo)

    logging.info("Export successful")

  @staticmethod
  def _CheckInstance(cl, name):
    """Checks whether the instance exists on the source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type name: string
    @param name: Instance name
    @raise Error: When the RAPI server reports the instance as not found

    """
    try:
      cl.GetInstance(name)
    except rapi.client.GanetiApiError:
      err = sys.exc_info()[1]
      if err.code == rapi.client.HTTP_NOT_FOUND:
        raise Error("Instance %s not found (%s)" % (name, str(err)))
      # Any other API error is passed on unchanged
      raise

  @staticmethod
  def _GetInstanceInfo(cl, poll_job_fn, name):
    """Retrieves detailed instance information from source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name
    @return: The single value of the first job result's dictionary

    """
    job_id = cl.GetInstanceInfo(name, static=True)
    per_instance = poll_job_fn(cl, job_id)[0]
    assert len(per_instance) == 1
    return list(per_instance.values())[0]

  @staticmethod
  def _PrepareExport(cl, poll_job_fn, name):
    """Prepares export on source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name
    @return: First element of the job result

    """
    job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
    job_result = poll_job_fn(cl, job_id)
    return job_result[0]

  @staticmethod
  def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
    """Exports instance from source cluster.

    The export shuts the instance down and removes it from the source
    cluster (C{shutdown=True}, C{remove_instance=True}).

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name
    @param x509_key_name: Source X509 key
    @param impinfo: Import information from destination cluster
    @raise Error: When the overall result or any per-disk result is false

    """
    job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
                               impinfo["disks"], shutdown=True,
                               remove_instance=True,
                               x509_key_name=x509_key_name,
                               destination_x509_ca=impinfo["x509_ca"])
    (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]

    if fin_resu and compat.all(dresults):
      return

    failed_disks = [str(idx) for idx, result in enumerate(dresults)
                    if not result]
    raise Error("Export failed for disks %s" % utils.CommaJoin(failed_disks))
class MoveSourceWorker(workerpool.BaseWorker):
  """Worker which performs one complete instance move per task.

  """
  def RunTask(self, rapi_factory, move): # pylint: disable=W0221
    """Executes an instance move.

    The destination side runs in a thread of its own while the source side
    runs in the worker thread. The combined outcome is stored in
    C{move.error_message}.

    @type rapi_factory: L{RapiClientFactory}
    @param rapi_factory: RAPI client factory
    @type move: L{InstanceMove}
    @param move: Instance move information

    """
    try:
      logging.info("Preparing to move %s from cluster %s to %s as %s",
                   move.src_instance_name, rapi_factory.src_cluster_name,
                   rapi_factory.dest_cluster_name, move.dest_instance_name)

      mrt = MoveRuntime(move)

      logging.debug("Starting destination thread")
      dest_args = ("dest", MoveDestExecutor,
                   rapi_factory.GetDestClient(), mrt)
      dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
                                     target=mrt.HandleErrors,
                                     args=dest_args)
      dest_thread.start()
      try:
        mrt.HandleErrors("src", MoveSourceExecutor,
                         rapi_factory.GetSourceClient(), mrt)
      finally:
        # Never leave the destination thread behind
        dest_thread.join()

      if not (mrt.src_error_message or mrt.dest_error_message):
        move.error_message = None
      else:
        move.error_message = ("Source error: %s, destination error: %s" %
                              (mrt.src_error_message, mrt.dest_error_message))
    except Exception: # pylint: disable=W0703
      logging.exception("Caught unhandled exception")
      move.error_message = str(sys.exc_info()[1])
def CheckRapiSetup(rapi_factory):
  """Checks the RAPI setup by retrieving the version.

  The version is requested from the source cluster first, then from the
  destination cluster; both are written to the log.

  @type rapi_factory: L{RapiClientFactory}
  @param rapi_factory: RAPI client factory

  """
  src_client = rapi_factory.GetSourceClient()
  logging.info("Connecting to source RAPI server")
  src_version = src_client.GetVersion()
  logging.info("Source cluster RAPI version: %s", src_version)

  dest_client = rapi_factory.GetDestClient()
  logging.info("Connecting to destination RAPI server")
  dest_version = dest_client.GetVersion()
  logging.info("Destination cluster RAPI version: %s", dest_version)
def ParseOptions():
  """Parses options passed to program.

  @rtype: tuple
  @return: Option parser, parsed options and remaining positional arguments

  """
  program = os.path.basename(sys.argv[0])

  usage = ("%prog [--debug|--verbose]"
           " <source-cluster> <dest-cluster>"
           " <instance...>")
  parser = optparse.OptionParser(usage=usage, prog=program)

  # Options are registered in the order they were originally added
  all_options = (
    cli.DEBUG_OPT,
    cli.VERBOSE_OPT,
    cli.IALLOCATOR_OPT,
    cli.BACKEND_OPT,
    cli.HVOPTS_OPT,
    cli.OSPARAMS_OPT,
    cli.NET_OPT,
    SRC_RAPI_PORT_OPT,
    SRC_CA_FILE_OPT,
    SRC_USERNAME_OPT,
    SRC_PASSWORD_FILE_OPT,
    DEST_RAPI_PORT_OPT,
    DEST_CA_FILE_OPT,
    DEST_USERNAME_OPT,
    DEST_PASSWORD_FILE_OPT,
    DEST_INSTANCE_NAME_OPT,
    DEST_PRIMARY_NODE_OPT,
    DEST_SECONDARY_NODE_OPT,
    PARALLEL_OPT,
    )
  for opt in all_options:
    parser.add_option(opt)

  (options, args) = parser.parse_args()

  return (parser, options, args)
def CheckOptions(parser, options, args):
  """Checks options and arguments for validity.

  Problems are reported through C{parser.error}. When exactly one instance
  is moved, the hypervisor and backend overrides are type-checked and the
  NIC option is parsed in place on C{options}.

  @param parser: Option parser, used for reporting errors
  @param options: Parsed program options
  @type args: list
  @param args: Positional arguments (source cluster, destination cluster,
               instance names); the two cluster names are removed from it
  @rtype: tuple
  @return: Source cluster name, destination cluster name and the list of
           instance names

  """
  if len(args) < 3:
    parser.error("Not enough arguments")

  src_cluster_name = args.pop(0)
  dest_cluster_name = args.pop(0)
  instance_names = args

  assert len(instance_names) > 0

  # TODO: Remove once using system default paths for SSL certificate
  # verification is implemented
  if not options.src_ca_file:
    parser.error("Missing source cluster CA file")

  if options.parallel < 1:
    parser.error("Number of simultaneous moves must be >= 1")

  # Only the combination of both is a conflict. The case where neither is
  # given is reported further down with a message naming what is missing.
  if (options.iallocator and
      (options.dest_primary_node or options.dest_secondary_node)):
    parser.error("Destination node and iallocator options exclude each other")

  if len(instance_names) == 1:
    # Moving one instance only
    if not (options.iallocator or
            options.dest_primary_node or
            options.dest_secondary_node):
      parser.error("An iallocator or the destination node is required")

    if options.hvparams:
      utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)

    if options.beparams:
      utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)

    if options.nics:
      options.nics = cli.ParseNicOption(options.nics)
  else:
    # Moving more than one instance
    if (options.dest_instance_name or options.dest_primary_node or
        options.dest_secondary_node or options.hvparams or
        options.beparams or options.osparams or options.nics):
      parser.error("The options --dest-instance-name, --dest-primary-node,"
                   " --dest-secondary-node, --hypervisor-parameters,"
                   " --backend-parameters, --os-parameters and --net can"
                   " only be used when moving exactly one instance")

    if not options.iallocator:
      parser.error("An iallocator must be specified for moving more than one"
                   " instance")

  return (src_cluster_name, dest_cluster_name, instance_names)
@UsesRapiClient
def main():
  """Main routine.

  Parses and checks the command line, verifies both RAPI servers are
  reachable, runs all moves through a worker pool and exits with a status
  reflecting whether any move failed.

  """
  (parser, options, args) = ParseOptions()

  utils.SetupToolLogging(options.debug, options.verbose, threadname=True)

  (src_cluster_name, dest_cluster_name, instance_names) = \
    CheckOptions(parser, options, args)

  logging.info("Source cluster: %s", src_cluster_name)
  logging.info("Destination cluster: %s", dest_cluster_name)
  logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))

  rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)

  CheckRapiSetup(rapi_factory)

  single_instance = (len(instance_names) == 1)

  assert (single_instance or
          not (options.dest_primary_node or options.dest_secondary_node))
  assert single_instance or options.iallocator
  assert (len(instance_names) > 1 or options.iallocator or
          options.dest_primary_node or options.dest_secondary_node)
  assert (single_instance or
          not (options.hvparams or options.beparams or options.osparams or
               options.nics))

  # Prepare list of instance moves
  moves = []
  for src_instance_name in instance_names:
    if options.dest_instance_name:
      assert len(instance_names) == 1

    # The instance keeps its name unless a new one was requested
    dest_instance_name = options.dest_instance_name or src_instance_name

    moves.append(InstanceMove(src_instance_name, dest_instance_name,
                              options.dest_primary_node,
                              options.dest_secondary_node,
                              options.iallocator, options.hvparams,
                              options.beparams, options.osparams,
                              options.nics))

  assert len(moves) == len(instance_names)

  # Start workerpool
  wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
  try:
    # Add instance moves to workerpool
    for move in moves:
      wp.AddTask((rapi_factory, move))

    # Wait for all moves to finish
    wp.Quiesce()
  finally:
    wp.TerminateWorkers()

  # There should be no threads running at this point, hence not using locks
  # anymore

  logging.info("Instance move results:")

  for move in moves:
    if move.dest_instance_name != move.src_instance_name:
      name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
    else:
      name = move.src_instance_name

    if not move.error_message:
      msg = "Success"
    else:
      msg = "Failed (%s)" % move.error_message

    logging.info("%s: %s", name, msg)

  if compat.any(move.error_message for move in moves):
    sys.exit(constants.EXIT_FAILURE)

  sys.exit(constants.EXIT_SUCCESS)


if __name__ == "__main__":
  main()