Merge branch 'devel-2.5'
[ganeti-local] / tools / move-instance
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to move instances from one cluster to another.
22
23 """
24
25 # pylint: disable=C0103
26 # C0103: Invalid name move-instance
27
28 import os
29 import sys
30 import time
31 import logging
32 import optparse
33 import threading
34
35 from ganeti import cli
36 from ganeti import constants
37 from ganeti import utils
38 from ganeti import workerpool
39 from ganeti import objects
40 from ganeti import compat
41 from ganeti import rapi
42
43 import ganeti.rapi.client # pylint: disable=W0611
44 import ganeti.rapi.client_utils
45
46
47 SRC_RAPI_PORT_OPT = \
48   cli.cli_option("--src-rapi-port", action="store", type="int",
49                  dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
50                  help=("Source cluster RAPI port (defaults to %s)" %
51                        constants.DEFAULT_RAPI_PORT))
52
53 SRC_CA_FILE_OPT = \
54   cli.cli_option("--src-ca-file", action="store", type="string",
55                  dest="src_ca_file",
56                  help=("File containing source cluster Certificate"
57                        " Authority (CA) in PEM format"))
58
59 SRC_USERNAME_OPT = \
60   cli.cli_option("--src-username", action="store", type="string",
61                  dest="src_username", default=None,
62                  help="Source cluster username")
63
64 SRC_PASSWORD_FILE_OPT = \
65   cli.cli_option("--src-password-file", action="store", type="string",
66                  dest="src_password_file",
67                  help="File containing source cluster password")
68
69 DEST_RAPI_PORT_OPT = \
70   cli.cli_option("--dest-rapi-port", action="store", type="int",
71                  dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT,
72                  help=("Destination cluster RAPI port (defaults to source"
73                        " cluster RAPI port)"))
74
75 DEST_CA_FILE_OPT = \
76   cli.cli_option("--dest-ca-file", action="store", type="string",
77                  dest="dest_ca_file",
78                  help=("File containing destination cluster Certificate"
79                        " Authority (CA) in PEM format (defaults to source"
80                        " cluster CA)"))
81
82 DEST_USERNAME_OPT = \
83   cli.cli_option("--dest-username", action="store", type="string",
84                  dest="dest_username", default=None,
85                  help=("Destination cluster username (defaults to"
86                        " source cluster username)"))
87
88 DEST_PASSWORD_FILE_OPT = \
89   cli.cli_option("--dest-password-file", action="store", type="string",
90                  dest="dest_password_file",
91                  help=("File containing destination cluster password"
92                        " (defaults to source cluster password)"))
93
94 DEST_INSTANCE_NAME_OPT = \
95   cli.cli_option("--dest-instance-name", action="store", type="string",
96                  dest="dest_instance_name",
97                  help=("Instance name on destination cluster (only"
98                        " when moving exactly one instance)"))
99
100 DEST_PRIMARY_NODE_OPT = \
101   cli.cli_option("--dest-primary-node", action="store", type="string",
102                  dest="dest_primary_node",
103                  help=("Primary node on destination cluster (only"
104                        " when moving exactly one instance)"))
105
106 DEST_SECONDARY_NODE_OPT = \
107   cli.cli_option("--dest-secondary-node", action="store", type="string",
108                  dest="dest_secondary_node",
109                  help=("Secondary node on destination cluster (only"
110                        " when moving exactly one instance)"))
111
112 PARALLEL_OPT = \
113   cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
114                  dest="parallel", metavar="<number>",
115                  help="Number of instances to be moved simultaneously")
116
117
118 class Error(Exception):
119   """Generic error.
120
121   """
122
123
124 class Abort(Error):
125   """Special exception for aborting import/export.
126
127   """
128
129
130 class RapiClientFactory:
131   """Factory class for creating RAPI clients.
132
133   @ivar src_cluster_name: Source cluster name
134   @ivar dest_cluster_name: Destination cluster name
135   @ivar GetSourceClient: Callable returning new client for source cluster
136   @ivar GetDestClient: Callable returning new client for destination cluster
137
138   """
139   def __init__(self, options, src_cluster_name, dest_cluster_name):
140     """Initializes this class.
141
142     @param options: Program options
143     @type src_cluster_name: string
144     @param src_cluster_name: Source cluster name
145     @type dest_cluster_name: string
146     @param dest_cluster_name: Destination cluster name
147
148     """
149     self.src_cluster_name = src_cluster_name
150     self.dest_cluster_name = dest_cluster_name
151
152     # TODO: Implement timeouts for RAPI connections
153     # TODO: Support for using system default paths for verifying SSL certificate
154     logging.debug("Using '%s' as source CA", options.src_ca_file)
155     src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)
156
157     if options.dest_ca_file:
158       logging.debug("Using '%s' as destination CA", options.dest_ca_file)
159       dest_curl_config = \
160         rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
161     else:
162       logging.debug("Using source CA for destination")
163       dest_curl_config = src_curl_config
164
165     logging.debug("Source RAPI server is %s:%s",
166                   src_cluster_name, options.src_rapi_port)
167     logging.debug("Source username is '%s'", options.src_username)
168
169     if options.src_username is None:
170       src_username = ""
171     else:
172       src_username = options.src_username
173
174     if options.src_password_file:
175       logging.debug("Reading '%s' for source password",
176                     options.src_password_file)
177       src_password = utils.ReadOneLineFile(options.src_password_file,
178                                            strict=True)
179     else:
180       logging.debug("Source has no password")
181       src_password = None
182
183     self.GetSourceClient = lambda: \
184       rapi.client.GanetiRapiClient(src_cluster_name,
185                                    port=options.src_rapi_port,
186                                    curl_config_fn=src_curl_config,
187                                    username=src_username,
188                                    password=src_password)
189
190     if options.dest_rapi_port:
191       dest_rapi_port = options.dest_rapi_port
192     else:
193       dest_rapi_port = options.src_rapi_port
194
195     if options.dest_username is None:
196       dest_username = src_username
197     else:
198       dest_username = options.dest_username
199
200     logging.debug("Destination RAPI server is %s:%s",
201                   dest_cluster_name, dest_rapi_port)
202     logging.debug("Destination username is '%s'", dest_username)
203
204     if options.dest_password_file:
205       logging.debug("Reading '%s' for destination password",
206                     options.dest_password_file)
207       dest_password = utils.ReadOneLineFile(options.dest_password_file,
208                                             strict=True)
209     else:
210       logging.debug("Using source password for destination")
211       dest_password = src_password
212
213     self.GetDestClient = lambda: \
214       rapi.client.GanetiRapiClient(dest_cluster_name,
215                                    port=dest_rapi_port,
216                                    curl_config_fn=dest_curl_config,
217                                    username=dest_username,
218                                    password=dest_password)
219
220
221 class MoveJobPollReportCb(cli.JobPollReportCbBase):
222   def __init__(self, abort_check_fn, remote_import_fn):
223     """Initializes this class.
224
225     @type abort_check_fn: callable
226     @param abort_check_fn: Function to check whether move is aborted
227     @type remote_import_fn: callable or None
228     @param remote_import_fn: Callback for reporting received remote import
229                              information
230
231     """
232     cli.JobPollReportCbBase.__init__(self)
233     self._abort_check_fn = abort_check_fn
234     self._remote_import_fn = remote_import_fn
235
236   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
237     """Handles a log message.
238
239     """
240     if log_type == constants.ELOG_REMOTE_IMPORT:
241       logging.debug("Received remote import information")
242
243       if not self._remote_import_fn:
244         raise RuntimeError("Received unexpected remote import information")
245
246       assert "x509_ca" in log_msg
247       assert "disks" in log_msg
248
249       self._remote_import_fn(log_msg)
250
251       return
252
253     logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
254                  cli.FormatLogMessage(log_type, log_msg))
255
256   def ReportNotChanged(self, job_id, status):
257     """Called if a job hasn't changed in a while.
258
259     """
260     try:
261       # Check whether we were told to abort by the other thread
262       self._abort_check_fn()
263     except Abort:
264       logging.warning("Aborting despite job %s still running", job_id)
265       raise
266
267
268 class InstanceMove(object):
269   """Status class for instance moves.
270
271   """
272   def __init__(self, src_instance_name, dest_instance_name,
273                dest_pnode, dest_snode, dest_iallocator,
274                hvparams, beparams, osparams, nics):
275     """Initializes this class.
276
277     @type src_instance_name: string
278     @param src_instance_name: Instance name on source cluster
279     @type dest_instance_name: string
280     @param dest_instance_name: Instance name on destination cluster
281     @type dest_pnode: string or None
282     @param dest_pnode: Name of primary node on destination cluster
283     @type dest_snode: string or None
284     @param dest_snode: Name of secondary node on destination cluster
285     @type dest_iallocator: string or None
286     @param dest_iallocator: Name of iallocator to use
287     @type hvparams: dict or None
288     @param hvparams: Hypervisor parameters to override
289     @type beparams: dict or None
290     @param beparams: Backend parameters to override
291     @type osparams: dict or None
292     @param osparams: OS parameters to override
293     @type nics: dict or None
294     @param nics: NICs to override
295
296     """
297     self.src_instance_name = src_instance_name
298     self.dest_instance_name = dest_instance_name
299     self.dest_pnode = dest_pnode
300     self.dest_snode = dest_snode
301     self.dest_iallocator = dest_iallocator
302     self.hvparams = hvparams
303     self.beparams = beparams
304     self.osparams = osparams
305     self.nics = nics
306
307     self.error_message = None
308
309
310 class MoveRuntime(object):
311   """Class to keep track of instance move.
312
313   """
314   def __init__(self, move):
315     """Initializes this class.
316
317     @type move: L{InstanceMove}
318
319     """
320     self.move = move
321
322     # Thread synchronization
323     self.lock = threading.Lock()
324     self.source_to_dest = threading.Condition(self.lock)
325     self.dest_to_source = threading.Condition(self.lock)
326
327     # Source information
328     self.src_error_message = None
329     self.src_expinfo = None
330     self.src_instinfo = None
331
332     # Destination information
333     self.dest_error_message = None
334     self.dest_impinfo = None
335
336   def HandleErrors(self, prefix, fn, *args):
337     """Wrapper to catch errors and abort threads.
338
339     @type prefix: string
340     @param prefix: Variable name prefix ("src" or "dest")
341     @type fn: callable
342     @param fn: Function
343
344     """
345     assert prefix in ("dest", "src")
346
347     try:
348       # Call inner function
349       fn(*args)
350
351       errmsg = None
352     except Abort:
353       errmsg = "Aborted"
354     except Exception, err:
355       logging.exception("Caught unhandled exception")
356       errmsg = str(err)
357
358     setattr(self, "%s_error_message" % prefix, errmsg)
359
360     self.lock.acquire()
361     try:
362       self.source_to_dest.notifyAll()
363       self.dest_to_source.notifyAll()
364     finally:
365       self.lock.release()
366
367   def CheckAbort(self):
368     """Check whether thread should be aborted.
369
370     @raise Abort: When thread should be aborted
371
372     """
373     if not (self.src_error_message is None and
374             self.dest_error_message is None):
375       logging.info("Aborting")
376       raise Abort()
377
378   def Wait(self, cond, check_fn):
379     """Waits for a condition to become true.
380
381     @type cond: threading.Condition
382     @param cond: Threading condition
383     @type check_fn: callable
384     @param check_fn: Function to check whether condition is true
385
386     """
387     cond.acquire()
388     try:
389       while check_fn(self):
390         self.CheckAbort()
391         cond.wait()
392     finally:
393       cond.release()
394
395   def PollJob(self, cl, job_id, remote_import_fn=None):
396     """Wrapper for polling a job.
397
398     @type cl: L{rapi.client.GanetiRapiClient}
399     @param cl: RAPI client
400     @type job_id: string
401     @param job_id: Job ID
402     @type remote_import_fn: callable or None
403     @param remote_import_fn: Callback for reporting received remote import
404                              information
405
406     """
407     return rapi.client_utils.PollJob(cl, job_id,
408                                      MoveJobPollReportCb(self.CheckAbort,
409                                                          remote_import_fn))
410
411
412 class MoveDestExecutor(object):
413   def __init__(self, dest_client, mrt):
414     """Destination side of an instance move.
415
416     @type dest_client: L{rapi.client.GanetiRapiClient}
417     @param dest_client: RAPI client
418     @type mrt: L{MoveRuntime}
419     @param mrt: Instance move runtime information
420
421     """
422     logging.debug("Waiting for instance information to become available")
423     mrt.Wait(mrt.source_to_dest,
424              lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
425
426     logging.info("Creating instance %s in remote-import mode",
427                  mrt.move.dest_instance_name)
428     job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
429                                   mrt.move.dest_pnode, mrt.move.dest_snode,
430                                   mrt.move.dest_iallocator,
431                                   mrt.src_instinfo, mrt.src_expinfo,
432                                   mrt.move.hvparams, mrt.move.beparams,
433                                   mrt.move.beparams, mrt.move.nics)
434     mrt.PollJob(dest_client, job_id,
435                 remote_import_fn=compat.partial(self._SetImportInfo, mrt))
436
437     logging.info("Import successful")
438
439   @staticmethod
440   def _SetImportInfo(mrt, impinfo):
441     """Sets the remote import information and notifies source thread.
442
443     @type mrt: L{MoveRuntime}
444     @param mrt: Instance move runtime information
445     @param impinfo: Remote import information
446
447     """
448     mrt.dest_to_source.acquire()
449     try:
450       mrt.dest_impinfo = impinfo
451       mrt.dest_to_source.notifyAll()
452     finally:
453       mrt.dest_to_source.release()
454
455   @staticmethod
456   def _CreateInstance(cl, name, pnode, snode, iallocator, instance, expinfo,
457                       override_hvparams, override_beparams, override_osparams,
458                       override_nics):
459     """Starts the instance creation in remote import mode.
460
461     @type cl: L{rapi.client.GanetiRapiClient}
462     @param cl: RAPI client
463     @type name: string
464     @param name: Instance name
465     @type pnode: string or None
466     @param pnode: Name of primary node on destination cluster
467     @type snode: string or None
468     @param snode: Name of secondary node on destination cluster
469     @type iallocator: string or None
470     @param iallocator: Name of iallocator to use
471     @type instance: dict
472     @param instance: Instance details from source cluster
473     @type expinfo: dict
474     @param expinfo: Prepared export information from source cluster
475     @type override_hvparams: dict or None
476     @param override_hvparams: Hypervisor parameters to override
477     @type override_beparams: dict or None
478     @param override_beparams: Backend parameters to override
479     @type override_osparams: dict or None
480     @param override_osparams: OS parameters to override
481     @type override_nics: dict or None
482     @param override_nics: NICs to override
483     @return: Job ID
484
485     """
486     disk_template = instance["disk_template"]
487
488     disks = [{
489       constants.IDISK_SIZE: i["size"],
490       constants.IDISK_MODE: i["mode"],
491       } for i in instance["disks"]]
492
493     nics = [{
494       constants.INIC_IP: ip,
495       constants.INIC_MAC: mac,
496       constants.INIC_MODE: mode,
497       constants.INIC_LINK: link,
498       } for ip, mac, mode, link in instance["nics"]]
499
500     if len(override_nics) > len(nics):
501       raise Error("Can not create new NICs")
502
503     if override_nics:
504       assert len(override_nics) <= len(nics)
505       for idx, (nic, override) in enumerate(zip(nics, override_nics)):
506         nics[idx] = objects.FillDict(nic, override)
507
508     # TODO: Should this be the actual up/down status? (run_state)
509     start = (instance["config_state"] == "up")
510
511     assert len(disks) == len(instance["disks"])
512     assert len(nics) == len(instance["nics"])
513
514     inst_beparams = instance["be_instance"]
515     if not inst_beparams:
516       inst_beparams = {}
517
518     inst_hvparams = instance["hv_instance"]
519     if not inst_hvparams:
520       inst_hvparams = {}
521
522     inst_osparams = instance["os_instance"]
523     if not inst_osparams:
524       inst_osparams = {}
525
526     return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
527                              name, disk_template, disks, nics,
528                              os=instance["os"],
529                              pnode=pnode,
530                              snode=snode,
531                              start=start,
532                              ip_check=False,
533                              iallocator=iallocator,
534                              hypervisor=instance["hypervisor"],
535                              source_handshake=expinfo["handshake"],
536                              source_x509_ca=expinfo["x509_ca"],
537                              source_instance_name=instance["name"],
538                              beparams=objects.FillDict(inst_beparams,
539                                                        override_beparams),
540                              hvparams=objects.FillDict(inst_hvparams,
541                                                        override_hvparams),
542                              osparams=objects.FillDict(inst_osparams,
543                                                        override_osparams))
544
545
546 class MoveSourceExecutor(object):
547   def __init__(self, src_client, mrt):
548     """Source side of an instance move.
549
550     @type src_client: L{rapi.client.GanetiRapiClient}
551     @param src_client: RAPI client
552     @type mrt: L{MoveRuntime}
553     @param mrt: Instance move runtime information
554
555     """
556     logging.info("Checking whether instance exists")
557     self._CheckInstance(src_client, mrt.move.src_instance_name)
558
559     logging.info("Retrieving instance information from source cluster")
560     instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
561                                      mrt.move.src_instance_name)
562
563     logging.info("Preparing export on source cluster")
564     expinfo = self._PrepareExport(src_client, mrt.PollJob,
565                                   mrt.move.src_instance_name)
566     assert "handshake" in expinfo
567     assert "x509_key_name" in expinfo
568     assert "x509_ca" in expinfo
569
570     # Hand information to destination thread
571     mrt.source_to_dest.acquire()
572     try:
573       mrt.src_instinfo = instinfo
574       mrt.src_expinfo = expinfo
575       mrt.source_to_dest.notifyAll()
576     finally:
577       mrt.source_to_dest.release()
578
579     logging.info("Waiting for destination information to become available")
580     mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
581
582     logging.info("Starting remote export on source cluster")
583     self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
584                          expinfo["x509_key_name"], mrt.dest_impinfo)
585
586     logging.info("Export successful")
587
588   @staticmethod
589   def _CheckInstance(cl, name):
590     """Checks whether the instance exists on the source cluster.
591
592     @type cl: L{rapi.client.GanetiRapiClient}
593     @param cl: RAPI client
594     @type name: string
595     @param name: Instance name
596
597     """
598     try:
599       cl.GetInstance(name)
600     except rapi.client.GanetiApiError, err:
601       if err.code == rapi.client.HTTP_NOT_FOUND:
602         raise Error("Instance %s not found (%s)" % (name, str(err)))
603       raise
604
605   @staticmethod
606   def _GetInstanceInfo(cl, poll_job_fn, name):
607     """Retrieves detailed instance information from source cluster.
608
609     @type cl: L{rapi.client.GanetiRapiClient}
610     @param cl: RAPI client
611     @type poll_job_fn: callable
612     @param poll_job_fn: Function to poll for job result
613     @type name: string
614     @param name: Instance name
615
616     """
617     job_id = cl.GetInstanceInfo(name, static=True)
618     result = poll_job_fn(cl, job_id)
619     assert len(result[0].keys()) == 1
620     return result[0][result[0].keys()[0]]
621
622   @staticmethod
623   def _PrepareExport(cl, poll_job_fn, name):
624     """Prepares export on source cluster.
625
626     @type cl: L{rapi.client.GanetiRapiClient}
627     @param cl: RAPI client
628     @type poll_job_fn: callable
629     @param poll_job_fn: Function to poll for job result
630     @type name: string
631     @param name: Instance name
632
633     """
634     job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
635     return poll_job_fn(cl, job_id)[0]
636
637   @staticmethod
638   def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
639     """Exports instance from source cluster.
640
641     @type cl: L{rapi.client.GanetiRapiClient}
642     @param cl: RAPI client
643     @type poll_job_fn: callable
644     @param poll_job_fn: Function to poll for job result
645     @type name: string
646     @param name: Instance name
647     @param x509_key_name: Source X509 key
648     @param impinfo: Import information from destination cluster
649
650     """
651     job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
652                                impinfo["disks"], shutdown=True,
653                                remove_instance=True,
654                                x509_key_name=x509_key_name,
655                                destination_x509_ca=impinfo["x509_ca"])
656     (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
657
658     if not (fin_resu and compat.all(dresults)):
659       raise Error("Export failed for disks %s" %
660                   utils.CommaJoin(str(idx) for idx, result
661                                   in enumerate(dresults) if not result))
662
663
664 class MoveSourceWorker(workerpool.BaseWorker):
665   def RunTask(self, rapi_factory, move): # pylint: disable=W0221
666     """Executes an instance move.
667
668     @type rapi_factory: L{RapiClientFactory}
669     @param rapi_factory: RAPI client factory
670     @type move: L{InstanceMove}
671     @param move: Instance move information
672
673     """
674     try:
675       logging.info("Preparing to move %s from cluster %s to %s as %s",
676                    move.src_instance_name, rapi_factory.src_cluster_name,
677                    rapi_factory.dest_cluster_name, move.dest_instance_name)
678
679       mrt = MoveRuntime(move)
680
681       logging.debug("Starting destination thread")
682       dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
683                                      target=mrt.HandleErrors,
684                                      args=("dest", MoveDestExecutor,
685                                            rapi_factory.GetDestClient(),
686                                            mrt, ))
687       dest_thread.start()
688       try:
689         mrt.HandleErrors("src", MoveSourceExecutor,
690                          rapi_factory.GetSourceClient(), mrt)
691       finally:
692         dest_thread.join()
693
694       if mrt.src_error_message or mrt.dest_error_message:
695         move.error_message = ("Source error: %s, destination error: %s" %
696                               (mrt.src_error_message, mrt.dest_error_message))
697       else:
698         move.error_message = None
699     except Exception, err: # pylint: disable=W0703
700       logging.exception("Caught unhandled exception")
701       move.error_message = str(err)
702
703
704 def CheckRapiSetup(rapi_factory):
705   """Checks the RAPI setup by retrieving the version.
706
707   @type rapi_factory: L{RapiClientFactory}
708   @param rapi_factory: RAPI client factory
709
710   """
711   src_client = rapi_factory.GetSourceClient()
712   logging.info("Connecting to source RAPI server")
713   logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
714
715   dest_client = rapi_factory.GetDestClient()
716   logging.info("Connecting to destination RAPI server")
717   logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())
718
719
720 def SetupLogging(options):
721   """Setting up logging infrastructure.
722
723   @param options: Parsed command line options
724
725   """
726   fmt = "%(asctime)s: %(threadName)s "
727   if options.debug or options.verbose:
728     fmt += "%(levelname)s "
729   fmt += "%(message)s"
730
731   formatter = logging.Formatter(fmt)
732
733   stderr_handler = logging.StreamHandler()
734   stderr_handler.setFormatter(formatter)
735   if options.debug:
736     stderr_handler.setLevel(logging.NOTSET)
737   elif options.verbose:
738     stderr_handler.setLevel(logging.INFO)
739   else:
740     stderr_handler.setLevel(logging.ERROR)
741
742   root_logger = logging.getLogger("")
743   root_logger.setLevel(logging.NOTSET)
744   root_logger.addHandler(stderr_handler)
745
746
747 def ParseOptions():
748   """Parses options passed to program.
749
750   """
751   program = os.path.basename(sys.argv[0])
752
753   parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
754                                         " <source-cluster> <dest-cluster>"
755                                         " <instance...>"),
756                                  prog=program)
757   parser.add_option(cli.DEBUG_OPT)
758   parser.add_option(cli.VERBOSE_OPT)
759   parser.add_option(cli.IALLOCATOR_OPT)
760   parser.add_option(cli.BACKEND_OPT)
761   parser.add_option(cli.HVOPTS_OPT)
762   parser.add_option(cli.OSPARAMS_OPT)
763   parser.add_option(cli.NET_OPT)
764   parser.add_option(SRC_RAPI_PORT_OPT)
765   parser.add_option(SRC_CA_FILE_OPT)
766   parser.add_option(SRC_USERNAME_OPT)
767   parser.add_option(SRC_PASSWORD_FILE_OPT)
768   parser.add_option(DEST_RAPI_PORT_OPT)
769   parser.add_option(DEST_CA_FILE_OPT)
770   parser.add_option(DEST_USERNAME_OPT)
771   parser.add_option(DEST_PASSWORD_FILE_OPT)
772   parser.add_option(DEST_INSTANCE_NAME_OPT)
773   parser.add_option(DEST_PRIMARY_NODE_OPT)
774   parser.add_option(DEST_SECONDARY_NODE_OPT)
775   parser.add_option(PARALLEL_OPT)
776
777   (options, args) = parser.parse_args()
778
779   return (parser, options, args)
780
781
782 def CheckOptions(parser, options, args):
783   """Checks options and arguments for validity.
784
785   """
786   if len(args) < 3:
787     parser.error("Not enough arguments")
788
789   src_cluster_name = args.pop(0)
790   dest_cluster_name = args.pop(0)
791   instance_names = args
792
793   assert len(instance_names) > 0
794
795   # TODO: Remove once using system default paths for SSL certificate
796   # verification is implemented
797   if not options.src_ca_file:
798     parser.error("Missing source cluster CA file")
799
800   if options.parallel < 1:
801     parser.error("Number of simultaneous moves must be >= 1")
802
803   if not (bool(options.iallocator) ^
804           bool(options.dest_primary_node or options.dest_secondary_node)):
805     parser.error("Destination node and iallocator options exclude each other")
806
807   if len(instance_names) == 1:
808     # Moving one instance only
809     if not (options.iallocator or
810             options.dest_primary_node or
811             options.dest_secondary_node):
812       parser.error("An iallocator or the destination node is required")
813
814     if options.hvparams:
815       utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)
816
817     if options.beparams:
818       utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)
819
820     if options.nics:
821       options.nics = cli.ParseNicOption(options.nics)
822   else:
823     # Moving more than one instance
824     if (options.dest_instance_name or options.dest_primary_node or
825         options.dest_secondary_node or options.hvparams or
826         options.beparams or options.osparams or options.nics):
827       parser.error("The options --dest-instance-name, --dest-primary-node,"
828                    " --dest-secondary-node, --hypervisor-parameters,"
829                    " --backend-parameters, --os-parameters and --net can"
830                    " only be used when moving exactly one instance")
831
832     if not options.iallocator:
833       parser.error("An iallocator must be specified for moving more than one"
834                    " instance")
835
836   return (src_cluster_name, dest_cluster_name, instance_names)
837
838
839 @rapi.client.UsesRapiClient
840 def main():
841   """Main routine.
842
843   """
844   (parser, options, args) = ParseOptions()
845
846   SetupLogging(options)
847
848   (src_cluster_name, dest_cluster_name, instance_names) = \
849     CheckOptions(parser, options, args)
850
851   logging.info("Source cluster: %s", src_cluster_name)
852   logging.info("Destination cluster: %s", dest_cluster_name)
853   logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
854
855   rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
856
857   CheckRapiSetup(rapi_factory)
858
859   assert (len(instance_names) == 1 or
860           not (options.dest_primary_node or options.dest_secondary_node))
861   assert len(instance_names) == 1 or options.iallocator
862   assert (len(instance_names) > 1 or options.iallocator or
863           options.dest_primary_node or options.dest_secondary_node)
864   assert (len(instance_names) == 1 or
865           not (options.hvparams or options.beparams or options.osparams or
866                options.nics))
867
868   # Prepare list of instance moves
869   moves = []
870   for src_instance_name in instance_names:
871     if options.dest_instance_name:
872       assert len(instance_names) == 1
873       # Rename instance
874       dest_instance_name = options.dest_instance_name
875     else:
876       dest_instance_name = src_instance_name
877
878     moves.append(InstanceMove(src_instance_name, dest_instance_name,
879                               options.dest_primary_node,
880                               options.dest_secondary_node,
881                               options.iallocator, options.hvparams,
882                               options.beparams, options.osparams,
883                               options.nics))
884
885   assert len(moves) == len(instance_names)
886
887   # Start workerpool
888   wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
889   try:
890     # Add instance moves to workerpool
891     for move in moves:
892       wp.AddTask((rapi_factory, move))
893
894     # Wait for all moves to finish
895     wp.Quiesce()
896
897   finally:
898     wp.TerminateWorkers()
899
900   # There should be no threads running at this point, hence not using locks
901   # anymore
902
903   logging.info("Instance move results:")
904
905   for move in moves:
906     if move.dest_instance_name == move.src_instance_name:
907       name = move.src_instance_name
908     else:
909       name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
910
911     if move.error_message:
912       msg = "Failed (%s)" % move.error_message
913     else:
914       msg = "Success"
915
916     logging.info("%s: %s", name, msg)
917
918   if compat.any(move.error_message for move in moves):
919     sys.exit(constants.EXIT_FAILURE)
920
921   sys.exit(constants.EXIT_SUCCESS)
922
923
924 if __name__ == "__main__":
925   main()