RAPI client: Switch to pycURL
[ganeti-local] / tools / move-instance
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to move instances from one cluster to another.
22
23 """
24
25 # pylint: disable-msg=C0103
26 # C0103: Invalid name move-instance
27
28 import os
29 import sys
30 import time
31 import logging
32 import optparse
33 import threading
34
35 from ganeti import cli
36 from ganeti import constants
37 from ganeti import utils
38 from ganeti import workerpool
39 from ganeti import compat
40 from ganeti import rapi
41
42 import ganeti.rapi.client # pylint: disable-msg=W0611
43 import ganeti.rapi.client_utils
44
45
46 SRC_RAPI_PORT_OPT = \
47   cli.cli_option("--src-rapi-port", action="store", type="int",
48                  dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
49                  help=("Source cluster RAPI port (defaults to %s)" %
50                        constants.DEFAULT_RAPI_PORT))
51
52 SRC_CA_FILE_OPT = \
53   cli.cli_option("--src-ca-file", action="store", type="string",
54                  dest="src_ca_file",
55                  help=("File containing source cluster Certificate"
56                        " Authority (CA) in PEM format"))
57
58 SRC_USERNAME_OPT = \
59   cli.cli_option("--src-username", action="store", type="string",
60                  dest="src_username", default=None,
61                  help="Source cluster username")
62
63 SRC_PASSWORD_FILE_OPT = \
64   cli.cli_option("--src-password-file", action="store", type="string",
65                  dest="src_password_file",
66                  help="File containing source cluster password")
67
68 DEST_RAPI_PORT_OPT = \
69   cli.cli_option("--dest-rapi-port", action="store", type="int",
70                  dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT,
71                  help=("Destination cluster RAPI port (defaults to source"
72                        " cluster RAPI port)"))
73
74 DEST_CA_FILE_OPT = \
75   cli.cli_option("--dest-ca-file", action="store", type="string",
76                  dest="dest_ca_file",
77                  help=("File containing destination cluster Certificate"
78                        " Authority (CA) in PEM format (defaults to source"
79                        " cluster CA)"))
80
81 DEST_USERNAME_OPT = \
82   cli.cli_option("--dest-username", action="store", type="string",
83                  dest="dest_username", default=None,
84                  help=("Destination cluster username (defaults to"
85                        " source cluster username)"))
86
87 DEST_PASSWORD_FILE_OPT = \
88   cli.cli_option("--dest-password-file", action="store", type="string",
89                  dest="dest_password_file",
90                  help=("File containing destination cluster password"
91                        " (defaults to source cluster password)"))
92
93 DEST_INSTANCE_NAME_OPT = \
94   cli.cli_option("--dest-instance-name", action="store", type="string",
95                  dest="dest_instance_name",
96                  help=("Instance name on destination cluster (only"
97                        " when moving exactly one instance)"))
98
99 DEST_PRIMARY_NODE_OPT = \
100   cli.cli_option("--dest-primary-node", action="store", type="string",
101                  dest="dest_primary_node",
102                  help=("Primary node on destination cluster (only"
103                        " when moving exactly one instance)"))
104
105 DEST_SECONDARY_NODE_OPT = \
106   cli.cli_option("--dest-secondary-node", action="store", type="string",
107                  dest="dest_secondary_node",
108                  help=("Secondary node on destination cluster (only"
109                        " when moving exactly one instance)"))
110
111 PARALLEL_OPT = \
112   cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
113                  dest="parallel", metavar="<number>",
114                  help="Number of instances to be moved simultaneously")
115
116
117 class Error(Exception):
118   """Generic error.
119
120   """
121
122
123 class Abort(Error):
124   """Special exception for aborting import/export.
125
126   """
127
128
129 class RapiClientFactory:
130   """Factory class for creating RAPI clients.
131
132   @ivar src_cluster_name: Source cluster name
133   @ivar dest_cluster_name: Destination cluster name
134   @ivar GetSourceClient: Callable returning new client for source cluster
135   @ivar GetDestClient: Callable returning new client for destination cluster
136
137   """
138   def __init__(self, options, src_cluster_name, dest_cluster_name):
139     """Initializes this class.
140
141     @param options: Program options
142     @type src_cluster_name: string
143     @param src_cluster_name: Source cluster name
144     @type dest_cluster_name: string
145     @param dest_cluster_name: Destination cluster name
146
147     """
148     self.src_cluster_name = src_cluster_name
149     self.dest_cluster_name = dest_cluster_name
150
151     # TODO: Implement timeouts for RAPI connections
152     # TODO: Support for using system default paths for verifying SSL certificate
153     logging.debug("Using '%s' as source CA", options.src_ca_file)
154     src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)
155
156     if options.dest_ca_file:
157       logging.debug("Using '%s' as destination CA", options.dest_ca_file)
158       dest_curl_config = \
159         rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
160     else:
161       logging.debug("Using source CA for destination")
162       dest_curl_config = src_curl_config
163
164     logging.debug("Source RAPI server is %s:%s",
165                   src_cluster_name, options.src_rapi_port)
166     logging.debug("Source username is '%s'", options.src_username)
167
168     if options.src_username is None:
169       src_username = ""
170     else:
171       src_username = options.src_username
172
173     if options.src_password_file:
174       logging.debug("Reading '%s' for source password",
175                     options.src_password_file)
176       src_password = utils.ReadOneLineFile(options.src_password_file,
177                                            strict=True)
178     else:
179       logging.debug("Source has no password")
180       src_password = None
181
182     self.GetSourceClient = lambda: \
183       rapi.client.GanetiRapiClient(src_cluster_name,
184                                    port=options.src_rapi_port,
185                                    curl_config_fn=src_curl_config,
186                                    username=src_username,
187                                    password=src_password)
188
189     if options.dest_rapi_port:
190       dest_rapi_port = options.dest_rapi_port
191     else:
192       dest_rapi_port = options.src_rapi_port
193
194     if options.dest_username is None:
195       dest_username = src_username
196     else:
197       dest_username = options.dest_username
198
199     logging.debug("Destination RAPI server is %s:%s",
200                   dest_cluster_name, dest_rapi_port)
201     logging.debug("Destination username is '%s'", dest_username)
202
203     if options.dest_password_file:
204       logging.debug("Reading '%s' for destination password",
205                     options.dest_password_file)
206       dest_password = utils.ReadOneLineFile(options.dest_password_file,
207                                             strict=True)
208     else:
209       logging.debug("Using source password for destination")
210       dest_password = src_password
211
212     self.GetDestClient = lambda: \
213       rapi.client.GanetiRapiClient(dest_cluster_name,
214                                    port=dest_rapi_port,
215                                    curl_config_fn=dest_curl_config,
216                                    username=dest_username,
217                                    password=dest_password)
218
219
220 class MoveJobPollReportCb(cli.JobPollReportCbBase):
221   def __init__(self, abort_check_fn, remote_import_fn):
222     """Initializes this class.
223
224     @type abort_check_fn: callable
225     @param abort_check_fn: Function to check whether move is aborted
226     @type remote_import_fn: callable or None
227     @param remote_import_fn: Callback for reporting received remote import
228                              information
229
230     """
231     cli.JobPollReportCbBase.__init__(self)
232     self._abort_check_fn = abort_check_fn
233     self._remote_import_fn = remote_import_fn
234
235   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
236     """Handles a log message.
237
238     """
239     if log_type == constants.ELOG_REMOTE_IMPORT:
240       logging.debug("Received remote import information")
241
242       if not self._remote_import_fn:
243         raise RuntimeError("Received unexpected remote import information")
244
245       assert "x509_ca" in log_msg
246       assert "disks" in log_msg
247
248       self._remote_import_fn(log_msg)
249
250       return
251
252     logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
253                  utils.SafeEncode(log_msg))
254
255   def ReportNotChanged(self, job_id, status):
256     """Called if a job hasn't changed in a while.
257
258     """
259     try:
260       # Check whether we were told to abort by the other thread
261       self._abort_check_fn()
262     except Abort:
263       logging.warning("Aborting despite job %s still running", job_id)
264       raise
265
266
267 class InstanceMove(object):
268   """Status class for instance moves.
269
270   """
271   def __init__(self, src_instance_name, dest_instance_name,
272                dest_pnode, dest_snode, dest_iallocator):
273     """Initializes this class.
274
275     @type src_instance_name: string
276     @param src_instance_name: Instance name on source cluster
277     @type dest_instance_name: string
278     @param dest_instance_name: Instance name on destination cluster
279     @type dest_pnode: string or None
280     @param dest_pnode: Name of primary node on destination cluster
281     @type dest_snode: string or None
282     @param dest_snode: Name of secondary node on destination cluster
283     @type dest_iallocator: string or None
284     @param dest_iallocator: Name of iallocator to use
285
286     """
287     self.src_instance_name = src_instance_name
288     self.dest_instance_name = dest_instance_name
289     self.dest_pnode = dest_pnode
290     self.dest_snode = dest_snode
291     self.dest_iallocator = dest_iallocator
292
293     self.error_message = None
294
295
296 class MoveRuntime(object):
297   """Class to keep track of instance move.
298
299   """
300   def __init__(self, move):
301     """Initializes this class.
302
303     @type move: L{InstanceMove}
304
305     """
306     self.move = move
307
308     # Thread synchronization
309     self.lock = threading.Lock()
310     self.source_to_dest = threading.Condition(self.lock)
311     self.dest_to_source = threading.Condition(self.lock)
312
313     # Source information
314     self.src_error_message = None
315     self.src_expinfo = None
316     self.src_instinfo = None
317
318     # Destination information
319     self.dest_error_message = None
320     self.dest_impinfo = None
321
322   def HandleErrors(self, prefix, fn, *args):
323     """Wrapper to catch errors and abort threads.
324
325     @type prefix: string
326     @param prefix: Variable name prefix ("src" or "dest")
327     @type fn: callable
328     @param fn: Function
329
330     """
331     assert prefix in ("dest", "src")
332
333     try:
334       # Call inner function
335       fn(*args)
336
337       errmsg = None
338     except Abort:
339       errmsg = "Aborted"
340     except Exception, err:
341       logging.exception("Caught unhandled exception")
342       errmsg = str(err)
343
344     setattr(self, "%s_error_message" % prefix, errmsg)
345
346     self.lock.acquire()
347     try:
348       self.source_to_dest.notifyAll()
349       self.dest_to_source.notifyAll()
350     finally:
351       self.lock.release()
352
353   def CheckAbort(self):
354     """Check whether thread should be aborted.
355
356     @raise Abort: When thread should be aborted
357
358     """
359     if not (self.src_error_message is None and
360             self.dest_error_message is None):
361       logging.info("Aborting")
362       raise Abort()
363
364   def Wait(self, cond, check_fn):
365     """Waits for a condition to become true.
366
367     @type cond: threading.Condition
368     @param cond: Threading condition
369     @type check_fn: callable
370     @param check_fn: Function to check whether condition is true
371
372     """
373     cond.acquire()
374     try:
375       while check_fn(self):
376         self.CheckAbort()
377         cond.wait()
378     finally:
379       cond.release()
380
381   def PollJob(self, cl, job_id, remote_import_fn=None):
382     """Wrapper for polling a job.
383
384     @type cl: L{rapi.client.GanetiRapiClient}
385     @param cl: RAPI client
386     @type job_id: string
387     @param job_id: Job ID
388     @type remote_import_fn: callable or None
389     @param remote_import_fn: Callback for reporting received remote import
390                              information
391
392     """
393     return rapi.client_utils.PollJob(cl, job_id,
394                                      MoveJobPollReportCb(self.CheckAbort,
395                                                          remote_import_fn))
396
397
398 class MoveDestExecutor(object):
399   def __init__(self, dest_client, mrt):
400     """Destination side of an instance move.
401
402     @type dest_client: L{rapi.client.GanetiRapiClient}
403     @param dest_client: RAPI client
404     @type mrt: L{MoveRuntime}
405     @param mrt: Instance move runtime information
406
407     """
408     logging.debug("Waiting for instance information to become available")
409     mrt.Wait(mrt.source_to_dest,
410              lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
411
412     logging.info("Creating instance %s in remote-import mode",
413                  mrt.move.dest_instance_name)
414     job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
415                                   mrt.move.dest_pnode, mrt.move.dest_snode,
416                                   mrt.move.dest_iallocator,
417                                   mrt.src_instinfo, mrt.src_expinfo)
418     mrt.PollJob(dest_client, job_id,
419                 remote_import_fn=compat.partial(self._SetImportInfo, mrt))
420
421     logging.info("Import successful")
422
423   @staticmethod
424   def _SetImportInfo(mrt, impinfo):
425     """Sets the remote import information and notifies source thread.
426
427     @type mrt: L{MoveRuntime}
428     @param mrt: Instance move runtime information
429     @param impinfo: Remote import information
430
431     """
432     mrt.dest_to_source.acquire()
433     try:
434       mrt.dest_impinfo = impinfo
435       mrt.dest_to_source.notifyAll()
436     finally:
437       mrt.dest_to_source.release()
438
439   @staticmethod
440   def _CreateInstance(cl, name, snode, pnode, iallocator, instance, expinfo):
441     """Starts the instance creation in remote import mode.
442
443     @type cl: L{rapi.client.GanetiRapiClient}
444     @param cl: RAPI client
445     @type name: string
446     @param name: Instance name
447     @type pnode: string or None
448     @param pnode: Name of primary node on destination cluster
449     @type snode: string or None
450     @param snode: Name of secondary node on destination cluster
451     @type iallocator: string or None
452     @param iallocator: Name of iallocator to use
453     @type instance: dict
454     @param instance: Instance details from source cluster
455     @type expinfo: dict
456     @param expinfo: Prepared export information from source cluster
457     @return: Job ID
458
459     """
460     disk_template = instance["disk_template"]
461
462     disks = [{
463       "size": i["size"],
464       "mode": i["mode"],
465       } for i in instance["disks"]]
466
467     nics = [{
468       "ip": ip,
469       "mac": mac,
470       "mode": mode,
471       "link": link,
472       } for ip, mac, mode, link in instance["nics"]]
473
474     # TODO: Should this be the actual up/down status? (run_state)
475     start = (instance["config_state"] == "up")
476
477     assert len(disks) == len(instance["disks"])
478     assert len(nics) == len(instance["nics"])
479
480     return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
481                              name, disk_template, disks, nics,
482                              os=instance["os"],
483                              pnode=pnode,
484                              snode=snode,
485                              start=start,
486                              ip_check=False,
487                              iallocator=iallocator,
488                              hypervisor=instance["hypervisor"],
489                              source_handshake=expinfo["handshake"],
490                              source_x509_ca=expinfo["x509_ca"],
491                              source_instance_name=instance["name"],
492                              beparams=instance["be_instance"],
493                              hvparams=instance["hv_instance"])
494
495
496 class MoveSourceExecutor(object):
497   def __init__(self, src_client, mrt):
498     """Source side of an instance move.
499
500     @type src_client: L{rapi.client.GanetiRapiClient}
501     @param src_client: RAPI client
502     @type mrt: L{MoveRuntime}
503     @param mrt: Instance move runtime information
504
505     """
506     logging.info("Checking whether instance exists")
507     self._CheckInstance(src_client, mrt.move.src_instance_name)
508
509     logging.info("Retrieving instance information from source cluster")
510     instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
511                                      mrt.move.src_instance_name)
512
513     logging.info("Preparing export on source cluster")
514     expinfo = self._PrepareExport(src_client, mrt.PollJob,
515                                   mrt.move.src_instance_name)
516     assert "handshake" in expinfo
517     assert "x509_key_name" in expinfo
518     assert "x509_ca" in expinfo
519
520     # Hand information to destination thread
521     mrt.source_to_dest.acquire()
522     try:
523       mrt.src_instinfo = instinfo
524       mrt.src_expinfo = expinfo
525       mrt.source_to_dest.notifyAll()
526     finally:
527       mrt.source_to_dest.release()
528
529     logging.info("Waiting for destination information to become available")
530     mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
531
532     logging.info("Starting remote export on source cluster")
533     self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
534                          expinfo["x509_key_name"], mrt.dest_impinfo)
535
536     logging.info("Export successful")
537
538   @staticmethod
539   def _CheckInstance(cl, name):
540     """Checks whether the instance exists on the source cluster.
541
542     @type cl: L{rapi.client.GanetiRapiClient}
543     @param cl: RAPI client
544     @type name: string
545     @param name: Instance name
546
547     """
548     try:
549       cl.GetInstance(name)
550     except rapi.client.GanetiApiError, err:
551       if err.code == rapi.client.HTTP_NOT_FOUND:
552         raise Error("Instance %s not found (%s)" % (name, str(err)))
553       raise
554
555   @staticmethod
556   def _GetInstanceInfo(cl, poll_job_fn, name):
557     """Retrieves detailed instance information from source cluster.
558
559     @type cl: L{rapi.client.GanetiRapiClient}
560     @param cl: RAPI client
561     @type poll_job_fn: callable
562     @param poll_job_fn: Function to poll for job result
563     @type name: string
564     @param name: Instance name
565
566     """
567     job_id = cl.GetInstanceInfo(name, static=True)
568     result = poll_job_fn(cl, job_id)
569     assert len(result[0].keys()) == 1
570     return result[0][result[0].keys()[0]]
571
572   @staticmethod
573   def _PrepareExport(cl, poll_job_fn, name):
574     """Prepares export on source cluster.
575
576     @type cl: L{rapi.client.GanetiRapiClient}
577     @param cl: RAPI client
578     @type poll_job_fn: callable
579     @param poll_job_fn: Function to poll for job result
580     @type name: string
581     @param name: Instance name
582
583     """
584     job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
585     return poll_job_fn(cl, job_id)[0]
586
587   @staticmethod
588   def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
589     """Exports instance from source cluster.
590
591     @type cl: L{rapi.client.GanetiRapiClient}
592     @param cl: RAPI client
593     @type poll_job_fn: callable
594     @param poll_job_fn: Function to poll for job result
595     @type name: string
596     @param name: Instance name
597     @param x509_key_name: Source X509 key
598     @param impinfo: Import information from destination cluster
599
600     """
601     job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
602                                impinfo["disks"], shutdown=True,
603                                remove_instance=True,
604                                x509_key_name=x509_key_name,
605                                destination_x509_ca=impinfo["x509_ca"])
606     (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
607
608     if not (fin_resu and compat.all(dresults)):
609       raise Error("Export failed for disks %s" %
610                   utils.CommaJoin(str(idx) for idx, result
611                                   in enumerate(dresults) if not result))
612
613
614 class MoveSourceWorker(workerpool.BaseWorker):
615   def RunTask(self, rapi_factory, move): # pylint: disable-msg=W0221
616     """Executes an instance move.
617
618     @type rapi_factory: L{RapiClientFactory}
619     @param rapi_factory: RAPI client factory
620     @type move: L{InstanceMove}
621     @param move: Instance move information
622
623     """
624     try:
625       logging.info("Preparing to move %s from cluster %s to %s as %s",
626                    move.src_instance_name, rapi_factory.src_cluster_name,
627                    rapi_factory.dest_cluster_name, move.dest_instance_name)
628
629       mrt = MoveRuntime(move)
630
631       logging.debug("Starting destination thread")
632       dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
633                                      target=mrt.HandleErrors,
634                                      args=("dest", MoveDestExecutor,
635                                            rapi_factory.GetDestClient(),
636                                            mrt, ))
637       dest_thread.start()
638       try:
639         mrt.HandleErrors("src", MoveSourceExecutor,
640                          rapi_factory.GetSourceClient(), mrt)
641       finally:
642         dest_thread.join()
643
644       if mrt.src_error_message or mrt.dest_error_message:
645         move.error_message = ("Source error: %s, destination error: %s" %
646                               (mrt.src_error_message, mrt.dest_error_message))
647       else:
648         move.error_message = None
649     except Exception, err: # pylint: disable-msg=W0703
650       logging.exception("Caught unhandled exception")
651       move.error_message = str(err)
652
653
654 def CheckRapiSetup(rapi_factory):
655   """Checks the RAPI setup by retrieving the version.
656
657   @type rapi_factory: L{RapiClientFactory}
658   @param rapi_factory: RAPI client factory
659
660   """
661   src_client = rapi_factory.GetSourceClient()
662   logging.info("Connecting to source RAPI server")
663   logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
664
665   dest_client = rapi_factory.GetDestClient()
666   logging.info("Connecting to destination RAPI server")
667   logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())
668
669
670 def SetupLogging(options):
671   """Setting up logging infrastructure.
672
673   @param options: Parsed command line options
674
675   """
676   fmt = "%(asctime)s: %(threadName)s "
677   if options.debug or options.verbose:
678     fmt += "%(levelname)s "
679   fmt += "%(message)s"
680
681   formatter = logging.Formatter(fmt)
682
683   stderr_handler = logging.StreamHandler()
684   stderr_handler.setFormatter(formatter)
685   if options.debug:
686     stderr_handler.setLevel(logging.NOTSET)
687   elif options.verbose:
688     stderr_handler.setLevel(logging.INFO)
689   else:
690     stderr_handler.setLevel(logging.ERROR)
691
692   root_logger = logging.getLogger("")
693   root_logger.setLevel(logging.NOTSET)
694   root_logger.addHandler(stderr_handler)
695
696
697 def ParseOptions():
698   """Parses options passed to program.
699
700   """
701   program = os.path.basename(sys.argv[0])
702
703   parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
704                                         " <source-cluster> <dest-cluster>"
705                                         " <instance...>"),
706                                  prog=program)
707   parser.add_option(cli.DEBUG_OPT)
708   parser.add_option(cli.VERBOSE_OPT)
709   parser.add_option(cli.IALLOCATOR_OPT)
710   parser.add_option(SRC_RAPI_PORT_OPT)
711   parser.add_option(SRC_CA_FILE_OPT)
712   parser.add_option(SRC_USERNAME_OPT)
713   parser.add_option(SRC_PASSWORD_FILE_OPT)
714   parser.add_option(DEST_RAPI_PORT_OPT)
715   parser.add_option(DEST_CA_FILE_OPT)
716   parser.add_option(DEST_USERNAME_OPT)
717   parser.add_option(DEST_PASSWORD_FILE_OPT)
718   parser.add_option(DEST_INSTANCE_NAME_OPT)
719   parser.add_option(DEST_PRIMARY_NODE_OPT)
720   parser.add_option(DEST_SECONDARY_NODE_OPT)
721   parser.add_option(PARALLEL_OPT)
722
723   (options, args) = parser.parse_args()
724
725   return (parser, options, args)
726
727
728 def CheckOptions(parser, options, args):
729   """Checks options and arguments for validity.
730
731   """
732   if len(args) < 3:
733     parser.error("Not enough arguments")
734
735   src_cluster_name = args.pop(0)
736   dest_cluster_name = args.pop(0)
737   instance_names = args
738
739   assert len(instance_names) > 0
740
741   # TODO: Remove once using system default paths for SSL certificate
742   # verification is implemented
743   if not options.src_ca_file:
744     parser.error("Missing source cluster CA file")
745
746   if options.parallel < 1:
747     parser.error("Number of simultaneous moves must be >= 1")
748
749   if not (bool(options.iallocator) ^
750           bool(options.dest_primary_node or options.dest_secondary_node)):
751     parser.error("Destination node and iallocator options exclude each other")
752
753   if len(instance_names) == 1:
754     # Moving one instance only
755     if not (options.iallocator or
756             options.dest_primary_node or
757             options.dest_secondary_node):
758       parser.error("An iallocator or the destination node is required")
759   else:
760     # Moving more than one instance
761     if (options.dest_instance_name or options.dest_primary_node or
762         options.dest_secondary_node):
763       parser.error("The options --dest-instance-name, --dest-primary-node and"
764                    " --dest-secondary-node can only be used when moving exactly"
765                    " one instance")
766
767     if not options.iallocator:
768       parser.error("An iallocator must be specified for moving more than one"
769                    " instance")
770
771   return (src_cluster_name, dest_cluster_name, instance_names)
772
773
774 @rapi.client.UsesRapiClient
775 def main():
776   """Main routine.
777
778   """
779   (parser, options, args) = ParseOptions()
780
781   SetupLogging(options)
782
783   (src_cluster_name, dest_cluster_name, instance_names) = \
784     CheckOptions(parser, options, args)
785
786   logging.info("Source cluster: %s", src_cluster_name)
787   logging.info("Destination cluster: %s", dest_cluster_name)
788   logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
789
790   rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
791
792   CheckRapiSetup(rapi_factory)
793
794   assert (len(instance_names) == 1 or
795           not (options.dest_primary_node or options.dest_secondary_node))
796   assert len(instance_names) == 1 or options.iallocator
797   assert (len(instance_names) > 1 or options.iallocator or
798           options.dest_primary_node or options.dest_secondary_node)
799
800   # Prepare list of instance moves
801   moves = []
802   for src_instance_name in instance_names:
803     if options.dest_instance_name:
804       assert len(instance_names) == 1
805       # Rename instance
806       dest_instance_name = options.dest_instance_name
807     else:
808       dest_instance_name = src_instance_name
809
810     moves.append(InstanceMove(src_instance_name, dest_instance_name,
811                               options.dest_primary_node,
812                               options.dest_secondary_node,
813                               options.iallocator))
814
815   assert len(moves) == len(instance_names)
816
817   # Start workerpool
818   wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
819   try:
820     # Add instance moves to workerpool
821     for move in moves:
822       wp.AddTask(rapi_factory, move)
823
824     # Wait for all moves to finish
825     wp.Quiesce()
826
827   finally:
828     wp.TerminateWorkers()
829
830   # There should be no threads running at this point, hence not using locks
831   # anymore
832
833   logging.info("Instance move results:")
834
835   for move in moves:
836     if move.dest_instance_name == move.src_instance_name:
837       name = move.src_instance_name
838     else:
839       name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
840
841     if move.error_message:
842       msg = "Failed (%s)" % move.error_message
843     else:
844       msg = "Success"
845
846     logging.info("%s: %s", name, msg)
847
848   if compat.any(move.error_message for move in moves):
849     sys.exit(constants.EXIT_FAILURE)
850
851   sys.exit(constants.EXIT_SUCCESS)
852
853
854 if __name__ == "__main__":
855   main()