Statistics
| Branch: | Tag: | Revision:

root / tools / move-instance @ 65b526e7

History | View | Annotate | Download (32.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to move instances from one cluster to another.
22

    
23
"""
24

    
25
# pylint: disable=C0103
26
# C0103: Invalid name move-instance
27

    
28
import os
29
import sys
30
import time
31
import logging
32
import optparse
33
import threading
34

    
35
from ganeti import cli
36
from ganeti import constants
37
from ganeti import utils
38
from ganeti import workerpool
39
from ganeti import objects
40
from ganeti import compat
41
from ganeti import rapi
42

    
43
import ganeti.rapi.client # pylint: disable=W0611
44
import ganeti.rapi.client_utils
45
from ganeti.rapi.client import UsesRapiClient
46

    
47

    
48
SRC_RAPI_PORT_OPT = \
49
  cli.cli_option("--src-rapi-port", action="store", type="int",
50
                 dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
51
                 help=("Source cluster RAPI port (defaults to %s)" %
52
                       constants.DEFAULT_RAPI_PORT))
53

    
54
SRC_CA_FILE_OPT = \
55
  cli.cli_option("--src-ca-file", action="store", type="string",
56
                 dest="src_ca_file",
57
                 help=("File containing source cluster Certificate"
58
                       " Authority (CA) in PEM format"))
59

    
60
SRC_USERNAME_OPT = \
61
  cli.cli_option("--src-username", action="store", type="string",
62
                 dest="src_username", default=None,
63
                 help="Source cluster username")
64

    
65
SRC_PASSWORD_FILE_OPT = \
66
  cli.cli_option("--src-password-file", action="store", type="string",
67
                 dest="src_password_file",
68
                 help="File containing source cluster password")
69

    
70
DEST_RAPI_PORT_OPT = \
71
  cli.cli_option("--dest-rapi-port", action="store", type="int",
72
                 dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT,
73
                 help=("Destination cluster RAPI port (defaults to source"
74
                       " cluster RAPI port)"))
75

    
76
DEST_CA_FILE_OPT = \
77
  cli.cli_option("--dest-ca-file", action="store", type="string",
78
                 dest="dest_ca_file",
79
                 help=("File containing destination cluster Certificate"
80
                       " Authority (CA) in PEM format (defaults to source"
81
                       " cluster CA)"))
82

    
83
DEST_USERNAME_OPT = \
84
  cli.cli_option("--dest-username", action="store", type="string",
85
                 dest="dest_username", default=None,
86
                 help=("Destination cluster username (defaults to"
87
                       " source cluster username)"))
88

    
89
DEST_PASSWORD_FILE_OPT = \
90
  cli.cli_option("--dest-password-file", action="store", type="string",
91
                 dest="dest_password_file",
92
                 help=("File containing destination cluster password"
93
                       " (defaults to source cluster password)"))
94

    
95
DEST_INSTANCE_NAME_OPT = \
96
  cli.cli_option("--dest-instance-name", action="store", type="string",
97
                 dest="dest_instance_name",
98
                 help=("Instance name on destination cluster (only"
99
                       " when moving exactly one instance)"))
100

    
101
DEST_PRIMARY_NODE_OPT = \
102
  cli.cli_option("--dest-primary-node", action="store", type="string",
103
                 dest="dest_primary_node",
104
                 help=("Primary node on destination cluster (only"
105
                       " when moving exactly one instance)"))
106

    
107
DEST_SECONDARY_NODE_OPT = \
108
  cli.cli_option("--dest-secondary-node", action="store", type="string",
109
                 dest="dest_secondary_node",
110
                 help=("Secondary node on destination cluster (only"
111
                       " when moving exactly one instance)"))
112

    
113
DEST_DISK_TEMPLATE_OPT = \
114
  cli.cli_option("--dest-disk-template", action="store", type="string",
115
                 dest="dest_disk_template", default=None,
116
                 help="Disk template to use on destination cluster")
117

    
118
COMPRESS_OPT = \
119
  cli.cli_option("--compress", action="store", type="string",
120
                 dest="compress", default="none",
121
                 help="Compression mode to use during the move (this mode has"
122
                      " to be supported by both clusters)")
123

    
124
PARALLEL_OPT = \
125
  cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
126
                 dest="parallel", metavar="<number>",
127
                 help="Number of instances to be moved simultaneously")
128

    
129

    
130
class Error(Exception):
131
  """Generic error.
132

    
133
  """
134

    
135

    
136
class Abort(Error):
137
  """Special exception for aborting import/export.
138

    
139
  """
140

    
141

    
142
class RapiClientFactory:
143
  """Factory class for creating RAPI clients.
144

    
145
  @ivar src_cluster_name: Source cluster name
146
  @ivar dest_cluster_name: Destination cluster name
147
  @ivar GetSourceClient: Callable returning new client for source cluster
148
  @ivar GetDestClient: Callable returning new client for destination cluster
149

    
150
  """
151
  def __init__(self, options, src_cluster_name, dest_cluster_name):
152
    """Initializes this class.
153

    
154
    @param options: Program options
155
    @type src_cluster_name: string
156
    @param src_cluster_name: Source cluster name
157
    @type dest_cluster_name: string
158
    @param dest_cluster_name: Destination cluster name
159

    
160
    """
161
    self.src_cluster_name = src_cluster_name
162
    self.dest_cluster_name = dest_cluster_name
163

    
164
    # TODO: Implement timeouts for RAPI connections
165
    # TODO: Support for using system default paths for verifying SSL certificate
166
    logging.debug("Using '%s' as source CA", options.src_ca_file)
167
    src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)
168

    
169
    if options.dest_ca_file:
170
      logging.debug("Using '%s' as destination CA", options.dest_ca_file)
171
      dest_curl_config = \
172
        rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
173
    else:
174
      logging.debug("Using source CA for destination")
175
      dest_curl_config = src_curl_config
176

    
177
    logging.debug("Source RAPI server is %s:%s",
178
                  src_cluster_name, options.src_rapi_port)
179
    logging.debug("Source username is '%s'", options.src_username)
180

    
181
    if options.src_username is None:
182
      src_username = ""
183
    else:
184
      src_username = options.src_username
185

    
186
    if options.src_password_file:
187
      logging.debug("Reading '%s' for source password",
188
                    options.src_password_file)
189
      src_password = utils.ReadOneLineFile(options.src_password_file,
190
                                           strict=True)
191
    else:
192
      logging.debug("Source has no password")
193
      src_password = None
194

    
195
    self.GetSourceClient = lambda: \
196
      rapi.client.GanetiRapiClient(src_cluster_name,
197
                                   port=options.src_rapi_port,
198
                                   curl_config_fn=src_curl_config,
199
                                   username=src_username,
200
                                   password=src_password)
201

    
202
    if options.dest_rapi_port:
203
      dest_rapi_port = options.dest_rapi_port
204
    else:
205
      dest_rapi_port = options.src_rapi_port
206

    
207
    if options.dest_username is None:
208
      dest_username = src_username
209
    else:
210
      dest_username = options.dest_username
211

    
212
    logging.debug("Destination RAPI server is %s:%s",
213
                  dest_cluster_name, dest_rapi_port)
214
    logging.debug("Destination username is '%s'", dest_username)
215

    
216
    if options.dest_password_file:
217
      logging.debug("Reading '%s' for destination password",
218
                    options.dest_password_file)
219
      dest_password = utils.ReadOneLineFile(options.dest_password_file,
220
                                            strict=True)
221
    else:
222
      logging.debug("Using source password for destination")
223
      dest_password = src_password
224

    
225
    self.GetDestClient = lambda: \
226
      rapi.client.GanetiRapiClient(dest_cluster_name,
227
                                   port=dest_rapi_port,
228
                                   curl_config_fn=dest_curl_config,
229
                                   username=dest_username,
230
                                   password=dest_password)
231

    
232

    
233
class MoveJobPollReportCb(cli.JobPollReportCbBase):
234
  def __init__(self, abort_check_fn, remote_import_fn):
235
    """Initializes this class.
236

    
237
    @type abort_check_fn: callable
238
    @param abort_check_fn: Function to check whether move is aborted
239
    @type remote_import_fn: callable or None
240
    @param remote_import_fn: Callback for reporting received remote import
241
                             information
242

    
243
    """
244
    cli.JobPollReportCbBase.__init__(self)
245
    self._abort_check_fn = abort_check_fn
246
    self._remote_import_fn = remote_import_fn
247

    
248
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
249
    """Handles a log message.
250

    
251
    """
252
    if log_type == constants.ELOG_REMOTE_IMPORT:
253
      logging.debug("Received remote import information")
254

    
255
      if not self._remote_import_fn:
256
        raise RuntimeError("Received unexpected remote import information")
257

    
258
      assert "x509_ca" in log_msg
259
      assert "disks" in log_msg
260

    
261
      self._remote_import_fn(log_msg)
262

    
263
      return
264

    
265
    logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
266
                 cli.FormatLogMessage(log_type, log_msg))
267

    
268
  def ReportNotChanged(self, job_id, status):
269
    """Called if a job hasn't changed in a while.
270

    
271
    """
272
    try:
273
      # Check whether we were told to abort by the other thread
274
      self._abort_check_fn()
275
    except Abort:
276
      logging.warning("Aborting despite job %s still running", job_id)
277
      raise
278

    
279

    
280
class InstanceMove(object):
281
  """Status class for instance moves.
282

    
283
  """
284
  def __init__(self, src_instance_name, dest_instance_name,
285
               dest_pnode, dest_snode, compress, dest_iallocator,
286
               dest_disk_template, hvparams,
287
               beparams, osparams, nics):
288
    """Initializes this class.
289

    
290
    @type src_instance_name: string
291
    @param src_instance_name: Instance name on source cluster
292
    @type dest_instance_name: string
293
    @param dest_instance_name: Instance name on destination cluster
294
    @type dest_pnode: string or None
295
    @param dest_pnode: Name of primary node on destination cluster
296
    @type dest_snode: string or None
297
    @param dest_snode: Name of secondary node on destination cluster
298
    @type compress; string
299
    @param compress: Compression mode to use (has to be supported on both
300
                     clusters)
301
    @type dest_iallocator: string or None
302
    @param dest_iallocator: Name of iallocator to use
303
    @type dest_disk_template: string or None
304
    @param dest_disk_template: Disk template to use instead of the original one
305
    @type hvparams: dict or None
306
    @param hvparams: Hypervisor parameters to override
307
    @type beparams: dict or None
308
    @param beparams: Backend parameters to override
309
    @type osparams: dict or None
310
    @param osparams: OS parameters to override
311
    @type nics: dict or None
312
    @param nics: NICs to override
313

    
314
    """
315
    self.src_instance_name = src_instance_name
316
    self.dest_instance_name = dest_instance_name
317
    self.dest_pnode = dest_pnode
318
    self.dest_snode = dest_snode
319
    self.compress = compress
320
    self.dest_iallocator = dest_iallocator
321
    self.dest_disk_template = dest_disk_template
322
    self.hvparams = hvparams
323
    self.beparams = beparams
324
    self.osparams = osparams
325
    self.nics = nics
326

    
327
    self.error_message = None
328

    
329

    
330
class MoveRuntime(object):
331
  """Class to keep track of instance move.
332

    
333
  """
334
  def __init__(self, move):
335
    """Initializes this class.
336

    
337
    @type move: L{InstanceMove}
338

    
339
    """
340
    self.move = move
341

    
342
    # Thread synchronization
343
    self.lock = threading.Lock()
344
    self.source_to_dest = threading.Condition(self.lock)
345
    self.dest_to_source = threading.Condition(self.lock)
346

    
347
    # Source information
348
    self.src_error_message = None
349
    self.src_expinfo = None
350
    self.src_instinfo = None
351

    
352
    # Destination information
353
    self.dest_error_message = None
354
    self.dest_impinfo = None
355

    
356
  def HandleErrors(self, prefix, fn, *args):
357
    """Wrapper to catch errors and abort threads.
358

    
359
    @type prefix: string
360
    @param prefix: Variable name prefix ("src" or "dest")
361
    @type fn: callable
362
    @param fn: Function
363

    
364
    """
365
    assert prefix in ("dest", "src")
366

    
367
    try:
368
      # Call inner function
369
      fn(*args)
370

    
371
      errmsg = None
372
    except Abort:
373
      errmsg = "Aborted"
374
    except Exception, err:
375
      logging.exception("Caught unhandled exception")
376
      errmsg = str(err)
377

    
378
    setattr(self, "%s_error_message" % prefix, errmsg)
379

    
380
    self.lock.acquire()
381
    try:
382
      self.source_to_dest.notifyAll()
383
      self.dest_to_source.notifyAll()
384
    finally:
385
      self.lock.release()
386

    
387
  def CheckAbort(self):
388
    """Check whether thread should be aborted.
389

    
390
    @raise Abort: When thread should be aborted
391

    
392
    """
393
    if not (self.src_error_message is None and
394
            self.dest_error_message is None):
395
      logging.info("Aborting")
396
      raise Abort()
397

    
398
  def Wait(self, cond, check_fn):
399
    """Waits for a condition to become true.
400

    
401
    @type cond: threading.Condition
402
    @param cond: Threading condition
403
    @type check_fn: callable
404
    @param check_fn: Function to check whether condition is true
405

    
406
    """
407
    cond.acquire()
408
    try:
409
      while check_fn(self):
410
        self.CheckAbort()
411
        cond.wait()
412
    finally:
413
      cond.release()
414

    
415
  def PollJob(self, cl, job_id, remote_import_fn=None):
416
    """Wrapper for polling a job.
417

    
418
    @type cl: L{rapi.client.GanetiRapiClient}
419
    @param cl: RAPI client
420
    @type job_id: string
421
    @param job_id: Job ID
422
    @type remote_import_fn: callable or None
423
    @param remote_import_fn: Callback for reporting received remote import
424
                             information
425

    
426
    """
427
    return rapi.client_utils.PollJob(cl, job_id,
428
                                     MoveJobPollReportCb(self.CheckAbort,
429
                                                         remote_import_fn))
430

    
431

    
432
class MoveDestExecutor(object):
433
  def __init__(self, dest_client, mrt):
434
    """Destination side of an instance move.
435

    
436
    @type dest_client: L{rapi.client.GanetiRapiClient}
437
    @param dest_client: RAPI client
438
    @type mrt: L{MoveRuntime}
439
    @param mrt: Instance move runtime information
440

    
441
    """
442
    logging.debug("Waiting for instance information to become available")
443
    mrt.Wait(mrt.source_to_dest,
444
             lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
445

    
446
    logging.info("Creating instance %s in remote-import mode",
447
                 mrt.move.dest_instance_name)
448
    job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
449
                                  mrt.move.dest_pnode, mrt.move.dest_snode,
450
                                  mrt.move.compress,
451
                                  mrt.move.dest_iallocator,
452
                                  mrt.move.dest_disk_template,
453
                                  mrt.src_instinfo, mrt.src_expinfo,
454
                                  mrt.move.hvparams, mrt.move.beparams,
455
                                  mrt.move.beparams, mrt.move.nics)
456
    mrt.PollJob(dest_client, job_id,
457
                remote_import_fn=compat.partial(self._SetImportInfo, mrt))
458

    
459
    logging.info("Import successful")
460

    
461
  @staticmethod
462
  def _SetImportInfo(mrt, impinfo):
463
    """Sets the remote import information and notifies source thread.
464

    
465
    @type mrt: L{MoveRuntime}
466
    @param mrt: Instance move runtime information
467
    @param impinfo: Remote import information
468

    
469
    """
470
    mrt.dest_to_source.acquire()
471
    try:
472
      mrt.dest_impinfo = impinfo
473
      mrt.dest_to_source.notifyAll()
474
    finally:
475
      mrt.dest_to_source.release()
476

    
477
  @staticmethod
478
  def _CreateInstance(cl, name, pnode, snode, compress, iallocator,
479
                      dest_disk_template, instance, expinfo, override_hvparams,
480
                      override_beparams, override_osparams, override_nics):
481
    """Starts the instance creation in remote import mode.
482

    
483
    @type cl: L{rapi.client.GanetiRapiClient}
484
    @param cl: RAPI client
485
    @type name: string
486
    @param name: Instance name
487
    @type pnode: string or None
488
    @param pnode: Name of primary node on destination cluster
489
    @type snode: string or None
490
    @param snode: Name of secondary node on destination cluster
491
    @type compress: string
492
    @param compress: Compression mode to use
493
    @type iallocator: string or None
494
    @param iallocator: Name of iallocator to use
495
    @type dest_disk_template: string or None
496
    @param dest_disk_template: Disk template to use instead of the original one
497
    @type instance: dict
498
    @param instance: Instance details from source cluster
499
    @type expinfo: dict
500
    @param expinfo: Prepared export information from source cluster
501
    @type override_hvparams: dict or None
502
    @param override_hvparams: Hypervisor parameters to override
503
    @type override_beparams: dict or None
504
    @param override_beparams: Backend parameters to override
505
    @type override_osparams: dict or None
506
    @param override_osparams: OS parameters to override
507
    @type override_nics: dict or None
508
    @param override_nics: NICs to override
509
    @return: Job ID
510

    
511
    """
512
    if dest_disk_template:
513
      disk_template = dest_disk_template
514
    else:
515
      disk_template = instance["disk_template"]
516

    
517
    disks = []
518
    for idisk in instance["disks"]:
519
      odisk = {
520
        constants.IDISK_SIZE: idisk["size"],
521
        constants.IDISK_MODE: idisk["mode"],
522
        constants.IDISK_NAME: str(idisk.get("name")),
523
        }
524
      spindles = idisk.get("spindles")
525
      if spindles is not None:
526
        odisk[constants.IDISK_SPINDLES] = spindles
527
      disks.append(odisk)
528

    
529
    try:
530
      nics = [{
531
        constants.INIC_IP: ip,
532
        constants.INIC_MAC: mac,
533
        constants.INIC_MODE: mode,
534
        constants.INIC_LINK: link,
535
        constants.INIC_VLAN: vlan,
536
        constants.INIC_NETWORK: network,
537
        constants.INIC_NAME: nic_name
538
        } for nic_name, _, ip, mac, mode, link, vlan, network, _
539
          in instance["nics"]]
540
    except ValueError:
541
      raise Error("Received NIC information does not match expected format; "
542
                  "Do the versions of this tool and the source cluster match?")
543

    
544
    if len(override_nics) > len(nics):
545
      raise Error("Can not create new NICs")
546

    
547
    if override_nics:
548
      assert len(override_nics) <= len(nics)
549
      for idx, (nic, override) in enumerate(zip(nics, override_nics)):
550
        nics[idx] = objects.FillDict(nic, override)
551

    
552
    # TODO: Should this be the actual up/down status? (run_state)
553
    start = (instance["config_state"] == "up")
554

    
555
    assert len(disks) == len(instance["disks"])
556
    assert len(nics) == len(instance["nics"])
557

    
558
    inst_beparams = instance["be_instance"]
559
    if not inst_beparams:
560
      inst_beparams = {}
561

    
562
    inst_hvparams = instance["hv_instance"]
563
    if not inst_hvparams:
564
      inst_hvparams = {}
565

    
566
    inst_osparams = instance["os_instance"]
567
    if not inst_osparams:
568
      inst_osparams = {}
569

    
570
    return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
571
                             name, disk_template, disks, nics,
572
                             os=instance["os"],
573
                             pnode=pnode,
574
                             snode=snode,
575
                             start=start,
576
                             ip_check=False,
577
                             iallocator=iallocator,
578
                             hypervisor=instance["hypervisor"],
579
                             source_handshake=expinfo["handshake"],
580
                             source_x509_ca=expinfo["x509_ca"],
581
                             compress=compress,
582
                             source_instance_name=instance["name"],
583
                             beparams=objects.FillDict(inst_beparams,
584
                                                       override_beparams),
585
                             hvparams=objects.FillDict(inst_hvparams,
586
                                                       override_hvparams),
587
                             osparams=objects.FillDict(inst_osparams,
588
                                                       override_osparams))
589

    
590

    
591
class MoveSourceExecutor(object):
592
  def __init__(self, src_client, mrt):
593
    """Source side of an instance move.
594

    
595
    @type src_client: L{rapi.client.GanetiRapiClient}
596
    @param src_client: RAPI client
597
    @type mrt: L{MoveRuntime}
598
    @param mrt: Instance move runtime information
599

    
600
    """
601
    logging.info("Checking whether instance exists")
602
    self._CheckInstance(src_client, mrt.move.src_instance_name)
603

    
604
    logging.info("Retrieving instance information from source cluster")
605
    instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
606
                                     mrt.move.src_instance_name)
607
    if (instinfo["disk_template"] in constants.DTS_FILEBASED):
608
      raise Error("Inter-cluster move of file-based instances is not"
609
                  " supported.")
610

    
611
    logging.info("Preparing export on source cluster")
612
    expinfo = self._PrepareExport(src_client, mrt.PollJob,
613
                                  mrt.move.src_instance_name)
614
    assert "handshake" in expinfo
615
    assert "x509_key_name" in expinfo
616
    assert "x509_ca" in expinfo
617

    
618
    # Hand information to destination thread
619
    mrt.source_to_dest.acquire()
620
    try:
621
      mrt.src_instinfo = instinfo
622
      mrt.src_expinfo = expinfo
623
      mrt.source_to_dest.notifyAll()
624
    finally:
625
      mrt.source_to_dest.release()
626

    
627
    logging.info("Waiting for destination information to become available")
628
    mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
629

    
630
    logging.info("Starting remote export on source cluster")
631
    self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
632
                         expinfo["x509_key_name"], mrt.move.compress,
633
                         mrt.dest_impinfo)
634

    
635
    logging.info("Export successful")
636

    
637
  @staticmethod
638
  def _CheckInstance(cl, name):
639
    """Checks whether the instance exists on the source cluster.
640

    
641
    @type cl: L{rapi.client.GanetiRapiClient}
642
    @param cl: RAPI client
643
    @type name: string
644
    @param name: Instance name
645

    
646
    """
647
    try:
648
      cl.GetInstance(name)
649
    except rapi.client.GanetiApiError, err:
650
      if err.code == rapi.client.HTTP_NOT_FOUND:
651
        raise Error("Instance %s not found (%s)" % (name, str(err)))
652
      raise
653

    
654
  @staticmethod
655
  def _GetInstanceInfo(cl, poll_job_fn, name):
656
    """Retrieves detailed instance information from source cluster.
657

    
658
    @type cl: L{rapi.client.GanetiRapiClient}
659
    @param cl: RAPI client
660
    @type poll_job_fn: callable
661
    @param poll_job_fn: Function to poll for job result
662
    @type name: string
663
    @param name: Instance name
664

    
665
    """
666
    job_id = cl.GetInstanceInfo(name, static=True)
667
    result = poll_job_fn(cl, job_id)
668
    assert len(result[0].keys()) == 1
669
    return result[0][result[0].keys()[0]]
670

    
671
  @staticmethod
672
  def _PrepareExport(cl, poll_job_fn, name):
673
    """Prepares export on source cluster.
674

    
675
    @type cl: L{rapi.client.GanetiRapiClient}
676
    @param cl: RAPI client
677
    @type poll_job_fn: callable
678
    @param poll_job_fn: Function to poll for job result
679
    @type name: string
680
    @param name: Instance name
681

    
682
    """
683
    job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
684
    return poll_job_fn(cl, job_id)[0]
685

    
686
  @staticmethod
687
  def _ExportInstance(cl, poll_job_fn, name, x509_key_name, compress, impinfo):
688
    """Exports instance from source cluster.
689

    
690
    @type cl: L{rapi.client.GanetiRapiClient}
691
    @param cl: RAPI client
692
    @type poll_job_fn: callable
693
    @param poll_job_fn: Function to poll for job result
694
    @type name: string
695
    @param name: Instance name
696
    @param x509_key_name: Source X509 key
697
    @type compress: string
698
    @param compress: Compression mode to use
699
    @param impinfo: Import information from destination cluster
700

    
701
    """
702
    job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
703
                               impinfo["disks"], shutdown=True,
704
                               remove_instance=True,
705
                               x509_key_name=x509_key_name,
706
                               destination_x509_ca=impinfo["x509_ca"],
707
                               compress=compress)
708
    (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
709

    
710
    if not (fin_resu and compat.all(dresults)):
711
      raise Error("Export failed for disks %s" %
712
                  utils.CommaJoin(str(idx) for idx, result
713
                                  in enumerate(dresults) if not result))
714

    
715

    
716
class MoveSourceWorker(workerpool.BaseWorker):
717
  def RunTask(self, rapi_factory, move): # pylint: disable=W0221
718
    """Executes an instance move.
719

    
720
    @type rapi_factory: L{RapiClientFactory}
721
    @param rapi_factory: RAPI client factory
722
    @type move: L{InstanceMove}
723
    @param move: Instance move information
724

    
725
    """
726
    try:
727
      logging.info("Preparing to move %s from cluster %s to %s as %s",
728
                   move.src_instance_name, rapi_factory.src_cluster_name,
729
                   rapi_factory.dest_cluster_name, move.dest_instance_name)
730

    
731
      mrt = MoveRuntime(move)
732

    
733
      logging.debug("Starting destination thread")
734
      dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
735
                                     target=mrt.HandleErrors,
736
                                     args=("dest", MoveDestExecutor,
737
                                           rapi_factory.GetDestClient(),
738
                                           mrt, ))
739
      dest_thread.start()
740
      try:
741
        mrt.HandleErrors("src", MoveSourceExecutor,
742
                         rapi_factory.GetSourceClient(), mrt)
743
      finally:
744
        dest_thread.join()
745

    
746
      if mrt.src_error_message or mrt.dest_error_message:
747
        move.error_message = ("Source error: %s, destination error: %s" %
748
                              (mrt.src_error_message, mrt.dest_error_message))
749
      else:
750
        move.error_message = None
751
    except Exception, err: # pylint: disable=W0703
752
      logging.exception("Caught unhandled exception")
753
      move.error_message = str(err)
754

    
755

    
756
def CheckRapiSetup(rapi_factory):
757
  """Checks the RAPI setup by retrieving the version.
758

    
759
  @type rapi_factory: L{RapiClientFactory}
760
  @param rapi_factory: RAPI client factory
761

    
762
  """
763
  src_client = rapi_factory.GetSourceClient()
764
  logging.info("Connecting to source RAPI server")
765
  logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
766

    
767
  dest_client = rapi_factory.GetDestClient()
768
  logging.info("Connecting to destination RAPI server")
769
  logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())
770

    
771

    
772
def ParseOptions():
773
  """Parses options passed to program.
774

    
775
  """
776
  program = os.path.basename(sys.argv[0])
777

    
778
  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
779
                                        " <source-cluster> <dest-cluster>"
780
                                        " <instance...>"),
781
                                 prog=program)
782
  parser.add_option(cli.DEBUG_OPT)
783
  parser.add_option(cli.VERBOSE_OPT)
784
  parser.add_option(cli.IALLOCATOR_OPT)
785
  parser.add_option(cli.BACKEND_OPT)
786
  parser.add_option(cli.HVOPTS_OPT)
787
  parser.add_option(cli.OSPARAMS_OPT)
788
  parser.add_option(cli.NET_OPT)
789
  parser.add_option(SRC_RAPI_PORT_OPT)
790
  parser.add_option(SRC_CA_FILE_OPT)
791
  parser.add_option(SRC_USERNAME_OPT)
792
  parser.add_option(SRC_PASSWORD_FILE_OPT)
793
  parser.add_option(DEST_RAPI_PORT_OPT)
794
  parser.add_option(DEST_CA_FILE_OPT)
795
  parser.add_option(DEST_USERNAME_OPT)
796
  parser.add_option(DEST_PASSWORD_FILE_OPT)
797
  parser.add_option(DEST_INSTANCE_NAME_OPT)
798
  parser.add_option(DEST_PRIMARY_NODE_OPT)
799
  parser.add_option(DEST_SECONDARY_NODE_OPT)
800
  parser.add_option(DEST_DISK_TEMPLATE_OPT)
801
  parser.add_option(COMPRESS_OPT)
802
  parser.add_option(PARALLEL_OPT)
803

    
804
  (options, args) = parser.parse_args()
805

    
806
  return (parser, options, args)
807

    
808

    
809
def CheckOptions(parser, options, args):
810
  """Checks options and arguments for validity.
811

    
812
  """
813
  if len(args) < 3:
814
    parser.error("Not enough arguments")
815

    
816
  src_cluster_name = args.pop(0)
817
  dest_cluster_name = args.pop(0)
818
  instance_names = args
819

    
820
  assert len(instance_names) > 0
821

    
822
  # TODO: Remove once using system default paths for SSL certificate
823
  # verification is implemented
824
  if not options.src_ca_file:
825
    parser.error("Missing source cluster CA file")
826

    
827
  if options.parallel < 1:
828
    parser.error("Number of simultaneous moves must be >= 1")
829

    
830
  if (bool(options.iallocator) and
831
      bool(options.dest_primary_node or options.dest_secondary_node)):
832
    parser.error("Destination node and iallocator options exclude each other")
833

    
834
  if len(instance_names) == 1:
835
    # Moving one instance only
836
    if options.hvparams:
837
      utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)
838

    
839
    if options.beparams:
840
      utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)
841

    
842
    if options.nics:
843
      options.nics = cli.ParseNicOption(options.nics)
844
  else:
845
    # Moving more than one instance
846
    if (options.dest_instance_name or options.dest_primary_node or
847
        options.dest_secondary_node or options.hvparams or
848
        options.beparams or options.osparams or options.nics):
849
      parser.error("The options --dest-instance-name, --dest-primary-node,"
850
                   " --dest-secondary-node, --hypervisor-parameters,"
851
                   " --backend-parameters, --os-parameters and --net can"
852
                   " only be used when moving exactly one instance")
853

    
854
  return (src_cluster_name, dest_cluster_name, instance_names)
855

    
856

    
857
def DestClusterHasDefaultIAllocator(rapi_factory):
858
  """Determines if a given cluster has a default iallocator.
859

    
860
  """
861
  result = rapi_factory.GetDestClient().GetInfo()
862
  ia_name = "default_iallocator"
863
  return ia_name in result and result[ia_name]
864

    
865

    
866
def ExitWithError(message):
867
  """Exits after an error and shows a message.
868

    
869
  """
870
  sys.stderr.write("move-instance: error: " + message + "\n")
871
  sys.exit(constants.EXIT_FAILURE)
872

    
873

    
874
@UsesRapiClient
875
def main():
876
  """Main routine.
877

    
878
  """
879
  (parser, options, args) = ParseOptions()
880

    
881
  utils.SetupToolLogging(options.debug, options.verbose, threadname=True)
882

    
883
  (src_cluster_name, dest_cluster_name, instance_names) = \
884
    CheckOptions(parser, options, args)
885

    
886
  logging.info("Source cluster: %s", src_cluster_name)
887
  logging.info("Destination cluster: %s", dest_cluster_name)
888
  logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
889

    
890
  rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
891

    
892
  CheckRapiSetup(rapi_factory)
893

    
894
  has_iallocator = options.iallocator or \
895
                   DestClusterHasDefaultIAllocator(rapi_factory)
896

    
897
  if len(instance_names) > 1 and not has_iallocator:
898
    ExitWithError("When moving multiple nodes, an iallocator must be used. "
899
                  "None was provided and the target cluster does not have "
900
                  "a default iallocator.")
901
  if (len(instance_names) == 1 and not (has_iallocator or
902
      options.dest_primary_node or options.dest_secondary_node)):
903
    ExitWithError("Target cluster does not have a default iallocator, "
904
                  "please specify either destination nodes or an iallocator.")
905

    
906
  # Prepare list of instance moves
907
  moves = []
908
  for src_instance_name in instance_names:
909
    if options.dest_instance_name:
910
      assert len(instance_names) == 1
911
      # Rename instance
912
      dest_instance_name = options.dest_instance_name
913
    else:
914
      dest_instance_name = src_instance_name
915

    
916
    moves.append(InstanceMove(src_instance_name, dest_instance_name,
917
                              options.dest_primary_node,
918
                              options.dest_secondary_node,
919
                              options.compress,
920
                              options.iallocator,
921
                              options.dest_disk_template,
922
                              options.hvparams,
923
                              options.beparams,
924
                              options.osparams,
925
                              options.nics))
926

    
927
  assert len(moves) == len(instance_names)
928

    
929
  # Start workerpool
930
  wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
931
  try:
932
    # Add instance moves to workerpool
933
    for move in moves:
934
      wp.AddTask((rapi_factory, move))
935

    
936
    # Wait for all moves to finish
937
    wp.Quiesce()
938

    
939
  finally:
940
    wp.TerminateWorkers()
941

    
942
  # There should be no threads running at this point, hence not using locks
943
  # anymore
944

    
945
  logging.info("Instance move results:")
946

    
947
  for move in moves:
948
    if move.dest_instance_name == move.src_instance_name:
949
      name = move.src_instance_name
950
    else:
951
      name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
952

    
953
    if move.error_message:
954
      msg = "Failed (%s)" % move.error_message
955
    else:
956
      msg = "Success"
957

    
958
    logging.info("%s: %s", name, msg)
959

    
960
  if compat.any(move.error_message for move in moves):
961
    sys.exit(constants.EXIT_FAILURE)
962

    
963
  sys.exit(constants.EXIT_SUCCESS)
964

    
965

    
966
if __name__ == "__main__":
967
  main()