Statistics
| Branch: | Tag: | Revision:

root / tools / move-instance @ fc3f75dd

History | View | Annotate | Download (30.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to move instances from one cluster to another.
22

    
23
"""
24

    
25
# pylint: disable=C0103
26
# C0103: Invalid name move-instance
27

    
28
import os
29
import sys
30
import time
31
import logging
32
import optparse
33
import threading
34

    
35
from ganeti import cli
36
from ganeti import constants
37
from ganeti import utils
38
from ganeti import workerpool
39
from ganeti import objects
40
from ganeti import compat
41
from ganeti import rapi
42

    
43
import ganeti.rapi.client # pylint: disable=W0611
44
import ganeti.rapi.client_utils
45
from ganeti.rapi.client import UsesRapiClient
46

    
47

    
48
SRC_RAPI_PORT_OPT = \
49
  cli.cli_option("--src-rapi-port", action="store", type="int",
50
                 dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
51
                 help=("Source cluster RAPI port (defaults to %s)" %
52
                       constants.DEFAULT_RAPI_PORT))
53

    
54
SRC_CA_FILE_OPT = \
55
  cli.cli_option("--src-ca-file", action="store", type="string",
56
                 dest="src_ca_file",
57
                 help=("File containing source cluster Certificate"
58
                       " Authority (CA) in PEM format"))
59

    
60
SRC_USERNAME_OPT = \
61
  cli.cli_option("--src-username", action="store", type="string",
62
                 dest="src_username", default=None,
63
                 help="Source cluster username")
64

    
65
SRC_PASSWORD_FILE_OPT = \
66
  cli.cli_option("--src-password-file", action="store", type="string",
67
                 dest="src_password_file",
68
                 help="File containing source cluster password")
69

    
70
DEST_RAPI_PORT_OPT = \
71
  cli.cli_option("--dest-rapi-port", action="store", type="int",
72
                 dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT,
73
                 help=("Destination cluster RAPI port (defaults to source"
74
                       " cluster RAPI port)"))
75

    
76
DEST_CA_FILE_OPT = \
77
  cli.cli_option("--dest-ca-file", action="store", type="string",
78
                 dest="dest_ca_file",
79
                 help=("File containing destination cluster Certificate"
80
                       " Authority (CA) in PEM format (defaults to source"
81
                       " cluster CA)"))
82

    
83
DEST_USERNAME_OPT = \
84
  cli.cli_option("--dest-username", action="store", type="string",
85
                 dest="dest_username", default=None,
86
                 help=("Destination cluster username (defaults to"
87
                       " source cluster username)"))
88

    
89
DEST_PASSWORD_FILE_OPT = \
90
  cli.cli_option("--dest-password-file", action="store", type="string",
91
                 dest="dest_password_file",
92
                 help=("File containing destination cluster password"
93
                       " (defaults to source cluster password)"))
94

    
95
DEST_INSTANCE_NAME_OPT = \
96
  cli.cli_option("--dest-instance-name", action="store", type="string",
97
                 dest="dest_instance_name",
98
                 help=("Instance name on destination cluster (only"
99
                       " when moving exactly one instance)"))
100

    
101
DEST_PRIMARY_NODE_OPT = \
102
  cli.cli_option("--dest-primary-node", action="store", type="string",
103
                 dest="dest_primary_node",
104
                 help=("Primary node on destination cluster (only"
105
                       " when moving exactly one instance)"))
106

    
107
DEST_SECONDARY_NODE_OPT = \
108
  cli.cli_option("--dest-secondary-node", action="store", type="string",
109
                 dest="dest_secondary_node",
110
                 help=("Secondary node on destination cluster (only"
111
                       " when moving exactly one instance)"))
112

    
113
PARALLEL_OPT = \
114
  cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
115
                 dest="parallel", metavar="<number>",
116
                 help="Number of instances to be moved simultaneously")
117

    
118

    
119
class Error(Exception):
120
  """Generic error.
121

    
122
  """
123

    
124

    
125
class Abort(Error):
126
  """Special exception for aborting import/export.
127

    
128
  """
129

    
130

    
131
class RapiClientFactory:
132
  """Factory class for creating RAPI clients.
133

    
134
  @ivar src_cluster_name: Source cluster name
135
  @ivar dest_cluster_name: Destination cluster name
136
  @ivar GetSourceClient: Callable returning new client for source cluster
137
  @ivar GetDestClient: Callable returning new client for destination cluster
138

    
139
  """
140
  def __init__(self, options, src_cluster_name, dest_cluster_name):
141
    """Initializes this class.
142

    
143
    @param options: Program options
144
    @type src_cluster_name: string
145
    @param src_cluster_name: Source cluster name
146
    @type dest_cluster_name: string
147
    @param dest_cluster_name: Destination cluster name
148

    
149
    """
150
    self.src_cluster_name = src_cluster_name
151
    self.dest_cluster_name = dest_cluster_name
152

    
153
    # TODO: Implement timeouts for RAPI connections
154
    # TODO: Support for using system default paths for verifying SSL certificate
155
    logging.debug("Using '%s' as source CA", options.src_ca_file)
156
    src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)
157

    
158
    if options.dest_ca_file:
159
      logging.debug("Using '%s' as destination CA", options.dest_ca_file)
160
      dest_curl_config = \
161
        rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
162
    else:
163
      logging.debug("Using source CA for destination")
164
      dest_curl_config = src_curl_config
165

    
166
    logging.debug("Source RAPI server is %s:%s",
167
                  src_cluster_name, options.src_rapi_port)
168
    logging.debug("Source username is '%s'", options.src_username)
169

    
170
    if options.src_username is None:
171
      src_username = ""
172
    else:
173
      src_username = options.src_username
174

    
175
    if options.src_password_file:
176
      logging.debug("Reading '%s' for source password",
177
                    options.src_password_file)
178
      src_password = utils.ReadOneLineFile(options.src_password_file,
179
                                           strict=True)
180
    else:
181
      logging.debug("Source has no password")
182
      src_password = None
183

    
184
    self.GetSourceClient = lambda: \
185
      rapi.client.GanetiRapiClient(src_cluster_name,
186
                                   port=options.src_rapi_port,
187
                                   curl_config_fn=src_curl_config,
188
                                   username=src_username,
189
                                   password=src_password)
190

    
191
    if options.dest_rapi_port:
192
      dest_rapi_port = options.dest_rapi_port
193
    else:
194
      dest_rapi_port = options.src_rapi_port
195

    
196
    if options.dest_username is None:
197
      dest_username = src_username
198
    else:
199
      dest_username = options.dest_username
200

    
201
    logging.debug("Destination RAPI server is %s:%s",
202
                  dest_cluster_name, dest_rapi_port)
203
    logging.debug("Destination username is '%s'", dest_username)
204

    
205
    if options.dest_password_file:
206
      logging.debug("Reading '%s' for destination password",
207
                    options.dest_password_file)
208
      dest_password = utils.ReadOneLineFile(options.dest_password_file,
209
                                            strict=True)
210
    else:
211
      logging.debug("Using source password for destination")
212
      dest_password = src_password
213

    
214
    self.GetDestClient = lambda: \
215
      rapi.client.GanetiRapiClient(dest_cluster_name,
216
                                   port=dest_rapi_port,
217
                                   curl_config_fn=dest_curl_config,
218
                                   username=dest_username,
219
                                   password=dest_password)
220

    
221

    
222
class MoveJobPollReportCb(cli.JobPollReportCbBase):
223
  def __init__(self, abort_check_fn, remote_import_fn):
224
    """Initializes this class.
225

    
226
    @type abort_check_fn: callable
227
    @param abort_check_fn: Function to check whether move is aborted
228
    @type remote_import_fn: callable or None
229
    @param remote_import_fn: Callback for reporting received remote import
230
                             information
231

    
232
    """
233
    cli.JobPollReportCbBase.__init__(self)
234
    self._abort_check_fn = abort_check_fn
235
    self._remote_import_fn = remote_import_fn
236

    
237
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
238
    """Handles a log message.
239

    
240
    """
241
    if log_type == constants.ELOG_REMOTE_IMPORT:
242
      logging.debug("Received remote import information")
243

    
244
      if not self._remote_import_fn:
245
        raise RuntimeError("Received unexpected remote import information")
246

    
247
      assert "x509_ca" in log_msg
248
      assert "disks" in log_msg
249

    
250
      self._remote_import_fn(log_msg)
251

    
252
      return
253

    
254
    logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
255
                 cli.FormatLogMessage(log_type, log_msg))
256

    
257
  def ReportNotChanged(self, job_id, status):
258
    """Called if a job hasn't changed in a while.
259

    
260
    """
261
    try:
262
      # Check whether we were told to abort by the other thread
263
      self._abort_check_fn()
264
    except Abort:
265
      logging.warning("Aborting despite job %s still running", job_id)
266
      raise
267

    
268

    
269
class InstanceMove(object):
270
  """Status class for instance moves.
271

    
272
  """
273
  def __init__(self, src_instance_name, dest_instance_name,
274
               dest_pnode, dest_snode, dest_iallocator,
275
               hvparams, beparams, osparams, nics):
276
    """Initializes this class.
277

    
278
    @type src_instance_name: string
279
    @param src_instance_name: Instance name on source cluster
280
    @type dest_instance_name: string
281
    @param dest_instance_name: Instance name on destination cluster
282
    @type dest_pnode: string or None
283
    @param dest_pnode: Name of primary node on destination cluster
284
    @type dest_snode: string or None
285
    @param dest_snode: Name of secondary node on destination cluster
286
    @type dest_iallocator: string or None
287
    @param dest_iallocator: Name of iallocator to use
288
    @type hvparams: dict or None
289
    @param hvparams: Hypervisor parameters to override
290
    @type beparams: dict or None
291
    @param beparams: Backend parameters to override
292
    @type osparams: dict or None
293
    @param osparams: OS parameters to override
294
    @type nics: dict or None
295
    @param nics: NICs to override
296

    
297
    """
298
    self.src_instance_name = src_instance_name
299
    self.dest_instance_name = dest_instance_name
300
    self.dest_pnode = dest_pnode
301
    self.dest_snode = dest_snode
302
    self.dest_iallocator = dest_iallocator
303
    self.hvparams = hvparams
304
    self.beparams = beparams
305
    self.osparams = osparams
306
    self.nics = nics
307

    
308
    self.error_message = None
309

    
310

    
311
class MoveRuntime(object):
312
  """Class to keep track of instance move.
313

    
314
  """
315
  def __init__(self, move):
316
    """Initializes this class.
317

    
318
    @type move: L{InstanceMove}
319

    
320
    """
321
    self.move = move
322

    
323
    # Thread synchronization
324
    self.lock = threading.Lock()
325
    self.source_to_dest = threading.Condition(self.lock)
326
    self.dest_to_source = threading.Condition(self.lock)
327

    
328
    # Source information
329
    self.src_error_message = None
330
    self.src_expinfo = None
331
    self.src_instinfo = None
332

    
333
    # Destination information
334
    self.dest_error_message = None
335
    self.dest_impinfo = None
336

    
337
  def HandleErrors(self, prefix, fn, *args):
338
    """Wrapper to catch errors and abort threads.
339

    
340
    @type prefix: string
341
    @param prefix: Variable name prefix ("src" or "dest")
342
    @type fn: callable
343
    @param fn: Function
344

    
345
    """
346
    assert prefix in ("dest", "src")
347

    
348
    try:
349
      # Call inner function
350
      fn(*args)
351

    
352
      errmsg = None
353
    except Abort:
354
      errmsg = "Aborted"
355
    except Exception, err:
356
      logging.exception("Caught unhandled exception")
357
      errmsg = str(err)
358

    
359
    setattr(self, "%s_error_message" % prefix, errmsg)
360

    
361
    self.lock.acquire()
362
    try:
363
      self.source_to_dest.notifyAll()
364
      self.dest_to_source.notifyAll()
365
    finally:
366
      self.lock.release()
367

    
368
  def CheckAbort(self):
369
    """Check whether thread should be aborted.
370

    
371
    @raise Abort: When thread should be aborted
372

    
373
    """
374
    if not (self.src_error_message is None and
375
            self.dest_error_message is None):
376
      logging.info("Aborting")
377
      raise Abort()
378

    
379
  def Wait(self, cond, check_fn):
380
    """Waits for a condition to become true.
381

    
382
    @type cond: threading.Condition
383
    @param cond: Threading condition
384
    @type check_fn: callable
385
    @param check_fn: Function to check whether condition is true
386

    
387
    """
388
    cond.acquire()
389
    try:
390
      while check_fn(self):
391
        self.CheckAbort()
392
        cond.wait()
393
    finally:
394
      cond.release()
395

    
396
  def PollJob(self, cl, job_id, remote_import_fn=None):
397
    """Wrapper for polling a job.
398

    
399
    @type cl: L{rapi.client.GanetiRapiClient}
400
    @param cl: RAPI client
401
    @type job_id: string
402
    @param job_id: Job ID
403
    @type remote_import_fn: callable or None
404
    @param remote_import_fn: Callback for reporting received remote import
405
                             information
406

    
407
    """
408
    return rapi.client_utils.PollJob(cl, job_id,
409
                                     MoveJobPollReportCb(self.CheckAbort,
410
                                                         remote_import_fn))
411

    
412

    
413
class MoveDestExecutor(object):
414
  def __init__(self, dest_client, mrt):
415
    """Destination side of an instance move.
416

    
417
    @type dest_client: L{rapi.client.GanetiRapiClient}
418
    @param dest_client: RAPI client
419
    @type mrt: L{MoveRuntime}
420
    @param mrt: Instance move runtime information
421

    
422
    """
423
    logging.debug("Waiting for instance information to become available")
424
    mrt.Wait(mrt.source_to_dest,
425
             lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
426

    
427
    logging.info("Creating instance %s in remote-import mode",
428
                 mrt.move.dest_instance_name)
429
    job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
430
                                  mrt.move.dest_pnode, mrt.move.dest_snode,
431
                                  mrt.move.dest_iallocator,
432
                                  mrt.src_instinfo, mrt.src_expinfo,
433
                                  mrt.move.hvparams, mrt.move.beparams,
434
                                  mrt.move.beparams, mrt.move.nics)
435
    mrt.PollJob(dest_client, job_id,
436
                remote_import_fn=compat.partial(self._SetImportInfo, mrt))
437

    
438
    logging.info("Import successful")
439

    
440
  @staticmethod
441
  def _SetImportInfo(mrt, impinfo):
442
    """Sets the remote import information and notifies source thread.
443

    
444
    @type mrt: L{MoveRuntime}
445
    @param mrt: Instance move runtime information
446
    @param impinfo: Remote import information
447

    
448
    """
449
    mrt.dest_to_source.acquire()
450
    try:
451
      mrt.dest_impinfo = impinfo
452
      mrt.dest_to_source.notifyAll()
453
    finally:
454
      mrt.dest_to_source.release()
455

    
456
  @staticmethod
457
  def _CreateInstance(cl, name, pnode, snode, iallocator, instance, expinfo,
458
                      override_hvparams, override_beparams, override_osparams,
459
                      override_nics):
460
    """Starts the instance creation in remote import mode.
461

    
462
    @type cl: L{rapi.client.GanetiRapiClient}
463
    @param cl: RAPI client
464
    @type name: string
465
    @param name: Instance name
466
    @type pnode: string or None
467
    @param pnode: Name of primary node on destination cluster
468
    @type snode: string or None
469
    @param snode: Name of secondary node on destination cluster
470
    @type iallocator: string or None
471
    @param iallocator: Name of iallocator to use
472
    @type instance: dict
473
    @param instance: Instance details from source cluster
474
    @type expinfo: dict
475
    @param expinfo: Prepared export information from source cluster
476
    @type override_hvparams: dict or None
477
    @param override_hvparams: Hypervisor parameters to override
478
    @type override_beparams: dict or None
479
    @param override_beparams: Backend parameters to override
480
    @type override_osparams: dict or None
481
    @param override_osparams: OS parameters to override
482
    @type override_nics: dict or None
483
    @param override_nics: NICs to override
484
    @return: Job ID
485

    
486
    """
487
    disk_template = instance["disk_template"]
488

    
489
    disks = [{
490
      constants.IDISK_SIZE: i["size"],
491
      constants.IDISK_MODE: i["mode"],
492
      } for i in instance["disks"]]
493

    
494
    nics = [{
495
      constants.INIC_IP: ip,
496
      constants.INIC_MAC: mac,
497
      constants.INIC_MODE: mode,
498
      constants.INIC_LINK: link,
499
      } for ip, mac, mode, link in instance["nics"]]
500

    
501
    if len(override_nics) > len(nics):
502
      raise Error("Can not create new NICs")
503

    
504
    if override_nics:
505
      assert len(override_nics) <= len(nics)
506
      for idx, (nic, override) in enumerate(zip(nics, override_nics)):
507
        nics[idx] = objects.FillDict(nic, override)
508

    
509
    # TODO: Should this be the actual up/down status? (run_state)
510
    start = (instance["config_state"] == "up")
511

    
512
    assert len(disks) == len(instance["disks"])
513
    assert len(nics) == len(instance["nics"])
514

    
515
    inst_beparams = instance["be_instance"]
516
    if not inst_beparams:
517
      inst_beparams = {}
518

    
519
    inst_hvparams = instance["hv_instance"]
520
    if not inst_hvparams:
521
      inst_hvparams = {}
522

    
523
    inst_osparams = instance["os_instance"]
524
    if not inst_osparams:
525
      inst_osparams = {}
526

    
527
    return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
528
                             name, disk_template, disks, nics,
529
                             os=instance["os"],
530
                             pnode=pnode,
531
                             snode=snode,
532
                             start=start,
533
                             ip_check=False,
534
                             iallocator=iallocator,
535
                             hypervisor=instance["hypervisor"],
536
                             source_handshake=expinfo["handshake"],
537
                             source_x509_ca=expinfo["x509_ca"],
538
                             source_instance_name=instance["name"],
539
                             beparams=objects.FillDict(inst_beparams,
540
                                                       override_beparams),
541
                             hvparams=objects.FillDict(inst_hvparams,
542
                                                       override_hvparams),
543
                             osparams=objects.FillDict(inst_osparams,
544
                                                       override_osparams))
545

    
546

    
547
class MoveSourceExecutor(object):
548
  def __init__(self, src_client, mrt):
549
    """Source side of an instance move.
550

    
551
    @type src_client: L{rapi.client.GanetiRapiClient}
552
    @param src_client: RAPI client
553
    @type mrt: L{MoveRuntime}
554
    @param mrt: Instance move runtime information
555

    
556
    """
557
    logging.info("Checking whether instance exists")
558
    self._CheckInstance(src_client, mrt.move.src_instance_name)
559

    
560
    logging.info("Retrieving instance information from source cluster")
561
    instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
562
                                     mrt.move.src_instance_name)
563

    
564
    logging.info("Preparing export on source cluster")
565
    expinfo = self._PrepareExport(src_client, mrt.PollJob,
566
                                  mrt.move.src_instance_name)
567
    assert "handshake" in expinfo
568
    assert "x509_key_name" in expinfo
569
    assert "x509_ca" in expinfo
570

    
571
    # Hand information to destination thread
572
    mrt.source_to_dest.acquire()
573
    try:
574
      mrt.src_instinfo = instinfo
575
      mrt.src_expinfo = expinfo
576
      mrt.source_to_dest.notifyAll()
577
    finally:
578
      mrt.source_to_dest.release()
579

    
580
    logging.info("Waiting for destination information to become available")
581
    mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
582

    
583
    logging.info("Starting remote export on source cluster")
584
    self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
585
                         expinfo["x509_key_name"], mrt.dest_impinfo)
586

    
587
    logging.info("Export successful")
588

    
589
  @staticmethod
590
  def _CheckInstance(cl, name):
591
    """Checks whether the instance exists on the source cluster.
592

    
593
    @type cl: L{rapi.client.GanetiRapiClient}
594
    @param cl: RAPI client
595
    @type name: string
596
    @param name: Instance name
597

    
598
    """
599
    try:
600
      cl.GetInstance(name)
601
    except rapi.client.GanetiApiError, err:
602
      if err.code == rapi.client.HTTP_NOT_FOUND:
603
        raise Error("Instance %s not found (%s)" % (name, str(err)))
604
      raise
605

    
606
  @staticmethod
607
  def _GetInstanceInfo(cl, poll_job_fn, name):
608
    """Retrieves detailed instance information from source cluster.
609

    
610
    @type cl: L{rapi.client.GanetiRapiClient}
611
    @param cl: RAPI client
612
    @type poll_job_fn: callable
613
    @param poll_job_fn: Function to poll for job result
614
    @type name: string
615
    @param name: Instance name
616

    
617
    """
618
    job_id = cl.GetInstanceInfo(name, static=True)
619
    result = poll_job_fn(cl, job_id)
620
    assert len(result[0].keys()) == 1
621
    return result[0][result[0].keys()[0]]
622

    
623
  @staticmethod
624
  def _PrepareExport(cl, poll_job_fn, name):
625
    """Prepares export on source cluster.
626

    
627
    @type cl: L{rapi.client.GanetiRapiClient}
628
    @param cl: RAPI client
629
    @type poll_job_fn: callable
630
    @param poll_job_fn: Function to poll for job result
631
    @type name: string
632
    @param name: Instance name
633

    
634
    """
635
    job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
636
    return poll_job_fn(cl, job_id)[0]
637

    
638
  @staticmethod
639
  def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
640
    """Exports instance from source cluster.
641

    
642
    @type cl: L{rapi.client.GanetiRapiClient}
643
    @param cl: RAPI client
644
    @type poll_job_fn: callable
645
    @param poll_job_fn: Function to poll for job result
646
    @type name: string
647
    @param name: Instance name
648
    @param x509_key_name: Source X509 key
649
    @param impinfo: Import information from destination cluster
650

    
651
    """
652
    job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
653
                               impinfo["disks"], shutdown=True,
654
                               remove_instance=True,
655
                               x509_key_name=x509_key_name,
656
                               destination_x509_ca=impinfo["x509_ca"])
657
    (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
658

    
659
    if not (fin_resu and compat.all(dresults)):
660
      raise Error("Export failed for disks %s" %
661
                  utils.CommaJoin(str(idx) for idx, result
662
                                  in enumerate(dresults) if not result))
663

    
664

    
665
class MoveSourceWorker(workerpool.BaseWorker):
666
  def RunTask(self, rapi_factory, move): # pylint: disable=W0221
667
    """Executes an instance move.
668

    
669
    @type rapi_factory: L{RapiClientFactory}
670
    @param rapi_factory: RAPI client factory
671
    @type move: L{InstanceMove}
672
    @param move: Instance move information
673

    
674
    """
675
    try:
676
      logging.info("Preparing to move %s from cluster %s to %s as %s",
677
                   move.src_instance_name, rapi_factory.src_cluster_name,
678
                   rapi_factory.dest_cluster_name, move.dest_instance_name)
679

    
680
      mrt = MoveRuntime(move)
681

    
682
      logging.debug("Starting destination thread")
683
      dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
684
                                     target=mrt.HandleErrors,
685
                                     args=("dest", MoveDestExecutor,
686
                                           rapi_factory.GetDestClient(),
687
                                           mrt, ))
688
      dest_thread.start()
689
      try:
690
        mrt.HandleErrors("src", MoveSourceExecutor,
691
                         rapi_factory.GetSourceClient(), mrt)
692
      finally:
693
        dest_thread.join()
694

    
695
      if mrt.src_error_message or mrt.dest_error_message:
696
        move.error_message = ("Source error: %s, destination error: %s" %
697
                              (mrt.src_error_message, mrt.dest_error_message))
698
      else:
699
        move.error_message = None
700
    except Exception, err: # pylint: disable=W0703
701
      logging.exception("Caught unhandled exception")
702
      move.error_message = str(err)
703

    
704

    
705
def CheckRapiSetup(rapi_factory):
706
  """Checks the RAPI setup by retrieving the version.
707

    
708
  @type rapi_factory: L{RapiClientFactory}
709
  @param rapi_factory: RAPI client factory
710

    
711
  """
712
  src_client = rapi_factory.GetSourceClient()
713
  logging.info("Connecting to source RAPI server")
714
  logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
715

    
716
  dest_client = rapi_factory.GetDestClient()
717
  logging.info("Connecting to destination RAPI server")
718
  logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())
719

    
720

    
721
def SetupLogging(options):
722
  """Setting up logging infrastructure.
723

    
724
  @param options: Parsed command line options
725

    
726
  """
727
  fmt = "%(asctime)s: %(threadName)s "
728
  if options.debug or options.verbose:
729
    fmt += "%(levelname)s "
730
  fmt += "%(message)s"
731

    
732
  formatter = logging.Formatter(fmt)
733

    
734
  stderr_handler = logging.StreamHandler()
735
  stderr_handler.setFormatter(formatter)
736
  if options.debug:
737
    stderr_handler.setLevel(logging.NOTSET)
738
  elif options.verbose:
739
    stderr_handler.setLevel(logging.INFO)
740
  else:
741
    stderr_handler.setLevel(logging.ERROR)
742

    
743
  root_logger = logging.getLogger("")
744
  root_logger.setLevel(logging.NOTSET)
745
  root_logger.addHandler(stderr_handler)
746

    
747

    
748
def ParseOptions():
749
  """Parses options passed to program.
750

    
751
  """
752
  program = os.path.basename(sys.argv[0])
753

    
754
  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
755
                                        " <source-cluster> <dest-cluster>"
756
                                        " <instance...>"),
757
                                 prog=program)
758
  parser.add_option(cli.DEBUG_OPT)
759
  parser.add_option(cli.VERBOSE_OPT)
760
  parser.add_option(cli.IALLOCATOR_OPT)
761
  parser.add_option(cli.BACKEND_OPT)
762
  parser.add_option(cli.HVOPTS_OPT)
763
  parser.add_option(cli.OSPARAMS_OPT)
764
  parser.add_option(cli.NET_OPT)
765
  parser.add_option(SRC_RAPI_PORT_OPT)
766
  parser.add_option(SRC_CA_FILE_OPT)
767
  parser.add_option(SRC_USERNAME_OPT)
768
  parser.add_option(SRC_PASSWORD_FILE_OPT)
769
  parser.add_option(DEST_RAPI_PORT_OPT)
770
  parser.add_option(DEST_CA_FILE_OPT)
771
  parser.add_option(DEST_USERNAME_OPT)
772
  parser.add_option(DEST_PASSWORD_FILE_OPT)
773
  parser.add_option(DEST_INSTANCE_NAME_OPT)
774
  parser.add_option(DEST_PRIMARY_NODE_OPT)
775
  parser.add_option(DEST_SECONDARY_NODE_OPT)
776
  parser.add_option(PARALLEL_OPT)
777

    
778
  (options, args) = parser.parse_args()
779

    
780
  return (parser, options, args)
781

    
782

    
783
def CheckOptions(parser, options, args):
784
  """Checks options and arguments for validity.
785

    
786
  """
787
  if len(args) < 3:
788
    parser.error("Not enough arguments")
789

    
790
  src_cluster_name = args.pop(0)
791
  dest_cluster_name = args.pop(0)
792
  instance_names = args
793

    
794
  assert len(instance_names) > 0
795

    
796
  # TODO: Remove once using system default paths for SSL certificate
797
  # verification is implemented
798
  if not options.src_ca_file:
799
    parser.error("Missing source cluster CA file")
800

    
801
  if options.parallel < 1:
802
    parser.error("Number of simultaneous moves must be >= 1")
803

    
804
  if not (bool(options.iallocator) ^
805
          bool(options.dest_primary_node or options.dest_secondary_node)):
806
    parser.error("Destination node and iallocator options exclude each other")
807

    
808
  if len(instance_names) == 1:
809
    # Moving one instance only
810
    if not (options.iallocator or
811
            options.dest_primary_node or
812
            options.dest_secondary_node):
813
      parser.error("An iallocator or the destination node is required")
814

    
815
    if options.hvparams:
816
      utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)
817

    
818
    if options.beparams:
819
      utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)
820

    
821
    if options.nics:
822
      options.nics = cli.ParseNicOption(options.nics)
823
  else:
824
    # Moving more than one instance
825
    if (options.dest_instance_name or options.dest_primary_node or
826
        options.dest_secondary_node or options.hvparams or
827
        options.beparams or options.osparams or options.nics):
828
      parser.error("The options --dest-instance-name, --dest-primary-node,"
829
                   " --dest-secondary-node, --hypervisor-parameters,"
830
                   " --backend-parameters, --os-parameters and --net can"
831
                   " only be used when moving exactly one instance")
832

    
833
    if not options.iallocator:
834
      parser.error("An iallocator must be specified for moving more than one"
835
                   " instance")
836

    
837
  return (src_cluster_name, dest_cluster_name, instance_names)
838

    
839

    
840
@UsesRapiClient
841
def main():
842
  """Main routine.
843

    
844
  """
845
  (parser, options, args) = ParseOptions()
846

    
847
  SetupLogging(options)
848

    
849
  (src_cluster_name, dest_cluster_name, instance_names) = \
850
    CheckOptions(parser, options, args)
851

    
852
  logging.info("Source cluster: %s", src_cluster_name)
853
  logging.info("Destination cluster: %s", dest_cluster_name)
854
  logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
855

    
856
  rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
857

    
858
  CheckRapiSetup(rapi_factory)
859

    
860
  assert (len(instance_names) == 1 or
861
          not (options.dest_primary_node or options.dest_secondary_node))
862
  assert len(instance_names) == 1 or options.iallocator
863
  assert (len(instance_names) > 1 or options.iallocator or
864
          options.dest_primary_node or options.dest_secondary_node)
865
  assert (len(instance_names) == 1 or
866
          not (options.hvparams or options.beparams or options.osparams or
867
               options.nics))
868

    
869
  # Prepare list of instance moves
870
  moves = []
871
  for src_instance_name in instance_names:
872
    if options.dest_instance_name:
873
      assert len(instance_names) == 1
874
      # Rename instance
875
      dest_instance_name = options.dest_instance_name
876
    else:
877
      dest_instance_name = src_instance_name
878

    
879
    moves.append(InstanceMove(src_instance_name, dest_instance_name,
880
                              options.dest_primary_node,
881
                              options.dest_secondary_node,
882
                              options.iallocator, options.hvparams,
883
                              options.beparams, options.osparams,
884
                              options.nics))
885

    
886
  assert len(moves) == len(instance_names)
887

    
888
  # Start workerpool
889
  wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
890
  try:
891
    # Add instance moves to workerpool
892
    for move in moves:
893
      wp.AddTask((rapi_factory, move))
894

    
895
    # Wait for all moves to finish
896
    wp.Quiesce()
897

    
898
  finally:
899
    wp.TerminateWorkers()
900

    
901
  # There should be no threads running at this point, hence not using locks
902
  # anymore
903

    
904
  logging.info("Instance move results:")
905

    
906
  for move in moves:
907
    if move.dest_instance_name == move.src_instance_name:
908
      name = move.src_instance_name
909
    else:
910
      name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
911

    
912
    if move.error_message:
913
      msg = "Failed (%s)" % move.error_message
914
    else:
915
      msg = "Success"
916

    
917
    logging.info("%s: %s", name, msg)
918

    
919
  if compat.any(move.error_message for move in moves):
920
    sys.exit(constants.EXIT_FAILURE)
921

    
922
  sys.exit(constants.EXIT_SUCCESS)
923

    
924

    
925
if __name__ == "__main__":
926
  main()