Corrected network design doc regarding user interface
[ganeti-local] / tools / move-instance
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to move instances from one cluster to another.
22
23 """
24
25 # pylint: disable=C0103
26 # C0103: Invalid name move-instance
27
28 import os
29 import sys
30 import time
31 import logging
32 import optparse
33 import threading
34
35 from ganeti import cli
36 from ganeti import constants
37 from ganeti import utils
38 from ganeti import workerpool
39 from ganeti import objects
40 from ganeti import compat
41 from ganeti import rapi
42
43 import ganeti.rapi.client # pylint: disable=W0611
44 import ganeti.rapi.client_utils
45 from ganeti.rapi.client import UsesRapiClient
46
47
48 SRC_RAPI_PORT_OPT = \
49   cli.cli_option("--src-rapi-port", action="store", type="int",
50                  dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
51                  help=("Source cluster RAPI port (defaults to %s)" %
52                        constants.DEFAULT_RAPI_PORT))
53
54 SRC_CA_FILE_OPT = \
55   cli.cli_option("--src-ca-file", action="store", type="string",
56                  dest="src_ca_file",
57                  help=("File containing source cluster Certificate"
58                        " Authority (CA) in PEM format"))
59
60 SRC_USERNAME_OPT = \
61   cli.cli_option("--src-username", action="store", type="string",
62                  dest="src_username", default=None,
63                  help="Source cluster username")
64
65 SRC_PASSWORD_FILE_OPT = \
66   cli.cli_option("--src-password-file", action="store", type="string",
67                  dest="src_password_file",
68                  help="File containing source cluster password")
69
70 DEST_RAPI_PORT_OPT = \
71   cli.cli_option("--dest-rapi-port", action="store", type="int",
72                  dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT,
73                  help=("Destination cluster RAPI port (defaults to source"
74                        " cluster RAPI port)"))
75
76 DEST_CA_FILE_OPT = \
77   cli.cli_option("--dest-ca-file", action="store", type="string",
78                  dest="dest_ca_file",
79                  help=("File containing destination cluster Certificate"
80                        " Authority (CA) in PEM format (defaults to source"
81                        " cluster CA)"))
82
83 DEST_USERNAME_OPT = \
84   cli.cli_option("--dest-username", action="store", type="string",
85                  dest="dest_username", default=None,
86                  help=("Destination cluster username (defaults to"
87                        " source cluster username)"))
88
89 DEST_PASSWORD_FILE_OPT = \
90   cli.cli_option("--dest-password-file", action="store", type="string",
91                  dest="dest_password_file",
92                  help=("File containing destination cluster password"
93                        " (defaults to source cluster password)"))
94
95 DEST_INSTANCE_NAME_OPT = \
96   cli.cli_option("--dest-instance-name", action="store", type="string",
97                  dest="dest_instance_name",
98                  help=("Instance name on destination cluster (only"
99                        " when moving exactly one instance)"))
100
101 DEST_PRIMARY_NODE_OPT = \
102   cli.cli_option("--dest-primary-node", action="store", type="string",
103                  dest="dest_primary_node",
104                  help=("Primary node on destination cluster (only"
105                        " when moving exactly one instance)"))
106
107 DEST_SECONDARY_NODE_OPT = \
108   cli.cli_option("--dest-secondary-node", action="store", type="string",
109                  dest="dest_secondary_node",
110                  help=("Secondary node on destination cluster (only"
111                        " when moving exactly one instance)"))
112
113 PARALLEL_OPT = \
114   cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
115                  dest="parallel", metavar="<number>",
116                  help="Number of instances to be moved simultaneously")
117
118
119 class Error(Exception):
120   """Generic error.
121
122   """
123
124
125 class Abort(Error):
126   """Special exception for aborting import/export.
127
128   """
129
130
131 class RapiClientFactory:
132   """Factory class for creating RAPI clients.
133
134   @ivar src_cluster_name: Source cluster name
135   @ivar dest_cluster_name: Destination cluster name
136   @ivar GetSourceClient: Callable returning new client for source cluster
137   @ivar GetDestClient: Callable returning new client for destination cluster
138
139   """
140   def __init__(self, options, src_cluster_name, dest_cluster_name):
141     """Initializes this class.
142
143     @param options: Program options
144     @type src_cluster_name: string
145     @param src_cluster_name: Source cluster name
146     @type dest_cluster_name: string
147     @param dest_cluster_name: Destination cluster name
148
149     """
150     self.src_cluster_name = src_cluster_name
151     self.dest_cluster_name = dest_cluster_name
152
153     # TODO: Implement timeouts for RAPI connections
154     # TODO: Support for using system default paths for verifying SSL certificate
155     logging.debug("Using '%s' as source CA", options.src_ca_file)
156     src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)
157
158     if options.dest_ca_file:
159       logging.debug("Using '%s' as destination CA", options.dest_ca_file)
160       dest_curl_config = \
161         rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
162     else:
163       logging.debug("Using source CA for destination")
164       dest_curl_config = src_curl_config
165
166     logging.debug("Source RAPI server is %s:%s",
167                   src_cluster_name, options.src_rapi_port)
168     logging.debug("Source username is '%s'", options.src_username)
169
170     if options.src_username is None:
171       src_username = ""
172     else:
173       src_username = options.src_username
174
175     if options.src_password_file:
176       logging.debug("Reading '%s' for source password",
177                     options.src_password_file)
178       src_password = utils.ReadOneLineFile(options.src_password_file,
179                                            strict=True)
180     else:
181       logging.debug("Source has no password")
182       src_password = None
183
184     self.GetSourceClient = lambda: \
185       rapi.client.GanetiRapiClient(src_cluster_name,
186                                    port=options.src_rapi_port,
187                                    curl_config_fn=src_curl_config,
188                                    username=src_username,
189                                    password=src_password)
190
191     if options.dest_rapi_port:
192       dest_rapi_port = options.dest_rapi_port
193     else:
194       dest_rapi_port = options.src_rapi_port
195
196     if options.dest_username is None:
197       dest_username = src_username
198     else:
199       dest_username = options.dest_username
200
201     logging.debug("Destination RAPI server is %s:%s",
202                   dest_cluster_name, dest_rapi_port)
203     logging.debug("Destination username is '%s'", dest_username)
204
205     if options.dest_password_file:
206       logging.debug("Reading '%s' for destination password",
207                     options.dest_password_file)
208       dest_password = utils.ReadOneLineFile(options.dest_password_file,
209                                             strict=True)
210     else:
211       logging.debug("Using source password for destination")
212       dest_password = src_password
213
214     self.GetDestClient = lambda: \
215       rapi.client.GanetiRapiClient(dest_cluster_name,
216                                    port=dest_rapi_port,
217                                    curl_config_fn=dest_curl_config,
218                                    username=dest_username,
219                                    password=dest_password)
220
221
222 class MoveJobPollReportCb(cli.JobPollReportCbBase):
223   def __init__(self, abort_check_fn, remote_import_fn):
224     """Initializes this class.
225
226     @type abort_check_fn: callable
227     @param abort_check_fn: Function to check whether move is aborted
228     @type remote_import_fn: callable or None
229     @param remote_import_fn: Callback for reporting received remote import
230                              information
231
232     """
233     cli.JobPollReportCbBase.__init__(self)
234     self._abort_check_fn = abort_check_fn
235     self._remote_import_fn = remote_import_fn
236
237   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
238     """Handles a log message.
239
240     """
241     if log_type == constants.ELOG_REMOTE_IMPORT:
242       logging.debug("Received remote import information")
243
244       if not self._remote_import_fn:
245         raise RuntimeError("Received unexpected remote import information")
246
247       assert "x509_ca" in log_msg
248       assert "disks" in log_msg
249
250       self._remote_import_fn(log_msg)
251
252       return
253
254     logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
255                  cli.FormatLogMessage(log_type, log_msg))
256
257   def ReportNotChanged(self, job_id, status):
258     """Called if a job hasn't changed in a while.
259
260     """
261     try:
262       # Check whether we were told to abort by the other thread
263       self._abort_check_fn()
264     except Abort:
265       logging.warning("Aborting despite job %s still running", job_id)
266       raise
267
268
269 class InstanceMove(object):
270   """Status class for instance moves.
271
272   """
273   def __init__(self, src_instance_name, dest_instance_name,
274                dest_pnode, dest_snode, dest_iallocator,
275                hvparams, beparams, osparams, nics):
276     """Initializes this class.
277
278     @type src_instance_name: string
279     @param src_instance_name: Instance name on source cluster
280     @type dest_instance_name: string
281     @param dest_instance_name: Instance name on destination cluster
282     @type dest_pnode: string or None
283     @param dest_pnode: Name of primary node on destination cluster
284     @type dest_snode: string or None
285     @param dest_snode: Name of secondary node on destination cluster
286     @type dest_iallocator: string or None
287     @param dest_iallocator: Name of iallocator to use
288     @type hvparams: dict or None
289     @param hvparams: Hypervisor parameters to override
290     @type beparams: dict or None
291     @param beparams: Backend parameters to override
292     @type osparams: dict or None
293     @param osparams: OS parameters to override
294     @type nics: dict or None
295     @param nics: NICs to override
296
297     """
298     self.src_instance_name = src_instance_name
299     self.dest_instance_name = dest_instance_name
300     self.dest_pnode = dest_pnode
301     self.dest_snode = dest_snode
302     self.dest_iallocator = dest_iallocator
303     self.hvparams = hvparams
304     self.beparams = beparams
305     self.osparams = osparams
306     self.nics = nics
307
308     self.error_message = None
309
310
311 class MoveRuntime(object):
312   """Class to keep track of instance move.
313
314   """
315   def __init__(self, move):
316     """Initializes this class.
317
318     @type move: L{InstanceMove}
319
320     """
321     self.move = move
322
323     # Thread synchronization
324     self.lock = threading.Lock()
325     self.source_to_dest = threading.Condition(self.lock)
326     self.dest_to_source = threading.Condition(self.lock)
327
328     # Source information
329     self.src_error_message = None
330     self.src_expinfo = None
331     self.src_instinfo = None
332
333     # Destination information
334     self.dest_error_message = None
335     self.dest_impinfo = None
336
337   def HandleErrors(self, prefix, fn, *args):
338     """Wrapper to catch errors and abort threads.
339
340     @type prefix: string
341     @param prefix: Variable name prefix ("src" or "dest")
342     @type fn: callable
343     @param fn: Function
344
345     """
346     assert prefix in ("dest", "src")
347
348     try:
349       # Call inner function
350       fn(*args)
351
352       errmsg = None
353     except Abort:
354       errmsg = "Aborted"
355     except Exception, err:
356       logging.exception("Caught unhandled exception")
357       errmsg = str(err)
358
359     setattr(self, "%s_error_message" % prefix, errmsg)
360
361     self.lock.acquire()
362     try:
363       self.source_to_dest.notifyAll()
364       self.dest_to_source.notifyAll()
365     finally:
366       self.lock.release()
367
368   def CheckAbort(self):
369     """Check whether thread should be aborted.
370
371     @raise Abort: When thread should be aborted
372
373     """
374     if not (self.src_error_message is None and
375             self.dest_error_message is None):
376       logging.info("Aborting")
377       raise Abort()
378
379   def Wait(self, cond, check_fn):
380     """Waits for a condition to become true.
381
382     @type cond: threading.Condition
383     @param cond: Threading condition
384     @type check_fn: callable
385     @param check_fn: Function to check whether condition is true
386
387     """
388     cond.acquire()
389     try:
390       while check_fn(self):
391         self.CheckAbort()
392         cond.wait()
393     finally:
394       cond.release()
395
396   def PollJob(self, cl, job_id, remote_import_fn=None):
397     """Wrapper for polling a job.
398
399     @type cl: L{rapi.client.GanetiRapiClient}
400     @param cl: RAPI client
401     @type job_id: string
402     @param job_id: Job ID
403     @type remote_import_fn: callable or None
404     @param remote_import_fn: Callback for reporting received remote import
405                              information
406
407     """
408     return rapi.client_utils.PollJob(cl, job_id,
409                                      MoveJobPollReportCb(self.CheckAbort,
410                                                          remote_import_fn))
411
412
413 class MoveDestExecutor(object):
414   def __init__(self, dest_client, mrt):
415     """Destination side of an instance move.
416
417     @type dest_client: L{rapi.client.GanetiRapiClient}
418     @param dest_client: RAPI client
419     @type mrt: L{MoveRuntime}
420     @param mrt: Instance move runtime information
421
422     """
423     logging.debug("Waiting for instance information to become available")
424     mrt.Wait(mrt.source_to_dest,
425              lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
426
427     logging.info("Creating instance %s in remote-import mode",
428                  mrt.move.dest_instance_name)
429     job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
430                                   mrt.move.dest_pnode, mrt.move.dest_snode,
431                                   mrt.move.dest_iallocator,
432                                   mrt.src_instinfo, mrt.src_expinfo,
433                                   mrt.move.hvparams, mrt.move.beparams,
434                                   mrt.move.beparams, mrt.move.nics)
435     mrt.PollJob(dest_client, job_id,
436                 remote_import_fn=compat.partial(self._SetImportInfo, mrt))
437
438     logging.info("Import successful")
439
440   @staticmethod
441   def _SetImportInfo(mrt, impinfo):
442     """Sets the remote import information and notifies source thread.
443
444     @type mrt: L{MoveRuntime}
445     @param mrt: Instance move runtime information
446     @param impinfo: Remote import information
447
448     """
449     mrt.dest_to_source.acquire()
450     try:
451       mrt.dest_impinfo = impinfo
452       mrt.dest_to_source.notifyAll()
453     finally:
454       mrt.dest_to_source.release()
455
456   @staticmethod
457   def _CreateInstance(cl, name, pnode, snode, iallocator, instance, expinfo,
458                       override_hvparams, override_beparams, override_osparams,
459                       override_nics):
460     """Starts the instance creation in remote import mode.
461
462     @type cl: L{rapi.client.GanetiRapiClient}
463     @param cl: RAPI client
464     @type name: string
465     @param name: Instance name
466     @type pnode: string or None
467     @param pnode: Name of primary node on destination cluster
468     @type snode: string or None
469     @param snode: Name of secondary node on destination cluster
470     @type iallocator: string or None
471     @param iallocator: Name of iallocator to use
472     @type instance: dict
473     @param instance: Instance details from source cluster
474     @type expinfo: dict
475     @param expinfo: Prepared export information from source cluster
476     @type override_hvparams: dict or None
477     @param override_hvparams: Hypervisor parameters to override
478     @type override_beparams: dict or None
479     @param override_beparams: Backend parameters to override
480     @type override_osparams: dict or None
481     @param override_osparams: OS parameters to override
482     @type override_nics: dict or None
483     @param override_nics: NICs to override
484     @return: Job ID
485
486     """
487     disk_template = instance["disk_template"]
488
489     disks = [{
490       constants.IDISK_SIZE: i["size"],
491       constants.IDISK_MODE: i["mode"],
492       } for i in instance["disks"]]
493
494     nics = [{
495       constants.INIC_IP: ip,
496       constants.INIC_MAC: mac,
497       constants.INIC_MODE: mode,
498       constants.INIC_LINK: link,
499       constants.INIC_NETWORK: network
500       } for ip, mac, mode, link, network, _ in instance["nics"]]
501
502     if len(override_nics) > len(nics):
503       raise Error("Can not create new NICs")
504
505     if override_nics:
506       assert len(override_nics) <= len(nics)
507       for idx, (nic, override) in enumerate(zip(nics, override_nics)):
508         nics[idx] = objects.FillDict(nic, override)
509
510     # TODO: Should this be the actual up/down status? (run_state)
511     start = (instance["config_state"] == "up")
512
513     assert len(disks) == len(instance["disks"])
514     assert len(nics) == len(instance["nics"])
515
516     inst_beparams = instance["be_instance"]
517     if not inst_beparams:
518       inst_beparams = {}
519
520     inst_hvparams = instance["hv_instance"]
521     if not inst_hvparams:
522       inst_hvparams = {}
523
524     inst_osparams = instance["os_instance"]
525     if not inst_osparams:
526       inst_osparams = {}
527
528     return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
529                              name, disk_template, disks, nics,
530                              os=instance["os"],
531                              pnode=pnode,
532                              snode=snode,
533                              start=start,
534                              ip_check=False,
535                              iallocator=iallocator,
536                              hypervisor=instance["hypervisor"],
537                              source_handshake=expinfo["handshake"],
538                              source_x509_ca=expinfo["x509_ca"],
539                              source_instance_name=instance["name"],
540                              beparams=objects.FillDict(inst_beparams,
541                                                        override_beparams),
542                              hvparams=objects.FillDict(inst_hvparams,
543                                                        override_hvparams),
544                              osparams=objects.FillDict(inst_osparams,
545                                                        override_osparams))
546
547
548 class MoveSourceExecutor(object):
549   def __init__(self, src_client, mrt):
550     """Source side of an instance move.
551
552     @type src_client: L{rapi.client.GanetiRapiClient}
553     @param src_client: RAPI client
554     @type mrt: L{MoveRuntime}
555     @param mrt: Instance move runtime information
556
557     """
558     logging.info("Checking whether instance exists")
559     self._CheckInstance(src_client, mrt.move.src_instance_name)
560
561     logging.info("Retrieving instance information from source cluster")
562     instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
563                                      mrt.move.src_instance_name)
564
565     logging.info("Preparing export on source cluster")
566     expinfo = self._PrepareExport(src_client, mrt.PollJob,
567                                   mrt.move.src_instance_name)
568     assert "handshake" in expinfo
569     assert "x509_key_name" in expinfo
570     assert "x509_ca" in expinfo
571
572     # Hand information to destination thread
573     mrt.source_to_dest.acquire()
574     try:
575       mrt.src_instinfo = instinfo
576       mrt.src_expinfo = expinfo
577       mrt.source_to_dest.notifyAll()
578     finally:
579       mrt.source_to_dest.release()
580
581     logging.info("Waiting for destination information to become available")
582     mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
583
584     logging.info("Starting remote export on source cluster")
585     self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
586                          expinfo["x509_key_name"], mrt.dest_impinfo)
587
588     logging.info("Export successful")
589
590   @staticmethod
591   def _CheckInstance(cl, name):
592     """Checks whether the instance exists on the source cluster.
593
594     @type cl: L{rapi.client.GanetiRapiClient}
595     @param cl: RAPI client
596     @type name: string
597     @param name: Instance name
598
599     """
600     try:
601       cl.GetInstance(name)
602     except rapi.client.GanetiApiError, err:
603       if err.code == rapi.client.HTTP_NOT_FOUND:
604         raise Error("Instance %s not found (%s)" % (name, str(err)))
605       raise
606
607   @staticmethod
608   def _GetInstanceInfo(cl, poll_job_fn, name):
609     """Retrieves detailed instance information from source cluster.
610
611     @type cl: L{rapi.client.GanetiRapiClient}
612     @param cl: RAPI client
613     @type poll_job_fn: callable
614     @param poll_job_fn: Function to poll for job result
615     @type name: string
616     @param name: Instance name
617
618     """
619     job_id = cl.GetInstanceInfo(name, static=True)
620     result = poll_job_fn(cl, job_id)
621     assert len(result[0].keys()) == 1
622     return result[0][result[0].keys()[0]]
623
624   @staticmethod
625   def _PrepareExport(cl, poll_job_fn, name):
626     """Prepares export on source cluster.
627
628     @type cl: L{rapi.client.GanetiRapiClient}
629     @param cl: RAPI client
630     @type poll_job_fn: callable
631     @param poll_job_fn: Function to poll for job result
632     @type name: string
633     @param name: Instance name
634
635     """
636     job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
637     return poll_job_fn(cl, job_id)[0]
638
639   @staticmethod
640   def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
641     """Exports instance from source cluster.
642
643     @type cl: L{rapi.client.GanetiRapiClient}
644     @param cl: RAPI client
645     @type poll_job_fn: callable
646     @param poll_job_fn: Function to poll for job result
647     @type name: string
648     @param name: Instance name
649     @param x509_key_name: Source X509 key
650     @param impinfo: Import information from destination cluster
651
652     """
653     job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
654                                impinfo["disks"], shutdown=True,
655                                remove_instance=True,
656                                x509_key_name=x509_key_name,
657                                destination_x509_ca=impinfo["x509_ca"])
658     (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
659
660     if not (fin_resu and compat.all(dresults)):
661       raise Error("Export failed for disks %s" %
662                   utils.CommaJoin(str(idx) for idx, result
663                                   in enumerate(dresults) if not result))
664
665
666 class MoveSourceWorker(workerpool.BaseWorker):
667   def RunTask(self, rapi_factory, move): # pylint: disable=W0221
668     """Executes an instance move.
669
670     @type rapi_factory: L{RapiClientFactory}
671     @param rapi_factory: RAPI client factory
672     @type move: L{InstanceMove}
673     @param move: Instance move information
674
675     """
676     try:
677       logging.info("Preparing to move %s from cluster %s to %s as %s",
678                    move.src_instance_name, rapi_factory.src_cluster_name,
679                    rapi_factory.dest_cluster_name, move.dest_instance_name)
680
681       mrt = MoveRuntime(move)
682
683       logging.debug("Starting destination thread")
684       dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
685                                      target=mrt.HandleErrors,
686                                      args=("dest", MoveDestExecutor,
687                                            rapi_factory.GetDestClient(),
688                                            mrt, ))
689       dest_thread.start()
690       try:
691         mrt.HandleErrors("src", MoveSourceExecutor,
692                          rapi_factory.GetSourceClient(), mrt)
693       finally:
694         dest_thread.join()
695
696       if mrt.src_error_message or mrt.dest_error_message:
697         move.error_message = ("Source error: %s, destination error: %s" %
698                               (mrt.src_error_message, mrt.dest_error_message))
699       else:
700         move.error_message = None
701     except Exception, err: # pylint: disable=W0703
702       logging.exception("Caught unhandled exception")
703       move.error_message = str(err)
704
705
706 def CheckRapiSetup(rapi_factory):
707   """Checks the RAPI setup by retrieving the version.
708
709   @type rapi_factory: L{RapiClientFactory}
710   @param rapi_factory: RAPI client factory
711
712   """
713   src_client = rapi_factory.GetSourceClient()
714   logging.info("Connecting to source RAPI server")
715   logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
716
717   dest_client = rapi_factory.GetDestClient()
718   logging.info("Connecting to destination RAPI server")
719   logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())
720
721
722 def SetupLogging(options):
723   """Setting up logging infrastructure.
724
725   @param options: Parsed command line options
726
727   """
728   fmt = "%(asctime)s: %(threadName)s "
729   if options.debug or options.verbose:
730     fmt += "%(levelname)s "
731   fmt += "%(message)s"
732
733   formatter = logging.Formatter(fmt)
734
735   stderr_handler = logging.StreamHandler()
736   stderr_handler.setFormatter(formatter)
737   if options.debug:
738     stderr_handler.setLevel(logging.NOTSET)
739   elif options.verbose:
740     stderr_handler.setLevel(logging.INFO)
741   else:
742     stderr_handler.setLevel(logging.ERROR)
743
744   root_logger = logging.getLogger("")
745   root_logger.setLevel(logging.NOTSET)
746   root_logger.addHandler(stderr_handler)
747
748
749 def ParseOptions():
750   """Parses options passed to program.
751
752   """
753   program = os.path.basename(sys.argv[0])
754
755   parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
756                                         " <source-cluster> <dest-cluster>"
757                                         " <instance...>"),
758                                  prog=program)
759   parser.add_option(cli.DEBUG_OPT)
760   parser.add_option(cli.VERBOSE_OPT)
761   parser.add_option(cli.IALLOCATOR_OPT)
762   parser.add_option(cli.BACKEND_OPT)
763   parser.add_option(cli.HVOPTS_OPT)
764   parser.add_option(cli.OSPARAMS_OPT)
765   parser.add_option(cli.NET_OPT)
766   parser.add_option(SRC_RAPI_PORT_OPT)
767   parser.add_option(SRC_CA_FILE_OPT)
768   parser.add_option(SRC_USERNAME_OPT)
769   parser.add_option(SRC_PASSWORD_FILE_OPT)
770   parser.add_option(DEST_RAPI_PORT_OPT)
771   parser.add_option(DEST_CA_FILE_OPT)
772   parser.add_option(DEST_USERNAME_OPT)
773   parser.add_option(DEST_PASSWORD_FILE_OPT)
774   parser.add_option(DEST_INSTANCE_NAME_OPT)
775   parser.add_option(DEST_PRIMARY_NODE_OPT)
776   parser.add_option(DEST_SECONDARY_NODE_OPT)
777   parser.add_option(PARALLEL_OPT)
778
779   (options, args) = parser.parse_args()
780
781   return (parser, options, args)
782
783
784 def CheckOptions(parser, options, args):
785   """Checks options and arguments for validity.
786
787   """
788   if len(args) < 3:
789     parser.error("Not enough arguments")
790
791   src_cluster_name = args.pop(0)
792   dest_cluster_name = args.pop(0)
793   instance_names = args
794
795   assert len(instance_names) > 0
796
797   # TODO: Remove once using system default paths for SSL certificate
798   # verification is implemented
799   if not options.src_ca_file:
800     parser.error("Missing source cluster CA file")
801
802   if options.parallel < 1:
803     parser.error("Number of simultaneous moves must be >= 1")
804
805   if not (bool(options.iallocator) ^
806           bool(options.dest_primary_node or options.dest_secondary_node)):
807     parser.error("Destination node and iallocator options exclude each other")
808
809   if len(instance_names) == 1:
810     # Moving one instance only
811     if not (options.iallocator or
812             options.dest_primary_node or
813             options.dest_secondary_node):
814       parser.error("An iallocator or the destination node is required")
815
816     if options.hvparams:
817       utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)
818
819     if options.beparams:
820       utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)
821
822     if options.nics:
823       options.nics = cli.ParseNicOption(options.nics)
824   else:
825     # Moving more than one instance
826     if (options.dest_instance_name or options.dest_primary_node or
827         options.dest_secondary_node or options.hvparams or
828         options.beparams or options.osparams or options.nics):
829       parser.error("The options --dest-instance-name, --dest-primary-node,"
830                    " --dest-secondary-node, --hypervisor-parameters,"
831                    " --backend-parameters, --os-parameters and --net can"
832                    " only be used when moving exactly one instance")
833
834     if not options.iallocator:
835       parser.error("An iallocator must be specified for moving more than one"
836                    " instance")
837
838   return (src_cluster_name, dest_cluster_name, instance_names)
839
840
841 @UsesRapiClient
842 def main():
843   """Main routine.
844
845   """
846   (parser, options, args) = ParseOptions()
847
848   SetupLogging(options)
849
850   (src_cluster_name, dest_cluster_name, instance_names) = \
851     CheckOptions(parser, options, args)
852
853   logging.info("Source cluster: %s", src_cluster_name)
854   logging.info("Destination cluster: %s", dest_cluster_name)
855   logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
856
857   rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
858
859   CheckRapiSetup(rapi_factory)
860
861   assert (len(instance_names) == 1 or
862           not (options.dest_primary_node or options.dest_secondary_node))
863   assert len(instance_names) == 1 or options.iallocator
864   assert (len(instance_names) > 1 or options.iallocator or
865           options.dest_primary_node or options.dest_secondary_node)
866   assert (len(instance_names) == 1 or
867           not (options.hvparams or options.beparams or options.osparams or
868                options.nics))
869
870   # Prepare list of instance moves
871   moves = []
872   for src_instance_name in instance_names:
873     if options.dest_instance_name:
874       assert len(instance_names) == 1
875       # Rename instance
876       dest_instance_name = options.dest_instance_name
877     else:
878       dest_instance_name = src_instance_name
879
880     moves.append(InstanceMove(src_instance_name, dest_instance_name,
881                               options.dest_primary_node,
882                               options.dest_secondary_node,
883                               options.iallocator, options.hvparams,
884                               options.beparams, options.osparams,
885                               options.nics))
886
887   assert len(moves) == len(instance_names)
888
889   # Start workerpool
890   wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
891   try:
892     # Add instance moves to workerpool
893     for move in moves:
894       wp.AddTask((rapi_factory, move))
895
896     # Wait for all moves to finish
897     wp.Quiesce()
898
899   finally:
900     wp.TerminateWorkers()
901
902   # There should be no threads running at this point, hence not using locks
903   # anymore
904
905   logging.info("Instance move results:")
906
907   for move in moves:
908     if move.dest_instance_name == move.src_instance_name:
909       name = move.src_instance_name
910     else:
911       name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
912
913     if move.error_message:
914       msg = "Failed (%s)" % move.error_message
915     else:
916       msg = "Success"
917
918     logging.info("%s: %s", name, msg)
919
920   if compat.any(move.error_message for move in moves):
921     sys.exit(constants.EXIT_FAILURE)
922
923   sys.exit(constants.EXIT_SUCCESS)
924
925
926 if __name__ == "__main__":
927   main()