# Copyright (C) 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Tool to move instances from one cluster to another.

"""

# pylint: disable=C0103
# C0103: Invalid name move-instance
35 from ganeti import cli
36 from ganeti import constants
37 from ganeti import utils
38 from ganeti import workerpool
39 from ganeti import objects
40 from ganeti import compat
41 from ganeti import rapi
43 import ganeti.rapi.client # pylint: disable=W0611
44 import ganeti.rapi.client_utils
45 from ganeti.rapi.client import UsesRapiClient
# Command line options describing how to reach the source cluster's RAPI
# server.
SRC_RAPI_PORT_OPT = cli.cli_option(
  "--src-rapi-port", action="store", type="int",
  dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
  help=("Source cluster RAPI port (defaults to %s)" %
        constants.DEFAULT_RAPI_PORT))

SRC_CA_FILE_OPT = cli.cli_option(
  "--src-ca-file", action="store", type="string",
  dest="src_ca_file",
  help=("File containing source cluster Certificate"
        " Authority (CA) in PEM format"))

SRC_USERNAME_OPT = cli.cli_option(
  "--src-username", action="store", type="string",
  dest="src_username", default=None,
  help="Source cluster username")

SRC_PASSWORD_FILE_OPT = cli.cli_option(
  "--src-password-file", action="store", type="string",
  dest="src_password_file",
  help="File containing source cluster password")
# Deliberately no numeric default: leaving the value at None when the option
# is not given lets RapiClientFactory fall back to the source cluster's RAPI
# port, which is what the help text promises. With DEFAULT_RAPI_PORT as the
# default that fallback could never be reached.
DEST_RAPI_PORT_OPT = cli.cli_option(
  "--dest-rapi-port", action="store", type="int",
  dest="dest_rapi_port", default=None,
  help=("Destination cluster RAPI port (defaults to source"
        " cluster RAPI port)"))
# Destination cluster connection options; most of them fall back to the
# matching source cluster setting when not given.
# NOTE(review): the tail of the --dest-ca-file help text (" cluster CA)") is
# not visible in the listing and was reconstructed -- confirm.
DEST_CA_FILE_OPT = cli.cli_option(
  "--dest-ca-file", action="store", type="string",
  dest="dest_ca_file",
  help=("File containing destination cluster Certificate"
        " Authority (CA) in PEM format (defaults to source"
        " cluster CA)"))

DEST_USERNAME_OPT = cli.cli_option(
  "--dest-username", action="store", type="string",
  dest="dest_username", default=None,
  help=("Destination cluster username (defaults to"
        " source cluster username)"))

DEST_PASSWORD_FILE_OPT = cli.cli_option(
  "--dest-password-file", action="store", type="string",
  dest="dest_password_file",
  help=("File containing destination cluster password"
        " (defaults to source cluster password)"))

# Options that only make sense when exactly one instance is moved
DEST_INSTANCE_NAME_OPT = cli.cli_option(
  "--dest-instance-name", action="store", type="string",
  dest="dest_instance_name",
  help=("Instance name on destination cluster (only"
        " when moving exactly one instance)"))

DEST_PRIMARY_NODE_OPT = cli.cli_option(
  "--dest-primary-node", action="store", type="string",
  dest="dest_primary_node",
  help=("Primary node on destination cluster (only"
        " when moving exactly one instance)"))

DEST_SECONDARY_NODE_OPT = cli.cli_option(
  "--dest-secondary-node", action="store", type="string",
  dest="dest_secondary_node",
  help=("Secondary node on destination cluster (only"
        " when moving exactly one instance)"))

DEST_DISK_TEMPLATE_OPT = cli.cli_option(
  "--dest-disk-template", action="store", type="string",
  dest="dest_disk_template", default=None,
  help="Disk template to use on destination cluster")

PARALLEL_OPT = cli.cli_option(
  "-p", "--parallel", action="store", type="int", default=1,
  dest="parallel", metavar="<number>",
  help="Number of instances to be moved simultaneously")
class Error(Exception):
  """Base class for errors raised by this tool.

  """


class Abort(Error):
  """Special exception for aborting import/export.

  """
class RapiClientFactory:
  """Factory class for creating RAPI clients.

  @ivar src_cluster_name: Source cluster name
  @ivar dest_cluster_name: Destination cluster name
  @ivar GetSourceClient: Callable returning new client for source cluster
  @ivar GetDestClient: Callable returning new client for destination cluster

  """
  def __init__(self, options, src_cluster_name, dest_cluster_name):
    """Works out the connection settings for both clusters.

    Destination settings (CA, port, username, password) which were not
    given fall back to the corresponding source settings.

    @param options: Program options
    @type src_cluster_name: string
    @param src_cluster_name: Source cluster name
    @type dest_cluster_name: string
    @param dest_cluster_name: Destination cluster name

    """
    self.src_cluster_name = src_cluster_name
    self.dest_cluster_name = dest_cluster_name

    # TODO: Implement timeouts for RAPI connections
    # TODO: Support for using system default paths for verifying SSL certificate
    logging.debug("Using '%s' as source CA", options.src_ca_file)
    src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)

    if not options.dest_ca_file:
      logging.debug("Using source CA for destination")
      dest_curl_config = src_curl_config
    else:
      logging.debug("Using '%s' as destination CA", options.dest_ca_file)
      dest_curl_config = \
        rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)

    logging.debug("Source RAPI server is %s:%s",
                  src_cluster_name, options.src_rapi_port)
    logging.debug("Source username is '%s'", options.src_username)

    # A missing username is sent as an empty string
    src_username = options.src_username
    if src_username is None:
      src_username = ""

    src_password = None
    if options.src_password_file:
      logging.debug("Reading '%s' for source password",
                    options.src_password_file)
      # NOTE(review): the second argument is not visible in the listing;
      # "strict=True" was reconstructed -- confirm.
      src_password = utils.ReadOneLineFile(options.src_password_file,
                                           strict=True)
    else:
      logging.debug("Source has no password")

    def _NewSourceClient():
      return rapi.client.GanetiRapiClient(src_cluster_name,
                                          port=options.src_rapi_port,
                                          curl_config_fn=src_curl_config,
                                          username=src_username,
                                          password=src_password)

    self.GetSourceClient = _NewSourceClient

    # An unset (or zero) destination port means "same as source"
    dest_rapi_port = options.dest_rapi_port or options.src_rapi_port

    dest_username = options.dest_username
    if dest_username is None:
      dest_username = src_username

    logging.debug("Destination RAPI server is %s:%s",
                  dest_cluster_name, dest_rapi_port)
    logging.debug("Destination username is '%s'", dest_username)

    if not options.dest_password_file:
      logging.debug("Using source password for destination")
      dest_password = src_password
    else:
      logging.debug("Reading '%s' for destination password",
                    options.dest_password_file)
      dest_password = utils.ReadOneLineFile(options.dest_password_file,
                                            strict=True)

    def _NewDestClient():
      return rapi.client.GanetiRapiClient(dest_cluster_name,
                                          port=dest_rapi_port,
                                          curl_config_fn=dest_curl_config,
                                          username=dest_username,
                                          password=dest_password)

    self.GetDestClient = _NewDestClient
class MoveJobPollReportCb(cli.JobPollReportCbBase):
  """Job polling callbacks used while an instance is being moved.

  """
  def __init__(self, abort_check_fn, remote_import_fn):
    """Initializes this class.

    @type abort_check_fn: callable
    @param abort_check_fn: Function to check whether move is aborted
    @type remote_import_fn: callable or None
    @param remote_import_fn: Callback for reporting received remote import
                             information

    """
    cli.JobPollReportCbBase.__init__(self)
    self._abort_check_fn = abort_check_fn
    self._remote_import_fn = remote_import_fn

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    Remote import information is handed to the registered callback; every
    other message is written to the log.

    """
    if log_type != constants.ELOG_REMOTE_IMPORT:
      logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
                   cli.FormatLogMessage(log_type, log_msg))
      return

    logging.debug("Received remote import information")

    if not self._remote_import_fn:
      raise RuntimeError("Received unexpected remote import information")

    assert "x509_ca" in log_msg
    assert "disks" in log_msg

    self._remote_import_fn(log_msg)

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    """
    try:
      # Check whether we were told to abort by the other thread
      self._abort_check_fn()
    except Abort:
      logging.warning("Aborting despite job %s still running", job_id)
      raise
class InstanceMove(object):
  """Status class for instance moves.

  Holds the parameters of one move and, once it has finished, its outcome in
  C{error_message} (C{None} unless an error was recorded).

  """
  def __init__(self, src_instance_name, dest_instance_name,
               dest_pnode, dest_snode, dest_iallocator,
               dest_disk_template, hvparams,
               beparams, osparams, nics):
    """Initializes this class.

    @type src_instance_name: string
    @param src_instance_name: Instance name on source cluster
    @type dest_instance_name: string
    @param dest_instance_name: Instance name on destination cluster
    @type dest_pnode: string or None
    @param dest_pnode: Name of primary node on destination cluster
    @type dest_snode: string or None
    @param dest_snode: Name of secondary node on destination cluster
    @type dest_iallocator: string or None
    @param dest_iallocator: Name of iallocator to use
    @type dest_disk_template: string or None
    @param dest_disk_template: Disk template to use instead of the original one
    @type hvparams: dict or None
    @param hvparams: Hypervisor parameters to override
    @type beparams: dict or None
    @param beparams: Backend parameters to override
    @type osparams: dict or None
    @param osparams: OS parameters to override
    @type nics: dict or None
    @param nics: NICs to override

    """
    # Where the instance comes from and where it goes
    self.src_instance_name = src_instance_name
    self.dest_instance_name = dest_instance_name
    self.dest_pnode = dest_pnode
    self.dest_snode = dest_snode
    self.dest_iallocator = dest_iallocator
    self.dest_disk_template = dest_disk_template

    # Parameter overrides for the destination instance
    self.hvparams = hvparams
    self.beparams = beparams
    self.osparams = osparams
    self.nics = nics

    # Filled in once the move has been attempted
    self.error_message = None
class MoveRuntime(object):
  """Class to keep track of instance move.

  Shared between the source and the destination thread of one move; both
  condition variables use the same lock.

  """
  def __init__(self, move):
    """Initializes this class.

    @type move: L{InstanceMove}

    """
    self.move = move

    # Thread synchronization
    self.lock = threading.Lock()
    self.source_to_dest = threading.Condition(self.lock)
    self.dest_to_source = threading.Condition(self.lock)

    # Source information
    self.src_error_message = None
    self.src_expinfo = None
    self.src_instinfo = None

    # Destination information
    self.dest_error_message = None
    self.dest_impinfo = None

  def HandleErrors(self, prefix, fn, *args):
    """Wrapper to catch errors and abort threads.

    The outcome is stored in C{<prefix>_error_message} (C{None} on success)
    and all waiters on both condition variables are woken up afterwards.

    @type prefix: string
    @param prefix: Variable name prefix ("src" or "dest")
    @type fn: callable
    @param fn: Function

    """
    assert prefix in ("dest", "src")

    try:
      # Call inner function
      fn(*args)
    except Abort:
      errmsg = "Aborted"
    except Exception as err: # pylint: disable=W0703
      logging.exception("Caught unhandled exception")
      errmsg = str(err)
    else:
      errmsg = None

    setattr(self, "%s_error_message" % prefix, errmsg)

    # Wake up the other side so it notices the (possibly failed) result
    with self.lock:
      self.source_to_dest.notifyAll()
      self.dest_to_source.notifyAll()

  def CheckAbort(self):
    """Check whether thread should be aborted.

    @raise Abort: When thread should be aborted

    """
    if (self.src_error_message is not None or
        self.dest_error_message is not None):
      logging.info("Aborting")
      raise Abort()

  def Wait(self, cond, check_fn):
    """Waits for a condition to become true.

    Blocks for as long as C{check_fn} returns a true value, checking for an
    abort before every wait.

    @type cond: threading.Condition
    @param cond: Threading condition
    @type check_fn: callable
    @param check_fn: Function to check whether condition is true

    """
    with cond:
      while check_fn(self):
        self.CheckAbort()
        cond.wait()

  def PollJob(self, cl, job_id, remote_import_fn=None):
    """Wrapper for polling a job.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type job_id: string
    @param job_id: Job ID
    @type remote_import_fn: callable or None
    @param remote_import_fn: Callback for reporting received remote import
                             information

    """
    report_cb = MoveJobPollReportCb(self.CheckAbort, remote_import_fn)
    return rapi.client_utils.PollJob(cl, job_id, report_cb)
class MoveDestExecutor(object):
  """Destination side of an instance move.

  """
  def __init__(self, dest_client, mrt):
    """Destination side of an instance move.

    Waits for the source thread to provide instance and export information,
    creates the instance in remote-import mode and polls the creation job.

    @type dest_client: L{rapi.client.GanetiRapiClient}
    @param dest_client: RAPI client
    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information

    """
    logging.debug("Waiting for instance information to become available")
    mrt.Wait(mrt.source_to_dest,
             lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)

    logging.info("Creating instance %s in remote-import mode",
                 mrt.move.dest_instance_name)
    # The three override dicts must be passed in the order hypervisor,
    # backend, OS; passing the backend parameters twice would lose the OS
    # parameter overrides.
    job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
                                  mrt.move.dest_pnode, mrt.move.dest_snode,
                                  mrt.move.dest_iallocator,
                                  mrt.move.dest_disk_template,
                                  mrt.src_instinfo, mrt.src_expinfo,
                                  mrt.move.hvparams, mrt.move.beparams,
                                  mrt.move.osparams, mrt.move.nics)
    mrt.PollJob(dest_client, job_id,
                remote_import_fn=compat.partial(self._SetImportInfo, mrt))

    logging.info("Import successful")

  @staticmethod
  def _SetImportInfo(mrt, impinfo):
    """Sets the remote import information and notifies source thread.

    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information
    @param impinfo: Remote import information

    """
    mrt.dest_to_source.acquire()
    try:
      mrt.dest_impinfo = impinfo
      mrt.dest_to_source.notifyAll()
    finally:
      mrt.dest_to_source.release()

  @staticmethod
  def _CreateInstance(cl, name, pnode, snode, iallocator, dest_disk_template,
                      instance, expinfo, override_hvparams, override_beparams,
                      override_osparams, override_nics):
    """Starts the instance creation in remote import mode.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type name: string
    @param name: Instance name
    @type pnode: string or None
    @param pnode: Name of primary node on destination cluster
    @type snode: string or None
    @param snode: Name of secondary node on destination cluster
    @type iallocator: string or None
    @param iallocator: Name of iallocator to use
    @type dest_disk_template: string or None
    @param dest_disk_template: Disk template to use instead of the original one
    @type instance: dict
    @param instance: Instance details from source cluster
    @type expinfo: dict
    @param expinfo: Prepared export information from source cluster
    @type override_hvparams: dict or None
    @param override_hvparams: Hypervisor parameters to override
    @type override_beparams: dict or None
    @param override_beparams: Backend parameters to override
    @type override_osparams: dict or None
    @param override_osparams: OS parameters to override
    @type override_nics: dict or None
    @param override_nics: NICs to override
    @raise Error: When more NIC overrides than existing NICs are given
    @return: Result of C{cl.CreateInstance}, used by the caller as job ID

    """
    if dest_disk_template:
      disk_template = dest_disk_template
    else:
      disk_template = instance["disk_template"]

    disks = []
    for idisk in instance["disks"]:
      odisk = {
        constants.IDISK_SIZE: idisk["size"],
        constants.IDISK_MODE: idisk["mode"],
        constants.IDISK_NAME: str(idisk.get("name")),
        }
      # Spindles are only passed on when the source reports them
      spindles = idisk.get("spindles")
      if spindles is not None:
        odisk[constants.IDISK_SPINDLES] = spindles
      disks.append(odisk)

    # The VLAN needs its own key; storing it under INIC_NAME would be
    # overwritten by the NIC name and the VLAN would be lost.
    nics = [{
      constants.INIC_IP: ip,
      constants.INIC_MAC: mac,
      constants.INIC_MODE: mode,
      constants.INIC_LINK: link,
      constants.INIC_VLAN: vlan,
      constants.INIC_NETWORK: network,
      constants.INIC_NAME: nic_name
      } for nic_name, _, ip, mac, mode, link, vlan, network, _
        in instance["nics"]]

    if len(override_nics) > len(nics):
      raise Error("Can not create new NICs")

    if override_nics:
      assert len(override_nics) <= len(nics)
      for idx, (nic, override) in enumerate(zip(nics, override_nics)):
        nics[idx] = objects.FillDict(nic, override)

    # TODO: Should this be the actual up/down status? (run_state)
    start = (instance["config_state"] == "up")

    assert len(disks) == len(instance["disks"])
    assert len(nics) == len(instance["nics"])

    # Missing per-instance parameters are treated as "nothing set"
    inst_beparams = instance["be_instance"]
    if not inst_beparams:
      inst_beparams = {}

    inst_hvparams = instance["hv_instance"]
    if not inst_hvparams:
      inst_hvparams = {}

    inst_osparams = instance["os_instance"]
    if not inst_osparams:
      inst_osparams = {}

    # NOTE(review): the keyword arguments between "nics" and "iallocator"
    # (os, pnode, snode, start, ip_check) are not visible in the listing and
    # were reconstructed -- confirm, in particular ip_check=False.
    return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
                             name, disk_template, disks, nics,
                             os=instance["os"],
                             pnode=pnode,
                             snode=snode,
                             start=start,
                             ip_check=False,
                             iallocator=iallocator,
                             hypervisor=instance["hypervisor"],
                             source_handshake=expinfo["handshake"],
                             source_x509_ca=expinfo["x509_ca"],
                             source_instance_name=instance["name"],
                             beparams=objects.FillDict(inst_beparams,
                                                       override_beparams),
                             hvparams=objects.FillDict(inst_hvparams,
                                                       override_hvparams),
                             osparams=objects.FillDict(inst_osparams,
                                                       override_osparams))
class MoveSourceExecutor(object):
  """Source side of an instance move.

  """
  def __init__(self, src_client, mrt):
    """Source side of an instance move.

    Checks the instance, prepares the export, hands the collected
    information to the destination thread, waits for the import information
    and finally runs the remote export.

    @type src_client: L{rapi.client.GanetiRapiClient}
    @param src_client: RAPI client
    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information

    """
    instance_name = mrt.move.src_instance_name

    logging.info("Checking whether instance exists")
    self._CheckInstance(src_client, instance_name)

    logging.info("Retrieving instance information from source cluster")
    instinfo = self._GetInstanceInfo(src_client, mrt.PollJob, instance_name)
    # NOTE(review): the tail of this message (" supported.") is not visible
    # in the listing and was reconstructed -- confirm.
    if (instinfo["disk_template"] in
        [constants.DT_FILE, constants.DT_SHARED_FILE]):
      raise Error("Inter-cluster move of file-based instances is not"
                  " supported.")

    logging.info("Preparing export on source cluster")
    expinfo = self._PrepareExport(src_client, mrt.PollJob, instance_name)
    assert "handshake" in expinfo
    assert "x509_key_name" in expinfo
    assert "x509_ca" in expinfo

    # Hand information to destination thread
    mrt.source_to_dest.acquire()
    try:
      mrt.src_instinfo = instinfo
      mrt.src_expinfo = expinfo
      mrt.source_to_dest.notifyAll()
    finally:
      mrt.source_to_dest.release()

    logging.info("Waiting for destination information to become available")
    mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)

    logging.info("Starting remote export on source cluster")
    self._ExportInstance(src_client, mrt.PollJob, instance_name,
                         expinfo["x509_key_name"], mrt.dest_impinfo)

    logging.info("Export successful")

  @staticmethod
  def _CheckInstance(cl, name):
    """Checks whether the instance exists on the source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type name: string
    @param name: Instance name
    @raise Error: When the RAPI server reports the instance as not found

    """
    try:
      cl.GetInstance(name)
    except rapi.client.GanetiApiError as err:
      # Anything but "not found" is passed on unchanged
      if err.code != rapi.client.HTTP_NOT_FOUND:
        raise
      raise Error("Instance %s not found (%s)" % (name, str(err)))

  @staticmethod
  def _GetInstanceInfo(cl, poll_job_fn, name):
    """Retrieves detailed instance information from source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name
    @return: The single entry of the first job result

    """
    job_id = cl.GetInstanceInfo(name, static=True)
    per_instance = poll_job_fn(cl, job_id)[0]
    found = list(per_instance.keys())
    assert len(found) == 1
    return per_instance[found[0]]

  @staticmethod
  def _PrepareExport(cl, poll_job_fn, name):
    """Prepares export on source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name
    @return: First element of the job result

    """
    job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
    job_result = poll_job_fn(cl, job_id)
    return job_result[0]

  @staticmethod
  def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
    """Exports instance from source cluster.

    The export is requested with shutdown and removal of the source
    instance.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name
    @param x509_key_name: Source X509 key
    @param impinfo: Import information from destination cluster
    @raise Error: When the overall result or any disk result is false

    """
    job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
                               impinfo["disks"], shutdown=True,
                               remove_instance=True,
                               x509_key_name=x509_key_name,
                               destination_x509_ca=impinfo["x509_ca"])
    (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]

    if fin_resu and compat.all(dresults):
      return

    failed_disks = [str(idx) for (idx, result) in enumerate(dresults)
                    if not result]
    raise Error("Export failed for disks %s" % utils.CommaJoin(failed_disks))
class MoveSourceWorker(workerpool.BaseWorker):
  """Worker running one instance move per task.

  """
  def RunTask(self, rapi_factory, move): # pylint: disable=W0221
    """Executes an instance move.

    The destination side runs in a separate thread, the source side in the
    worker itself. The outcome is stored in C{move.error_message}.

    @type rapi_factory: L{RapiClientFactory}
    @param rapi_factory: RAPI client factory
    @type move: L{InstanceMove}
    @param move: Instance move information

    """
    try:
      logging.info("Preparing to move %s from cluster %s to %s as %s",
                   move.src_instance_name, rapi_factory.src_cluster_name,
                   rapi_factory.dest_cluster_name, move.dest_instance_name)

      mrt = MoveRuntime(move)

      logging.debug("Starting destination thread")
      thread_name = "DestFor%s" % self.getName()
      dest_args = ("dest", MoveDestExecutor,
                   rapi_factory.GetDestClient(), mrt)
      dest_thread = threading.Thread(name=thread_name,
                                     target=mrt.HandleErrors,
                                     args=dest_args)
      dest_thread.start()
      try:
        mrt.HandleErrors("src", MoveSourceExecutor,
                         rapi_factory.GetSourceClient(), mrt)
      finally:
        # Never leave the destination thread running behind our back
        dest_thread.join()

      if not (mrt.src_error_message or mrt.dest_error_message):
        move.error_message = None
      else:
        move.error_message = ("Source error: %s, destination error: %s" %
                              (mrt.src_error_message, mrt.dest_error_message))
    except Exception as err: # pylint: disable=W0703
      logging.exception("Caught unhandled exception")
      move.error_message = str(err)
def CheckRapiSetup(rapi_factory):
  """Checks the RAPI setup by retrieving the version.

  Queries first the source and then the destination cluster and logs the
  version each of them reports.

  @type rapi_factory: L{RapiClientFactory}
  @param rapi_factory: RAPI client factory

  """
  source = rapi_factory.GetSourceClient()
  logging.info("Connecting to source RAPI server")
  source_version = source.GetVersion()
  logging.info("Source cluster RAPI version: %s", source_version)

  destination = rapi_factory.GetDestClient()
  logging.info("Connecting to destination RAPI server")
  destination_version = destination.GetVersion()
  logging.info("Destination cluster RAPI version: %s", destination_version)
def ParseOptions():
  """Parses options passed to program.

  @return: Tuple of (parser, options, args)

  """
  program = os.path.basename(sys.argv[0])

  # NOTE(review): the end of the usage string (" <instance...>") and the
  # "prog" keyword are not visible in the listing and were reconstructed --
  # confirm.
  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
                                        " <source-cluster> <dest-cluster>"
                                        " <instance...>"),
                                 prog=program)

  # Generic options first, then source and destination cluster options
  all_options = (
    cli.DEBUG_OPT,
    cli.VERBOSE_OPT,
    cli.IALLOCATOR_OPT,
    cli.BACKEND_OPT,
    cli.HVOPTS_OPT,
    cli.OSPARAMS_OPT,
    cli.NET_OPT,
    SRC_RAPI_PORT_OPT,
    SRC_CA_FILE_OPT,
    SRC_USERNAME_OPT,
    SRC_PASSWORD_FILE_OPT,
    DEST_RAPI_PORT_OPT,
    DEST_CA_FILE_OPT,
    DEST_USERNAME_OPT,
    DEST_PASSWORD_FILE_OPT,
    DEST_INSTANCE_NAME_OPT,
    DEST_PRIMARY_NODE_OPT,
    DEST_SECONDARY_NODE_OPT,
    DEST_DISK_TEMPLATE_OPT,
    PARALLEL_OPT,
    )
  for opt in all_options:
    parser.add_option(opt)

  (options, args) = parser.parse_args()

  return (parser, options, args)
def CheckOptions(parser, options, args):
  """Checks options and arguments for validity.

  Problems are reported through C{parser.error}. The first two entries are
  removed from C{args}; what is left are the instance names.

  @return: Tuple of (source cluster name, destination cluster name,
           list of instance names)

  """
  if len(args) < 3:
    parser.error("Not enough arguments")

  src_cluster_name = args.pop(0)
  dest_cluster_name = args.pop(0)
  instance_names = args

  assert len(instance_names) > 0

  # TODO: Remove once using system default paths for SSL certificate
  # verification is implemented
  if not options.src_ca_file:
    parser.error("Missing source cluster CA file")

  if options.parallel < 1:
    parser.error("Number of simultaneous moves must be >= 1")

  wants_nodes = bool(options.dest_primary_node or options.dest_secondary_node)
  if bool(options.iallocator) and wants_nodes:
    parser.error("Destination node and iallocator options exclude each other")

  if len(instance_names) != 1:
    # Moving more than one instance
    if (options.dest_instance_name or options.dest_primary_node or
        options.dest_secondary_node or options.hvparams or
        options.beparams or options.osparams or options.nics):
      parser.error("The options --dest-instance-name, --dest-primary-node,"
                   " --dest-secondary-node, --hypervisor-parameters,"
                   " --backend-parameters, --os-parameters and --net can"
                   " only be used when moving exactly one instance")
  else:
    # Moving one instance only
    if options.hvparams:
      utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)

    if options.beparams:
      utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)

    if options.nics:
      options.nics = cli.ParseNicOption(options.nics)

  return (src_cluster_name, dest_cluster_name, instance_names)
def DestClusterHasDefaultIAllocator(rapi_factory):
  """Determines if a given cluster has a default iallocator.

  @type rapi_factory: L{RapiClientFactory}
  @param rapi_factory: RAPI client factory
  @return: C{False} if the cluster information has no "default_iallocator"
           entry, otherwise the value of that entry

  """
  cluster_info = rapi_factory.GetDestClient().GetInfo()
  if "default_iallocator" not in cluster_info:
    return False
  return cluster_info["default_iallocator"]
def ExitWithError(message):
  """Exits after an error and shows a message.

  @type message: string
  @param message: Text written to standard error before exiting

  """
  text = "move-instance: error: " + message + "\n"
  sys.stderr.write(text)
  sys.exit(constants.EXIT_FAILURE)
@UsesRapiClient
def main():
  """Main routine.

  Parses and checks the command line, verifies that both RAPI servers
  answer, runs all instance moves through a worker pool, logs one result
  line per instance and exits with a failure status if any move failed.

  """
  (parser, options, args) = ParseOptions()

  utils.SetupToolLogging(options.debug, options.verbose, threadname=True)

  (src_cluster_name, dest_cluster_name, instance_names) = \
    CheckOptions(parser, options, args)

  logging.info("Source cluster: %s", src_cluster_name)
  logging.info("Destination cluster: %s", dest_cluster_name)
  logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))

  rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)

  CheckRapiSetup(rapi_factory)

  # The cluster is only asked for its default when no iallocator was given
  has_iallocator = options.iallocator or \
                   DestClusterHasDefaultIAllocator(rapi_factory)

  if len(instance_names) > 1 and not has_iallocator:
    ExitWithError("When moving multiple instances, an iallocator must be"
                  " used. None was provided and the target cluster does not"
                  " have a default iallocator.")
  if (len(instance_names) == 1 and not (has_iallocator or
      options.dest_primary_node or options.dest_secondary_node)):
    ExitWithError("Target cluster does not have a default iallocator, "
                  "please specify either destination nodes or an iallocator.")

  # Prepare list of instance moves
  moves = []
  for src_instance_name in instance_names:
    if options.dest_instance_name:
      assert len(instance_names) == 1
      # Rename instance
      dest_instance_name = options.dest_instance_name
    else:
      dest_instance_name = src_instance_name

    moves.append(InstanceMove(src_instance_name, dest_instance_name,
                              options.dest_primary_node,
                              options.dest_secondary_node,
                              options.iallocator,
                              options.dest_disk_template,
                              options.hvparams,
                              options.beparams,
                              options.osparams,
                              options.nics))

  assert len(moves) == len(instance_names)

  # Start workerpool
  wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
  try:
    # Add instance moves to workerpool
    for move in moves:
      wp.AddTask((rapi_factory, move))

    # Wait for all moves to finish
    wp.Quiesce()

  finally:
    wp.TerminateWorkers()

  # There should be no threads running at this point, hence not using locks
  # anymore

  logging.info("Instance move results:")

  for move in moves:
    if move.dest_instance_name == move.src_instance_name:
      name = move.src_instance_name
    else:
      name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)

    if move.error_message:
      msg = "Failed (%s)" % move.error_message
    else:
      msg = "Success"

    logging.info("%s: %s", name, msg)

  if compat.any(move.error_message for move in moves):
    sys.exit(constants.EXIT_FAILURE)

  sys.exit(constants.EXIT_SUCCESS)
943 if __name__ == "__main__":