Prepare version numbers for 2.10 release cycle
[ganeti-local] / tools / move-instance
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to move instances from one cluster to another.
22
23 """
24
25 # pylint: disable=C0103
26 # C0103: Invalid name move-instance
27
28 import os
29 import sys
30 import time
31 import logging
32 import optparse
33 import threading
34
35 from ganeti import cli
36 from ganeti import constants
37 from ganeti import utils
38 from ganeti import workerpool
39 from ganeti import objects
40 from ganeti import compat
41 from ganeti import rapi
42
43 import ganeti.rapi.client # pylint: disable=W0611
44 import ganeti.rapi.client_utils
45 from ganeti.rapi.client import UsesRapiClient
46
47
48 SRC_RAPI_PORT_OPT = \
49   cli.cli_option("--src-rapi-port", action="store", type="int",
50                  dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
51                  help=("Source cluster RAPI port (defaults to %s)" %
52                        constants.DEFAULT_RAPI_PORT))
53
54 SRC_CA_FILE_OPT = \
55   cli.cli_option("--src-ca-file", action="store", type="string",
56                  dest="src_ca_file",
57                  help=("File containing source cluster Certificate"
58                        " Authority (CA) in PEM format"))
59
60 SRC_USERNAME_OPT = \
61   cli.cli_option("--src-username", action="store", type="string",
62                  dest="src_username", default=None,
63                  help="Source cluster username")
64
65 SRC_PASSWORD_FILE_OPT = \
66   cli.cli_option("--src-password-file", action="store", type="string",
67                  dest="src_password_file",
68                  help="File containing source cluster password")
69
70 DEST_RAPI_PORT_OPT = \
71   cli.cli_option("--dest-rapi-port", action="store", type="int",
72                  dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT,
73                  help=("Destination cluster RAPI port (defaults to source"
74                        " cluster RAPI port)"))
75
76 DEST_CA_FILE_OPT = \
77   cli.cli_option("--dest-ca-file", action="store", type="string",
78                  dest="dest_ca_file",
79                  help=("File containing destination cluster Certificate"
80                        " Authority (CA) in PEM format (defaults to source"
81                        " cluster CA)"))
82
83 DEST_USERNAME_OPT = \
84   cli.cli_option("--dest-username", action="store", type="string",
85                  dest="dest_username", default=None,
86                  help=("Destination cluster username (defaults to"
87                        " source cluster username)"))
88
89 DEST_PASSWORD_FILE_OPT = \
90   cli.cli_option("--dest-password-file", action="store", type="string",
91                  dest="dest_password_file",
92                  help=("File containing destination cluster password"
93                        " (defaults to source cluster password)"))
94
95 DEST_INSTANCE_NAME_OPT = \
96   cli.cli_option("--dest-instance-name", action="store", type="string",
97                  dest="dest_instance_name",
98                  help=("Instance name on destination cluster (only"
99                        " when moving exactly one instance)"))
100
101 DEST_PRIMARY_NODE_OPT = \
102   cli.cli_option("--dest-primary-node", action="store", type="string",
103                  dest="dest_primary_node",
104                  help=("Primary node on destination cluster (only"
105                        " when moving exactly one instance)"))
106
107 DEST_SECONDARY_NODE_OPT = \
108   cli.cli_option("--dest-secondary-node", action="store", type="string",
109                  dest="dest_secondary_node",
110                  help=("Secondary node on destination cluster (only"
111                        " when moving exactly one instance)"))
112
113 DEST_DISK_TEMPLATE_OPT = \
114   cli.cli_option("--dest-disk-template", action="store", type="string",
115                  dest="dest_disk_template", default=None,
116                  help="Disk template to use on destination cluster")
117
118 PARALLEL_OPT = \
119   cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
120                  dest="parallel", metavar="<number>",
121                  help="Number of instances to be moved simultaneously")
122
123
124 class Error(Exception):
125   """Generic error.
126
127   """
128
129
130 class Abort(Error):
131   """Special exception for aborting import/export.
132
133   """
134
135
136 class RapiClientFactory:
137   """Factory class for creating RAPI clients.
138
139   @ivar src_cluster_name: Source cluster name
140   @ivar dest_cluster_name: Destination cluster name
141   @ivar GetSourceClient: Callable returning new client for source cluster
142   @ivar GetDestClient: Callable returning new client for destination cluster
143
144   """
145   def __init__(self, options, src_cluster_name, dest_cluster_name):
146     """Initializes this class.
147
148     @param options: Program options
149     @type src_cluster_name: string
150     @param src_cluster_name: Source cluster name
151     @type dest_cluster_name: string
152     @param dest_cluster_name: Destination cluster name
153
154     """
155     self.src_cluster_name = src_cluster_name
156     self.dest_cluster_name = dest_cluster_name
157
158     # TODO: Implement timeouts for RAPI connections
159     # TODO: Support for using system default paths for verifying SSL certificate
160     logging.debug("Using '%s' as source CA", options.src_ca_file)
161     src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)
162
163     if options.dest_ca_file:
164       logging.debug("Using '%s' as destination CA", options.dest_ca_file)
165       dest_curl_config = \
166         rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
167     else:
168       logging.debug("Using source CA for destination")
169       dest_curl_config = src_curl_config
170
171     logging.debug("Source RAPI server is %s:%s",
172                   src_cluster_name, options.src_rapi_port)
173     logging.debug("Source username is '%s'", options.src_username)
174
175     if options.src_username is None:
176       src_username = ""
177     else:
178       src_username = options.src_username
179
180     if options.src_password_file:
181       logging.debug("Reading '%s' for source password",
182                     options.src_password_file)
183       src_password = utils.ReadOneLineFile(options.src_password_file,
184                                            strict=True)
185     else:
186       logging.debug("Source has no password")
187       src_password = None
188
189     self.GetSourceClient = lambda: \
190       rapi.client.GanetiRapiClient(src_cluster_name,
191                                    port=options.src_rapi_port,
192                                    curl_config_fn=src_curl_config,
193                                    username=src_username,
194                                    password=src_password)
195
196     if options.dest_rapi_port:
197       dest_rapi_port = options.dest_rapi_port
198     else:
199       dest_rapi_port = options.src_rapi_port
200
201     if options.dest_username is None:
202       dest_username = src_username
203     else:
204       dest_username = options.dest_username
205
206     logging.debug("Destination RAPI server is %s:%s",
207                   dest_cluster_name, dest_rapi_port)
208     logging.debug("Destination username is '%s'", dest_username)
209
210     if options.dest_password_file:
211       logging.debug("Reading '%s' for destination password",
212                     options.dest_password_file)
213       dest_password = utils.ReadOneLineFile(options.dest_password_file,
214                                             strict=True)
215     else:
216       logging.debug("Using source password for destination")
217       dest_password = src_password
218
219     self.GetDestClient = lambda: \
220       rapi.client.GanetiRapiClient(dest_cluster_name,
221                                    port=dest_rapi_port,
222                                    curl_config_fn=dest_curl_config,
223                                    username=dest_username,
224                                    password=dest_password)
225
226
227 class MoveJobPollReportCb(cli.JobPollReportCbBase):
228   def __init__(self, abort_check_fn, remote_import_fn):
229     """Initializes this class.
230
231     @type abort_check_fn: callable
232     @param abort_check_fn: Function to check whether move is aborted
233     @type remote_import_fn: callable or None
234     @param remote_import_fn: Callback for reporting received remote import
235                              information
236
237     """
238     cli.JobPollReportCbBase.__init__(self)
239     self._abort_check_fn = abort_check_fn
240     self._remote_import_fn = remote_import_fn
241
242   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
243     """Handles a log message.
244
245     """
246     if log_type == constants.ELOG_REMOTE_IMPORT:
247       logging.debug("Received remote import information")
248
249       if not self._remote_import_fn:
250         raise RuntimeError("Received unexpected remote import information")
251
252       assert "x509_ca" in log_msg
253       assert "disks" in log_msg
254
255       self._remote_import_fn(log_msg)
256
257       return
258
259     logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
260                  cli.FormatLogMessage(log_type, log_msg))
261
262   def ReportNotChanged(self, job_id, status):
263     """Called if a job hasn't changed in a while.
264
265     """
266     try:
267       # Check whether we were told to abort by the other thread
268       self._abort_check_fn()
269     except Abort:
270       logging.warning("Aborting despite job %s still running", job_id)
271       raise
272
273
274 class InstanceMove(object):
275   """Status class for instance moves.
276
277   """
278   def __init__(self, src_instance_name, dest_instance_name,
279                dest_pnode, dest_snode, dest_iallocator,
280                dest_disk_template, hvparams,
281                beparams, osparams, nics):
282     """Initializes this class.
283
284     @type src_instance_name: string
285     @param src_instance_name: Instance name on source cluster
286     @type dest_instance_name: string
287     @param dest_instance_name: Instance name on destination cluster
288     @type dest_pnode: string or None
289     @param dest_pnode: Name of primary node on destination cluster
290     @type dest_snode: string or None
291     @param dest_snode: Name of secondary node on destination cluster
292     @type dest_iallocator: string or None
293     @param dest_iallocator: Name of iallocator to use
294     @type dest_disk_template: string or None
295     @param dest_disk_template: Disk template to use instead of the original one
296     @type hvparams: dict or None
297     @param hvparams: Hypervisor parameters to override
298     @type beparams: dict or None
299     @param beparams: Backend parameters to override
300     @type osparams: dict or None
301     @param osparams: OS parameters to override
302     @type nics: dict or None
303     @param nics: NICs to override
304
305     """
306     self.src_instance_name = src_instance_name
307     self.dest_instance_name = dest_instance_name
308     self.dest_pnode = dest_pnode
309     self.dest_snode = dest_snode
310     self.dest_iallocator = dest_iallocator
311     self.dest_disk_template = dest_disk_template
312     self.hvparams = hvparams
313     self.beparams = beparams
314     self.osparams = osparams
315     self.nics = nics
316
317     self.error_message = None
318
319
320 class MoveRuntime(object):
321   """Class to keep track of instance move.
322
323   """
324   def __init__(self, move):
325     """Initializes this class.
326
327     @type move: L{InstanceMove}
328
329     """
330     self.move = move
331
332     # Thread synchronization
333     self.lock = threading.Lock()
334     self.source_to_dest = threading.Condition(self.lock)
335     self.dest_to_source = threading.Condition(self.lock)
336
337     # Source information
338     self.src_error_message = None
339     self.src_expinfo = None
340     self.src_instinfo = None
341
342     # Destination information
343     self.dest_error_message = None
344     self.dest_impinfo = None
345
346   def HandleErrors(self, prefix, fn, *args):
347     """Wrapper to catch errors and abort threads.
348
349     @type prefix: string
350     @param prefix: Variable name prefix ("src" or "dest")
351     @type fn: callable
352     @param fn: Function
353
354     """
355     assert prefix in ("dest", "src")
356
357     try:
358       # Call inner function
359       fn(*args)
360
361       errmsg = None
362     except Abort:
363       errmsg = "Aborted"
364     except Exception, err:
365       logging.exception("Caught unhandled exception")
366       errmsg = str(err)
367
368     setattr(self, "%s_error_message" % prefix, errmsg)
369
370     self.lock.acquire()
371     try:
372       self.source_to_dest.notifyAll()
373       self.dest_to_source.notifyAll()
374     finally:
375       self.lock.release()
376
377   def CheckAbort(self):
378     """Check whether thread should be aborted.
379
380     @raise Abort: When thread should be aborted
381
382     """
383     if not (self.src_error_message is None and
384             self.dest_error_message is None):
385       logging.info("Aborting")
386       raise Abort()
387
388   def Wait(self, cond, check_fn):
389     """Waits for a condition to become true.
390
391     @type cond: threading.Condition
392     @param cond: Threading condition
393     @type check_fn: callable
394     @param check_fn: Function to check whether condition is true
395
396     """
397     cond.acquire()
398     try:
399       while check_fn(self):
400         self.CheckAbort()
401         cond.wait()
402     finally:
403       cond.release()
404
405   def PollJob(self, cl, job_id, remote_import_fn=None):
406     """Wrapper for polling a job.
407
408     @type cl: L{rapi.client.GanetiRapiClient}
409     @param cl: RAPI client
410     @type job_id: string
411     @param job_id: Job ID
412     @type remote_import_fn: callable or None
413     @param remote_import_fn: Callback for reporting received remote import
414                              information
415
416     """
417     return rapi.client_utils.PollJob(cl, job_id,
418                                      MoveJobPollReportCb(self.CheckAbort,
419                                                          remote_import_fn))
420
421
422 class MoveDestExecutor(object):
423   def __init__(self, dest_client, mrt):
424     """Destination side of an instance move.
425
426     @type dest_client: L{rapi.client.GanetiRapiClient}
427     @param dest_client: RAPI client
428     @type mrt: L{MoveRuntime}
429     @param mrt: Instance move runtime information
430
431     """
432     logging.debug("Waiting for instance information to become available")
433     mrt.Wait(mrt.source_to_dest,
434              lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
435
436     logging.info("Creating instance %s in remote-import mode",
437                  mrt.move.dest_instance_name)
438     job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
439                                   mrt.move.dest_pnode, mrt.move.dest_snode,
440                                   mrt.move.dest_iallocator,
441                                   mrt.move.dest_disk_template,
442                                   mrt.src_instinfo, mrt.src_expinfo,
443                                   mrt.move.hvparams, mrt.move.beparams,
444                                   mrt.move.beparams, mrt.move.nics)
445     mrt.PollJob(dest_client, job_id,
446                 remote_import_fn=compat.partial(self._SetImportInfo, mrt))
447
448     logging.info("Import successful")
449
450   @staticmethod
451   def _SetImportInfo(mrt, impinfo):
452     """Sets the remote import information and notifies source thread.
453
454     @type mrt: L{MoveRuntime}
455     @param mrt: Instance move runtime information
456     @param impinfo: Remote import information
457
458     """
459     mrt.dest_to_source.acquire()
460     try:
461       mrt.dest_impinfo = impinfo
462       mrt.dest_to_source.notifyAll()
463     finally:
464       mrt.dest_to_source.release()
465
466   @staticmethod
467   def _CreateInstance(cl, name, pnode, snode, iallocator, dest_disk_template,
468                       instance, expinfo, override_hvparams, override_beparams,
469                       override_osparams, override_nics):
470     """Starts the instance creation in remote import mode.
471
472     @type cl: L{rapi.client.GanetiRapiClient}
473     @param cl: RAPI client
474     @type name: string
475     @param name: Instance name
476     @type pnode: string or None
477     @param pnode: Name of primary node on destination cluster
478     @type snode: string or None
479     @param snode: Name of secondary node on destination cluster
480     @type iallocator: string or None
481     @param iallocator: Name of iallocator to use
482     @type dest_disk_template: string or None
483     @param dest_disk_template: Disk template to use instead of the original one
484     @type instance: dict
485     @param instance: Instance details from source cluster
486     @type expinfo: dict
487     @param expinfo: Prepared export information from source cluster
488     @type override_hvparams: dict or None
489     @param override_hvparams: Hypervisor parameters to override
490     @type override_beparams: dict or None
491     @param override_beparams: Backend parameters to override
492     @type override_osparams: dict or None
493     @param override_osparams: OS parameters to override
494     @type override_nics: dict or None
495     @param override_nics: NICs to override
496     @return: Job ID
497
498     """
499     if dest_disk_template:
500       disk_template = dest_disk_template
501     else:
502       disk_template = instance["disk_template"]
503
504     disks = []
505     for idisk in instance["disks"]:
506       odisk = {
507         constants.IDISK_SIZE: idisk["size"],
508         constants.IDISK_MODE: idisk["mode"],
509         constants.IDISK_NAME: str(idisk.get("name")),
510         }
511       spindles = idisk.get("spindles")
512       if spindles is not None:
513         odisk[constants.IDISK_SPINDLES] = spindles
514       disks.append(odisk)
515
516     nics = [{
517       constants.INIC_IP: ip,
518       constants.INIC_MAC: mac,
519       constants.INIC_MODE: mode,
520       constants.INIC_LINK: link,
521       constants.INIC_NAME: vlan,
522       constants.INIC_NETWORK: network,
523       constants.INIC_NAME: nic_name
524       } for nic_name, _, ip, mac, mode, link, vlan, network, _
525         in instance["nics"]]
526
527     if len(override_nics) > len(nics):
528       raise Error("Can not create new NICs")
529
530     if override_nics:
531       assert len(override_nics) <= len(nics)
532       for idx, (nic, override) in enumerate(zip(nics, override_nics)):
533         nics[idx] = objects.FillDict(nic, override)
534
535     # TODO: Should this be the actual up/down status? (run_state)
536     start = (instance["config_state"] == "up")
537
538     assert len(disks) == len(instance["disks"])
539     assert len(nics) == len(instance["nics"])
540
541     inst_beparams = instance["be_instance"]
542     if not inst_beparams:
543       inst_beparams = {}
544
545     inst_hvparams = instance["hv_instance"]
546     if not inst_hvparams:
547       inst_hvparams = {}
548
549     inst_osparams = instance["os_instance"]
550     if not inst_osparams:
551       inst_osparams = {}
552
553     return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
554                              name, disk_template, disks, nics,
555                              os=instance["os"],
556                              pnode=pnode,
557                              snode=snode,
558                              start=start,
559                              ip_check=False,
560                              iallocator=iallocator,
561                              hypervisor=instance["hypervisor"],
562                              source_handshake=expinfo["handshake"],
563                              source_x509_ca=expinfo["x509_ca"],
564                              source_instance_name=instance["name"],
565                              beparams=objects.FillDict(inst_beparams,
566                                                        override_beparams),
567                              hvparams=objects.FillDict(inst_hvparams,
568                                                        override_hvparams),
569                              osparams=objects.FillDict(inst_osparams,
570                                                        override_osparams))
571
572
573 class MoveSourceExecutor(object):
574   def __init__(self, src_client, mrt):
575     """Source side of an instance move.
576
577     @type src_client: L{rapi.client.GanetiRapiClient}
578     @param src_client: RAPI client
579     @type mrt: L{MoveRuntime}
580     @param mrt: Instance move runtime information
581
582     """
583     logging.info("Checking whether instance exists")
584     self._CheckInstance(src_client, mrt.move.src_instance_name)
585
586     logging.info("Retrieving instance information from source cluster")
587     instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
588                                      mrt.move.src_instance_name)
589     if (instinfo["disk_template"] in
590         [constants.DT_FILE, constants.DT_SHARED_FILE]):
591       raise Error("Inter-cluster move of file-based instances is not"
592                   " supported.")
593
594     logging.info("Preparing export on source cluster")
595     expinfo = self._PrepareExport(src_client, mrt.PollJob,
596                                   mrt.move.src_instance_name)
597     assert "handshake" in expinfo
598     assert "x509_key_name" in expinfo
599     assert "x509_ca" in expinfo
600
601     # Hand information to destination thread
602     mrt.source_to_dest.acquire()
603     try:
604       mrt.src_instinfo = instinfo
605       mrt.src_expinfo = expinfo
606       mrt.source_to_dest.notifyAll()
607     finally:
608       mrt.source_to_dest.release()
609
610     logging.info("Waiting for destination information to become available")
611     mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
612
613     logging.info("Starting remote export on source cluster")
614     self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
615                          expinfo["x509_key_name"], mrt.dest_impinfo)
616
617     logging.info("Export successful")
618
619   @staticmethod
620   def _CheckInstance(cl, name):
621     """Checks whether the instance exists on the source cluster.
622
623     @type cl: L{rapi.client.GanetiRapiClient}
624     @param cl: RAPI client
625     @type name: string
626     @param name: Instance name
627
628     """
629     try:
630       cl.GetInstance(name)
631     except rapi.client.GanetiApiError, err:
632       if err.code == rapi.client.HTTP_NOT_FOUND:
633         raise Error("Instance %s not found (%s)" % (name, str(err)))
634       raise
635
636   @staticmethod
637   def _GetInstanceInfo(cl, poll_job_fn, name):
638     """Retrieves detailed instance information from source cluster.
639
640     @type cl: L{rapi.client.GanetiRapiClient}
641     @param cl: RAPI client
642     @type poll_job_fn: callable
643     @param poll_job_fn: Function to poll for job result
644     @type name: string
645     @param name: Instance name
646
647     """
648     job_id = cl.GetInstanceInfo(name, static=True)
649     result = poll_job_fn(cl, job_id)
650     assert len(result[0].keys()) == 1
651     return result[0][result[0].keys()[0]]
652
653   @staticmethod
654   def _PrepareExport(cl, poll_job_fn, name):
655     """Prepares export on source cluster.
656
657     @type cl: L{rapi.client.GanetiRapiClient}
658     @param cl: RAPI client
659     @type poll_job_fn: callable
660     @param poll_job_fn: Function to poll for job result
661     @type name: string
662     @param name: Instance name
663
664     """
665     job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
666     return poll_job_fn(cl, job_id)[0]
667
668   @staticmethod
669   def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
670     """Exports instance from source cluster.
671
672     @type cl: L{rapi.client.GanetiRapiClient}
673     @param cl: RAPI client
674     @type poll_job_fn: callable
675     @param poll_job_fn: Function to poll for job result
676     @type name: string
677     @param name: Instance name
678     @param x509_key_name: Source X509 key
679     @param impinfo: Import information from destination cluster
680
681     """
682     job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
683                                impinfo["disks"], shutdown=True,
684                                remove_instance=True,
685                                x509_key_name=x509_key_name,
686                                destination_x509_ca=impinfo["x509_ca"])
687     (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
688
689     if not (fin_resu and compat.all(dresults)):
690       raise Error("Export failed for disks %s" %
691                   utils.CommaJoin(str(idx) for idx, result
692                                   in enumerate(dresults) if not result))
693
694
695 class MoveSourceWorker(workerpool.BaseWorker):
696   def RunTask(self, rapi_factory, move): # pylint: disable=W0221
697     """Executes an instance move.
698
699     @type rapi_factory: L{RapiClientFactory}
700     @param rapi_factory: RAPI client factory
701     @type move: L{InstanceMove}
702     @param move: Instance move information
703
704     """
705     try:
706       logging.info("Preparing to move %s from cluster %s to %s as %s",
707                    move.src_instance_name, rapi_factory.src_cluster_name,
708                    rapi_factory.dest_cluster_name, move.dest_instance_name)
709
710       mrt = MoveRuntime(move)
711
712       logging.debug("Starting destination thread")
713       dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
714                                      target=mrt.HandleErrors,
715                                      args=("dest", MoveDestExecutor,
716                                            rapi_factory.GetDestClient(),
717                                            mrt, ))
718       dest_thread.start()
719       try:
720         mrt.HandleErrors("src", MoveSourceExecutor,
721                          rapi_factory.GetSourceClient(), mrt)
722       finally:
723         dest_thread.join()
724
725       if mrt.src_error_message or mrt.dest_error_message:
726         move.error_message = ("Source error: %s, destination error: %s" %
727                               (mrt.src_error_message, mrt.dest_error_message))
728       else:
729         move.error_message = None
730     except Exception, err: # pylint: disable=W0703
731       logging.exception("Caught unhandled exception")
732       move.error_message = str(err)
733
734
735 def CheckRapiSetup(rapi_factory):
736   """Checks the RAPI setup by retrieving the version.
737
738   @type rapi_factory: L{RapiClientFactory}
739   @param rapi_factory: RAPI client factory
740
741   """
742   src_client = rapi_factory.GetSourceClient()
743   logging.info("Connecting to source RAPI server")
744   logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
745
746   dest_client = rapi_factory.GetDestClient()
747   logging.info("Connecting to destination RAPI server")
748   logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())
749
750
751 def ParseOptions():
752   """Parses options passed to program.
753
754   """
755   program = os.path.basename(sys.argv[0])
756
757   parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
758                                         " <source-cluster> <dest-cluster>"
759                                         " <instance...>"),
760                                  prog=program)
761   parser.add_option(cli.DEBUG_OPT)
762   parser.add_option(cli.VERBOSE_OPT)
763   parser.add_option(cli.IALLOCATOR_OPT)
764   parser.add_option(cli.BACKEND_OPT)
765   parser.add_option(cli.HVOPTS_OPT)
766   parser.add_option(cli.OSPARAMS_OPT)
767   parser.add_option(cli.NET_OPT)
768   parser.add_option(SRC_RAPI_PORT_OPT)
769   parser.add_option(SRC_CA_FILE_OPT)
770   parser.add_option(SRC_USERNAME_OPT)
771   parser.add_option(SRC_PASSWORD_FILE_OPT)
772   parser.add_option(DEST_RAPI_PORT_OPT)
773   parser.add_option(DEST_CA_FILE_OPT)
774   parser.add_option(DEST_USERNAME_OPT)
775   parser.add_option(DEST_PASSWORD_FILE_OPT)
776   parser.add_option(DEST_INSTANCE_NAME_OPT)
777   parser.add_option(DEST_PRIMARY_NODE_OPT)
778   parser.add_option(DEST_SECONDARY_NODE_OPT)
779   parser.add_option(DEST_DISK_TEMPLATE_OPT)
780   parser.add_option(PARALLEL_OPT)
781
782   (options, args) = parser.parse_args()
783
784   return (parser, options, args)
785
786
787 def CheckOptions(parser, options, args):
788   """Checks options and arguments for validity.
789
790   """
791   if len(args) < 3:
792     parser.error("Not enough arguments")
793
794   src_cluster_name = args.pop(0)
795   dest_cluster_name = args.pop(0)
796   instance_names = args
797
798   assert len(instance_names) > 0
799
800   # TODO: Remove once using system default paths for SSL certificate
801   # verification is implemented
802   if not options.src_ca_file:
803     parser.error("Missing source cluster CA file")
804
805   if options.parallel < 1:
806     parser.error("Number of simultaneous moves must be >= 1")
807
808   if (bool(options.iallocator) and
809       bool(options.dest_primary_node or options.dest_secondary_node)):
810     parser.error("Destination node and iallocator options exclude each other")
811
812   if len(instance_names) == 1:
813     # Moving one instance only
814     if options.hvparams:
815       utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)
816
817     if options.beparams:
818       utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)
819
820     if options.nics:
821       options.nics = cli.ParseNicOption(options.nics)
822   else:
823     # Moving more than one instance
824     if (options.dest_instance_name or options.dest_primary_node or
825         options.dest_secondary_node or options.hvparams or
826         options.beparams or options.osparams or options.nics):
827       parser.error("The options --dest-instance-name, --dest-primary-node,"
828                    " --dest-secondary-node, --hypervisor-parameters,"
829                    " --backend-parameters, --os-parameters and --net can"
830                    " only be used when moving exactly one instance")
831
832   return (src_cluster_name, dest_cluster_name, instance_names)
833
834
835 def DestClusterHasDefaultIAllocator(rapi_factory):
836   """Determines if a given cluster has a default iallocator.
837
838   """
839   result = rapi_factory.GetDestClient().GetInfo()
840   ia_name = "default_iallocator"
841   return ia_name in result and result[ia_name]
842
843
844 def ExitWithError(message):
845   """Exits after an error and shows a message.
846
847   """
848   sys.stderr.write("move-instance: error: " + message + "\n")
849   sys.exit(constants.EXIT_FAILURE)
850
851
852 @UsesRapiClient
853 def main():
854   """Main routine.
855
856   """
857   (parser, options, args) = ParseOptions()
858
859   utils.SetupToolLogging(options.debug, options.verbose, threadname=True)
860
861   (src_cluster_name, dest_cluster_name, instance_names) = \
862     CheckOptions(parser, options, args)
863
864   logging.info("Source cluster: %s", src_cluster_name)
865   logging.info("Destination cluster: %s", dest_cluster_name)
866   logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
867
868   rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
869
870   CheckRapiSetup(rapi_factory)
871
872   has_iallocator = options.iallocator or \
873                    DestClusterHasDefaultIAllocator(rapi_factory)
874
875   if len(instance_names) > 1 and not has_iallocator:
876     ExitWithError("When moving multiple nodes, an iallocator must be used. "
877                   "None was provided and the target cluster does not have "
878                   "a default iallocator.")
879   if (len(instance_names) == 1 and not (has_iallocator or
880       options.dest_primary_node or options.dest_secondary_node)):
881     ExitWithError("Target cluster does not have a default iallocator, "
882                   "please specify either destination nodes or an iallocator.")
883
884   # Prepare list of instance moves
885   moves = []
886   for src_instance_name in instance_names:
887     if options.dest_instance_name:
888       assert len(instance_names) == 1
889       # Rename instance
890       dest_instance_name = options.dest_instance_name
891     else:
892       dest_instance_name = src_instance_name
893
894     moves.append(InstanceMove(src_instance_name, dest_instance_name,
895                               options.dest_primary_node,
896                               options.dest_secondary_node,
897                               options.iallocator,
898                               options.dest_disk_template,
899                               options.hvparams,
900                               options.beparams,
901                               options.osparams,
902                               options.nics))
903
904   assert len(moves) == len(instance_names)
905
906   # Start workerpool
907   wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
908   try:
909     # Add instance moves to workerpool
910     for move in moves:
911       wp.AddTask((rapi_factory, move))
912
913     # Wait for all moves to finish
914     wp.Quiesce()
915
916   finally:
917     wp.TerminateWorkers()
918
919   # There should be no threads running at this point, hence not using locks
920   # anymore
921
922   logging.info("Instance move results:")
923
924   for move in moves:
925     if move.dest_instance_name == move.src_instance_name:
926       name = move.src_instance_name
927     else:
928       name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
929
930     if move.error_message:
931       msg = "Failed (%s)" % move.error_message
932     else:
933       msg = "Success"
934
935     logging.info("%s: %s", name, msg)
936
937   if compat.any(move.error_message for move in moves):
938     sys.exit(constants.EXIT_FAILURE)
939
940   sys.exit(constants.EXIT_SUCCESS)
941
942
943 if __name__ == "__main__":
944   main()