move-instance: Pass OS parameters to new instance
[ganeti-local] / tools / move-instance
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to move instances from one cluster to another.
22
23 """
24
25 # pylint: disable-msg=C0103
26 # C0103: Invalid name move-instance
27
28 import os
29 import sys
30 import time
31 import logging
32 import optparse
33 import threading
34
35 from ganeti import cli
36 from ganeti import constants
37 from ganeti import utils
38 from ganeti import workerpool
39 from ganeti import compat
40 from ganeti import rapi
41
42 import ganeti.rapi.client # pylint: disable-msg=W0611
43 import ganeti.rapi.client_utils
44
45
46 SRC_RAPI_PORT_OPT = \
47   cli.cli_option("--src-rapi-port", action="store", type="int",
48                  dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
49                  help=("Source cluster RAPI port (defaults to %s)" %
50                        constants.DEFAULT_RAPI_PORT))
51
52 SRC_CA_FILE_OPT = \
53   cli.cli_option("--src-ca-file", action="store", type="string",
54                  dest="src_ca_file",
55                  help=("File containing source cluster Certificate"
56                        " Authority (CA) in PEM format"))
57
58 SRC_USERNAME_OPT = \
59   cli.cli_option("--src-username", action="store", type="string",
60                  dest="src_username", default=None,
61                  help="Source cluster username")
62
63 SRC_PASSWORD_FILE_OPT = \
64   cli.cli_option("--src-password-file", action="store", type="string",
65                  dest="src_password_file",
66                  help="File containing source cluster password")
67
68 DEST_RAPI_PORT_OPT = \
69   cli.cli_option("--dest-rapi-port", action="store", type="int",
70                  dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT,
71                  help=("Destination cluster RAPI port (defaults to source"
72                        " cluster RAPI port)"))
73
74 DEST_CA_FILE_OPT = \
75   cli.cli_option("--dest-ca-file", action="store", type="string",
76                  dest="dest_ca_file",
77                  help=("File containing destination cluster Certificate"
78                        " Authority (CA) in PEM format (defaults to source"
79                        " cluster CA)"))
80
81 DEST_USERNAME_OPT = \
82   cli.cli_option("--dest-username", action="store", type="string",
83                  dest="dest_username", default=None,
84                  help=("Destination cluster username (defaults to"
85                        " source cluster username)"))
86
87 DEST_PASSWORD_FILE_OPT = \
88   cli.cli_option("--dest-password-file", action="store", type="string",
89                  dest="dest_password_file",
90                  help=("File containing destination cluster password"
91                        " (defaults to source cluster password)"))
92
93 DEST_INSTANCE_NAME_OPT = \
94   cli.cli_option("--dest-instance-name", action="store", type="string",
95                  dest="dest_instance_name",
96                  help=("Instance name on destination cluster (only"
97                        " when moving exactly one instance)"))
98
99 DEST_PRIMARY_NODE_OPT = \
100   cli.cli_option("--dest-primary-node", action="store", type="string",
101                  dest="dest_primary_node",
102                  help=("Primary node on destination cluster (only"
103                        " when moving exactly one instance)"))
104
105 DEST_SECONDARY_NODE_OPT = \
106   cli.cli_option("--dest-secondary-node", action="store", type="string",
107                  dest="dest_secondary_node",
108                  help=("Secondary node on destination cluster (only"
109                        " when moving exactly one instance)"))
110
111 PARALLEL_OPT = \
112   cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
113                  dest="parallel", metavar="<number>",
114                  help="Number of instances to be moved simultaneously")
115
116
117 class Error(Exception):
118   """Generic error.
119
120   """
121
122
123 class Abort(Error):
124   """Special exception for aborting import/export.
125
126   """
127
128
129 class RapiClientFactory:
130   """Factory class for creating RAPI clients.
131
132   @ivar src_cluster_name: Source cluster name
133   @ivar dest_cluster_name: Destination cluster name
134   @ivar GetSourceClient: Callable returning new client for source cluster
135   @ivar GetDestClient: Callable returning new client for destination cluster
136
137   """
138   def __init__(self, options, src_cluster_name, dest_cluster_name):
139     """Initializes this class.
140
141     @param options: Program options
142     @type src_cluster_name: string
143     @param src_cluster_name: Source cluster name
144     @type dest_cluster_name: string
145     @param dest_cluster_name: Destination cluster name
146
147     """
148     self.src_cluster_name = src_cluster_name
149     self.dest_cluster_name = dest_cluster_name
150
151     # TODO: Implement timeouts for RAPI connections
152     # TODO: Support for using system default paths for verifying SSL certificate
153     logging.debug("Using '%s' as source CA", options.src_ca_file)
154     src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)
155
156     if options.dest_ca_file:
157       logging.debug("Using '%s' as destination CA", options.dest_ca_file)
158       dest_curl_config = \
159         rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
160     else:
161       logging.debug("Using source CA for destination")
162       dest_curl_config = src_curl_config
163
164     logging.debug("Source RAPI server is %s:%s",
165                   src_cluster_name, options.src_rapi_port)
166     logging.debug("Source username is '%s'", options.src_username)
167
168     if options.src_username is None:
169       src_username = ""
170     else:
171       src_username = options.src_username
172
173     if options.src_password_file:
174       logging.debug("Reading '%s' for source password",
175                     options.src_password_file)
176       src_password = utils.ReadOneLineFile(options.src_password_file,
177                                            strict=True)
178     else:
179       logging.debug("Source has no password")
180       src_password = None
181
182     self.GetSourceClient = lambda: \
183       rapi.client.GanetiRapiClient(src_cluster_name,
184                                    port=options.src_rapi_port,
185                                    curl_config_fn=src_curl_config,
186                                    username=src_username,
187                                    password=src_password)
188
189     if options.dest_rapi_port:
190       dest_rapi_port = options.dest_rapi_port
191     else:
192       dest_rapi_port = options.src_rapi_port
193
194     if options.dest_username is None:
195       dest_username = src_username
196     else:
197       dest_username = options.dest_username
198
199     logging.debug("Destination RAPI server is %s:%s",
200                   dest_cluster_name, dest_rapi_port)
201     logging.debug("Destination username is '%s'", dest_username)
202
203     if options.dest_password_file:
204       logging.debug("Reading '%s' for destination password",
205                     options.dest_password_file)
206       dest_password = utils.ReadOneLineFile(options.dest_password_file,
207                                             strict=True)
208     else:
209       logging.debug("Using source password for destination")
210       dest_password = src_password
211
212     self.GetDestClient = lambda: \
213       rapi.client.GanetiRapiClient(dest_cluster_name,
214                                    port=dest_rapi_port,
215                                    curl_config_fn=dest_curl_config,
216                                    username=dest_username,
217                                    password=dest_password)
218
219
220 class MoveJobPollReportCb(cli.JobPollReportCbBase):
221   def __init__(self, abort_check_fn, remote_import_fn):
222     """Initializes this class.
223
224     @type abort_check_fn: callable
225     @param abort_check_fn: Function to check whether move is aborted
226     @type remote_import_fn: callable or None
227     @param remote_import_fn: Callback for reporting received remote import
228                              information
229
230     """
231     cli.JobPollReportCbBase.__init__(self)
232     self._abort_check_fn = abort_check_fn
233     self._remote_import_fn = remote_import_fn
234
235   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
236     """Handles a log message.
237
238     """
239     if log_type == constants.ELOG_REMOTE_IMPORT:
240       logging.debug("Received remote import information")
241
242       if not self._remote_import_fn:
243         raise RuntimeError("Received unexpected remote import information")
244
245       assert "x509_ca" in log_msg
246       assert "disks" in log_msg
247
248       self._remote_import_fn(log_msg)
249
250       return
251
252     logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
253                  cli.FormatLogMessage(log_type, log_msg))
254
255   def ReportNotChanged(self, job_id, status):
256     """Called if a job hasn't changed in a while.
257
258     """
259     try:
260       # Check whether we were told to abort by the other thread
261       self._abort_check_fn()
262     except Abort:
263       logging.warning("Aborting despite job %s still running", job_id)
264       raise
265
266
267 class InstanceMove(object):
268   """Status class for instance moves.
269
270   """
271   def __init__(self, src_instance_name, dest_instance_name,
272                dest_pnode, dest_snode, dest_iallocator):
273     """Initializes this class.
274
275     @type src_instance_name: string
276     @param src_instance_name: Instance name on source cluster
277     @type dest_instance_name: string
278     @param dest_instance_name: Instance name on destination cluster
279     @type dest_pnode: string or None
280     @param dest_pnode: Name of primary node on destination cluster
281     @type dest_snode: string or None
282     @param dest_snode: Name of secondary node on destination cluster
283     @type dest_iallocator: string or None
284     @param dest_iallocator: Name of iallocator to use
285
286     """
287     self.src_instance_name = src_instance_name
288     self.dest_instance_name = dest_instance_name
289     self.dest_pnode = dest_pnode
290     self.dest_snode = dest_snode
291     self.dest_iallocator = dest_iallocator
292
293     self.error_message = None
294
295
296 class MoveRuntime(object):
297   """Class to keep track of instance move.
298
299   """
300   def __init__(self, move):
301     """Initializes this class.
302
303     @type move: L{InstanceMove}
304
305     """
306     self.move = move
307
308     # Thread synchronization
309     self.lock = threading.Lock()
310     self.source_to_dest = threading.Condition(self.lock)
311     self.dest_to_source = threading.Condition(self.lock)
312
313     # Source information
314     self.src_error_message = None
315     self.src_expinfo = None
316     self.src_instinfo = None
317
318     # Destination information
319     self.dest_error_message = None
320     self.dest_impinfo = None
321
322   def HandleErrors(self, prefix, fn, *args):
323     """Wrapper to catch errors and abort threads.
324
325     @type prefix: string
326     @param prefix: Variable name prefix ("src" or "dest")
327     @type fn: callable
328     @param fn: Function
329
330     """
331     assert prefix in ("dest", "src")
332
333     try:
334       # Call inner function
335       fn(*args)
336
337       errmsg = None
338     except Abort:
339       errmsg = "Aborted"
340     except Exception, err:
341       logging.exception("Caught unhandled exception")
342       errmsg = str(err)
343
344     setattr(self, "%s_error_message" % prefix, errmsg)
345
346     self.lock.acquire()
347     try:
348       self.source_to_dest.notifyAll()
349       self.dest_to_source.notifyAll()
350     finally:
351       self.lock.release()
352
353   def CheckAbort(self):
354     """Check whether thread should be aborted.
355
356     @raise Abort: When thread should be aborted
357
358     """
359     if not (self.src_error_message is None and
360             self.dest_error_message is None):
361       logging.info("Aborting")
362       raise Abort()
363
364   def Wait(self, cond, check_fn):
365     """Waits for a condition to become true.
366
367     @type cond: threading.Condition
368     @param cond: Threading condition
369     @type check_fn: callable
370     @param check_fn: Function to check whether condition is true
371
372     """
373     cond.acquire()
374     try:
375       while check_fn(self):
376         self.CheckAbort()
377         cond.wait()
378     finally:
379       cond.release()
380
381   def PollJob(self, cl, job_id, remote_import_fn=None):
382     """Wrapper for polling a job.
383
384     @type cl: L{rapi.client.GanetiRapiClient}
385     @param cl: RAPI client
386     @type job_id: string
387     @param job_id: Job ID
388     @type remote_import_fn: callable or None
389     @param remote_import_fn: Callback for reporting received remote import
390                              information
391
392     """
393     return rapi.client_utils.PollJob(cl, job_id,
394                                      MoveJobPollReportCb(self.CheckAbort,
395                                                          remote_import_fn))
396
397
398 class MoveDestExecutor(object):
399   def __init__(self, dest_client, mrt):
400     """Destination side of an instance move.
401
402     @type dest_client: L{rapi.client.GanetiRapiClient}
403     @param dest_client: RAPI client
404     @type mrt: L{MoveRuntime}
405     @param mrt: Instance move runtime information
406
407     """
408     logging.debug("Waiting for instance information to become available")
409     mrt.Wait(mrt.source_to_dest,
410              lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
411
412     logging.info("Creating instance %s in remote-import mode",
413                  mrt.move.dest_instance_name)
414     job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
415                                   mrt.move.dest_pnode, mrt.move.dest_snode,
416                                   mrt.move.dest_iallocator,
417                                   mrt.src_instinfo, mrt.src_expinfo)
418     mrt.PollJob(dest_client, job_id,
419                 remote_import_fn=compat.partial(self._SetImportInfo, mrt))
420
421     logging.info("Import successful")
422
423   @staticmethod
424   def _SetImportInfo(mrt, impinfo):
425     """Sets the remote import information and notifies source thread.
426
427     @type mrt: L{MoveRuntime}
428     @param mrt: Instance move runtime information
429     @param impinfo: Remote import information
430
431     """
432     mrt.dest_to_source.acquire()
433     try:
434       mrt.dest_impinfo = impinfo
435       mrt.dest_to_source.notifyAll()
436     finally:
437       mrt.dest_to_source.release()
438
439   @staticmethod
440   def _CreateInstance(cl, name, snode, pnode, iallocator, instance, expinfo):
441     """Starts the instance creation in remote import mode.
442
443     @type cl: L{rapi.client.GanetiRapiClient}
444     @param cl: RAPI client
445     @type name: string
446     @param name: Instance name
447     @type pnode: string or None
448     @param pnode: Name of primary node on destination cluster
449     @type snode: string or None
450     @param snode: Name of secondary node on destination cluster
451     @type iallocator: string or None
452     @param iallocator: Name of iallocator to use
453     @type instance: dict
454     @param instance: Instance details from source cluster
455     @type expinfo: dict
456     @param expinfo: Prepared export information from source cluster
457     @return: Job ID
458
459     """
460     disk_template = instance["disk_template"]
461
462     disks = [{
463       "size": i["size"],
464       "mode": i["mode"],
465       } for i in instance["disks"]]
466
467     nics = [{
468       "ip": ip,
469       "mac": mac,
470       "mode": mode,
471       "link": link,
472       } for ip, mac, mode, link in instance["nics"]]
473
474     # TODO: Should this be the actual up/down status? (run_state)
475     start = (instance["config_state"] == "up")
476
477     assert len(disks) == len(instance["disks"])
478     assert len(nics) == len(instance["nics"])
479
480     return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
481                              name, disk_template, disks, nics,
482                              os=instance["os"],
483                              pnode=pnode,
484                              snode=snode,
485                              start=start,
486                              ip_check=False,
487                              iallocator=iallocator,
488                              hypervisor=instance["hypervisor"],
489                              source_handshake=expinfo["handshake"],
490                              source_x509_ca=expinfo["x509_ca"],
491                              source_instance_name=instance["name"],
492                              beparams=instance["be_instance"],
493                              hvparams=instance["hv_instance"],
494                              osparams=instance["os_instance"])
495
496
497 class MoveSourceExecutor(object):
498   def __init__(self, src_client, mrt):
499     """Source side of an instance move.
500
501     @type src_client: L{rapi.client.GanetiRapiClient}
502     @param src_client: RAPI client
503     @type mrt: L{MoveRuntime}
504     @param mrt: Instance move runtime information
505
506     """
507     logging.info("Checking whether instance exists")
508     self._CheckInstance(src_client, mrt.move.src_instance_name)
509
510     logging.info("Retrieving instance information from source cluster")
511     instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
512                                      mrt.move.src_instance_name)
513
514     logging.info("Preparing export on source cluster")
515     expinfo = self._PrepareExport(src_client, mrt.PollJob,
516                                   mrt.move.src_instance_name)
517     assert "handshake" in expinfo
518     assert "x509_key_name" in expinfo
519     assert "x509_ca" in expinfo
520
521     # Hand information to destination thread
522     mrt.source_to_dest.acquire()
523     try:
524       mrt.src_instinfo = instinfo
525       mrt.src_expinfo = expinfo
526       mrt.source_to_dest.notifyAll()
527     finally:
528       mrt.source_to_dest.release()
529
530     logging.info("Waiting for destination information to become available")
531     mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
532
533     logging.info("Starting remote export on source cluster")
534     self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
535                          expinfo["x509_key_name"], mrt.dest_impinfo)
536
537     logging.info("Export successful")
538
539   @staticmethod
540   def _CheckInstance(cl, name):
541     """Checks whether the instance exists on the source cluster.
542
543     @type cl: L{rapi.client.GanetiRapiClient}
544     @param cl: RAPI client
545     @type name: string
546     @param name: Instance name
547
548     """
549     try:
550       cl.GetInstance(name)
551     except rapi.client.GanetiApiError, err:
552       if err.code == rapi.client.HTTP_NOT_FOUND:
553         raise Error("Instance %s not found (%s)" % (name, str(err)))
554       raise
555
556   @staticmethod
557   def _GetInstanceInfo(cl, poll_job_fn, name):
558     """Retrieves detailed instance information from source cluster.
559
560     @type cl: L{rapi.client.GanetiRapiClient}
561     @param cl: RAPI client
562     @type poll_job_fn: callable
563     @param poll_job_fn: Function to poll for job result
564     @type name: string
565     @param name: Instance name
566
567     """
568     job_id = cl.GetInstanceInfo(name, static=True)
569     result = poll_job_fn(cl, job_id)
570     assert len(result[0].keys()) == 1
571     return result[0][result[0].keys()[0]]
572
573   @staticmethod
574   def _PrepareExport(cl, poll_job_fn, name):
575     """Prepares export on source cluster.
576
577     @type cl: L{rapi.client.GanetiRapiClient}
578     @param cl: RAPI client
579     @type poll_job_fn: callable
580     @param poll_job_fn: Function to poll for job result
581     @type name: string
582     @param name: Instance name
583
584     """
585     job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
586     return poll_job_fn(cl, job_id)[0]
587
588   @staticmethod
589   def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
590     """Exports instance from source cluster.
591
592     @type cl: L{rapi.client.GanetiRapiClient}
593     @param cl: RAPI client
594     @type poll_job_fn: callable
595     @param poll_job_fn: Function to poll for job result
596     @type name: string
597     @param name: Instance name
598     @param x509_key_name: Source X509 key
599     @param impinfo: Import information from destination cluster
600
601     """
602     job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
603                                impinfo["disks"], shutdown=True,
604                                remove_instance=True,
605                                x509_key_name=x509_key_name,
606                                destination_x509_ca=impinfo["x509_ca"])
607     (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
608
609     if not (fin_resu and compat.all(dresults)):
610       raise Error("Export failed for disks %s" %
611                   utils.CommaJoin(str(idx) for idx, result
612                                   in enumerate(dresults) if not result))
613
614
615 class MoveSourceWorker(workerpool.BaseWorker):
616   def RunTask(self, rapi_factory, move): # pylint: disable-msg=W0221
617     """Executes an instance move.
618
619     @type rapi_factory: L{RapiClientFactory}
620     @param rapi_factory: RAPI client factory
621     @type move: L{InstanceMove}
622     @param move: Instance move information
623
624     """
625     try:
626       logging.info("Preparing to move %s from cluster %s to %s as %s",
627                    move.src_instance_name, rapi_factory.src_cluster_name,
628                    rapi_factory.dest_cluster_name, move.dest_instance_name)
629
630       mrt = MoveRuntime(move)
631
632       logging.debug("Starting destination thread")
633       dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
634                                      target=mrt.HandleErrors,
635                                      args=("dest", MoveDestExecutor,
636                                            rapi_factory.GetDestClient(),
637                                            mrt, ))
638       dest_thread.start()
639       try:
640         mrt.HandleErrors("src", MoveSourceExecutor,
641                          rapi_factory.GetSourceClient(), mrt)
642       finally:
643         dest_thread.join()
644
645       if mrt.src_error_message or mrt.dest_error_message:
646         move.error_message = ("Source error: %s, destination error: %s" %
647                               (mrt.src_error_message, mrt.dest_error_message))
648       else:
649         move.error_message = None
650     except Exception, err: # pylint: disable-msg=W0703
651       logging.exception("Caught unhandled exception")
652       move.error_message = str(err)
653
654
655 def CheckRapiSetup(rapi_factory):
656   """Checks the RAPI setup by retrieving the version.
657
658   @type rapi_factory: L{RapiClientFactory}
659   @param rapi_factory: RAPI client factory
660
661   """
662   src_client = rapi_factory.GetSourceClient()
663   logging.info("Connecting to source RAPI server")
664   logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
665
666   dest_client = rapi_factory.GetDestClient()
667   logging.info("Connecting to destination RAPI server")
668   logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())
669
670
671 def SetupLogging(options):
672   """Setting up logging infrastructure.
673
674   @param options: Parsed command line options
675
676   """
677   fmt = "%(asctime)s: %(threadName)s "
678   if options.debug or options.verbose:
679     fmt += "%(levelname)s "
680   fmt += "%(message)s"
681
682   formatter = logging.Formatter(fmt)
683
684   stderr_handler = logging.StreamHandler()
685   stderr_handler.setFormatter(formatter)
686   if options.debug:
687     stderr_handler.setLevel(logging.NOTSET)
688   elif options.verbose:
689     stderr_handler.setLevel(logging.INFO)
690   else:
691     stderr_handler.setLevel(logging.ERROR)
692
693   root_logger = logging.getLogger("")
694   root_logger.setLevel(logging.NOTSET)
695   root_logger.addHandler(stderr_handler)
696
697
698 def ParseOptions():
699   """Parses options passed to program.
700
701   """
702   program = os.path.basename(sys.argv[0])
703
704   parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
705                                         " <source-cluster> <dest-cluster>"
706                                         " <instance...>"),
707                                  prog=program)
708   parser.add_option(cli.DEBUG_OPT)
709   parser.add_option(cli.VERBOSE_OPT)
710   parser.add_option(cli.IALLOCATOR_OPT)
711   parser.add_option(SRC_RAPI_PORT_OPT)
712   parser.add_option(SRC_CA_FILE_OPT)
713   parser.add_option(SRC_USERNAME_OPT)
714   parser.add_option(SRC_PASSWORD_FILE_OPT)
715   parser.add_option(DEST_RAPI_PORT_OPT)
716   parser.add_option(DEST_CA_FILE_OPT)
717   parser.add_option(DEST_USERNAME_OPT)
718   parser.add_option(DEST_PASSWORD_FILE_OPT)
719   parser.add_option(DEST_INSTANCE_NAME_OPT)
720   parser.add_option(DEST_PRIMARY_NODE_OPT)
721   parser.add_option(DEST_SECONDARY_NODE_OPT)
722   parser.add_option(PARALLEL_OPT)
723
724   (options, args) = parser.parse_args()
725
726   return (parser, options, args)
727
728
729 def CheckOptions(parser, options, args):
730   """Checks options and arguments for validity.
731
732   """
733   if len(args) < 3:
734     parser.error("Not enough arguments")
735
736   src_cluster_name = args.pop(0)
737   dest_cluster_name = args.pop(0)
738   instance_names = args
739
740   assert len(instance_names) > 0
741
742   # TODO: Remove once using system default paths for SSL certificate
743   # verification is implemented
744   if not options.src_ca_file:
745     parser.error("Missing source cluster CA file")
746
747   if options.parallel < 1:
748     parser.error("Number of simultaneous moves must be >= 1")
749
750   if not (bool(options.iallocator) ^
751           bool(options.dest_primary_node or options.dest_secondary_node)):
752     parser.error("Destination node and iallocator options exclude each other")
753
754   if len(instance_names) == 1:
755     # Moving one instance only
756     if not (options.iallocator or
757             options.dest_primary_node or
758             options.dest_secondary_node):
759       parser.error("An iallocator or the destination node is required")
760   else:
761     # Moving more than one instance
762     if (options.dest_instance_name or options.dest_primary_node or
763         options.dest_secondary_node):
764       parser.error("The options --dest-instance-name, --dest-primary-node and"
765                    " --dest-secondary-node can only be used when moving exactly"
766                    " one instance")
767
768     if not options.iallocator:
769       parser.error("An iallocator must be specified for moving more than one"
770                    " instance")
771
772   return (src_cluster_name, dest_cluster_name, instance_names)
773
774
775 @rapi.client.UsesRapiClient
776 def main():
777   """Main routine.
778
779   """
780   (parser, options, args) = ParseOptions()
781
782   SetupLogging(options)
783
784   (src_cluster_name, dest_cluster_name, instance_names) = \
785     CheckOptions(parser, options, args)
786
787   logging.info("Source cluster: %s", src_cluster_name)
788   logging.info("Destination cluster: %s", dest_cluster_name)
789   logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
790
791   rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
792
793   CheckRapiSetup(rapi_factory)
794
795   assert (len(instance_names) == 1 or
796           not (options.dest_primary_node or options.dest_secondary_node))
797   assert len(instance_names) == 1 or options.iallocator
798   assert (len(instance_names) > 1 or options.iallocator or
799           options.dest_primary_node or options.dest_secondary_node)
800
801   # Prepare list of instance moves
802   moves = []
803   for src_instance_name in instance_names:
804     if options.dest_instance_name:
805       assert len(instance_names) == 1
806       # Rename instance
807       dest_instance_name = options.dest_instance_name
808     else:
809       dest_instance_name = src_instance_name
810
811     moves.append(InstanceMove(src_instance_name, dest_instance_name,
812                               options.dest_primary_node,
813                               options.dest_secondary_node,
814                               options.iallocator))
815
816   assert len(moves) == len(instance_names)
817
818   # Start workerpool
819   wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
820   try:
821     # Add instance moves to workerpool
822     for move in moves:
823       wp.AddTask((rapi_factory, move))
824
825     # Wait for all moves to finish
826     wp.Quiesce()
827
828   finally:
829     wp.TerminateWorkers()
830
831   # There should be no threads running at this point, hence not using locks
832   # anymore
833
834   logging.info("Instance move results:")
835
836   for move in moves:
837     if move.dest_instance_name == move.src_instance_name:
838       name = move.src_instance_name
839     else:
840       name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
841
842     if move.error_message:
843       msg = "Failed (%s)" % move.error_message
844     else:
845       msg = "Success"
846
847     logging.info("%s: %s", name, msg)
848
849   if compat.any(move.error_message for move in moves):
850     sys.exit(constants.EXIT_FAILURE)
851
852   sys.exit(constants.EXIT_SUCCESS)
853
854
855 if __name__ == "__main__":
856   main()