Statistics
| Branch: | Tag: | Revision:

root / tools / move-instance @ a09639d1

History | View | Annotate | Download (32.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to move instances from one cluster to another.
22

    
23
"""
24

    
25
# pylint: disable=C0103
26
# C0103: Invalid name move-instance
27

    
28
import os
29
import sys
30
import time
31
import logging
32
import optparse
33
import threading
34

    
35
from ganeti import cli
36
from ganeti import constants
37
from ganeti import utils
38
from ganeti import workerpool
39
from ganeti import objects
40
from ganeti import compat
41
from ganeti import rapi
42

    
43
import ganeti.rapi.client # pylint: disable=W0611
44
import ganeti.rapi.client_utils
45
from ganeti.rapi.client import UsesRapiClient
46

    
47

    
48
SRC_RAPI_PORT_OPT = \
49
  cli.cli_option("--src-rapi-port", action="store", type="int",
50
                 dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
51
                 help=("Source cluster RAPI port (defaults to %s)" %
52
                       constants.DEFAULT_RAPI_PORT))
53

    
54
SRC_CA_FILE_OPT = \
55
  cli.cli_option("--src-ca-file", action="store", type="string",
56
                 dest="src_ca_file",
57
                 help=("File containing source cluster Certificate"
58
                       " Authority (CA) in PEM format"))
59

    
60
SRC_USERNAME_OPT = \
61
  cli.cli_option("--src-username", action="store", type="string",
62
                 dest="src_username", default=None,
63
                 help="Source cluster username")
64

    
65
SRC_PASSWORD_FILE_OPT = \
66
  cli.cli_option("--src-password-file", action="store", type="string",
67
                 dest="src_password_file",
68
                 help="File containing source cluster password")
69

    
70
DEST_RAPI_PORT_OPT = \
71
  cli.cli_option("--dest-rapi-port", action="store", type="int",
72
                 dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT,
73
                 help=("Destination cluster RAPI port (defaults to source"
74
                       " cluster RAPI port)"))
75

    
76
DEST_CA_FILE_OPT = \
77
  cli.cli_option("--dest-ca-file", action="store", type="string",
78
                 dest="dest_ca_file",
79
                 help=("File containing destination cluster Certificate"
80
                       " Authority (CA) in PEM format (defaults to source"
81
                       " cluster CA)"))
82

    
83
DEST_USERNAME_OPT = \
84
  cli.cli_option("--dest-username", action="store", type="string",
85
                 dest="dest_username", default=None,
86
                 help=("Destination cluster username (defaults to"
87
                       " source cluster username)"))
88

    
89
DEST_PASSWORD_FILE_OPT = \
90
  cli.cli_option("--dest-password-file", action="store", type="string",
91
                 dest="dest_password_file",
92
                 help=("File containing destination cluster password"
93
                       " (defaults to source cluster password)"))
94

    
95
DEST_INSTANCE_NAME_OPT = \
96
  cli.cli_option("--dest-instance-name", action="store", type="string",
97
                 dest="dest_instance_name",
98
                 help=("Instance name on destination cluster (only"
99
                       " when moving exactly one instance)"))
100

    
101
DEST_PRIMARY_NODE_OPT = \
102
  cli.cli_option("--dest-primary-node", action="store", type="string",
103
                 dest="dest_primary_node",
104
                 help=("Primary node on destination cluster (only"
105
                       " when moving exactly one instance)"))
106

    
107
DEST_SECONDARY_NODE_OPT = \
108
  cli.cli_option("--dest-secondary-node", action="store", type="string",
109
                 dest="dest_secondary_node",
110
                 help=("Secondary node on destination cluster (only"
111
                       " when moving exactly one instance)"))
112

    
113
DEST_DISK_TEMPLATE_OPT = \
114
  cli.cli_option("--dest-disk-template", action="store", type="string",
115
                 dest="dest_disk_template", default=None,
116
                 help="Disk template to use on destination cluster")
117

    
118
COMPRESS_OPT = \
119
  cli.cli_option("--compress", action="store", type="string",
120
                 dest="compress", default="none",
121
                 help="Compression mode to use during the move (this mode has"
122
                      " to be supported by both clusters)")
123

    
124
PARALLEL_OPT = \
125
  cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
126
                 dest="parallel", metavar="<number>",
127
                 help="Number of instances to be moved simultaneously")
128

    
129

    
130
class Error(Exception):
131
  """Generic error.
132

    
133
  """
134

    
135

    
136
class Abort(Error):
137
  """Special exception for aborting import/export.
138

    
139
  """
140

    
141

    
142
class RapiClientFactory:
143
  """Factory class for creating RAPI clients.
144

    
145
  @ivar src_cluster_name: Source cluster name
146
  @ivar dest_cluster_name: Destination cluster name
147
  @ivar GetSourceClient: Callable returning new client for source cluster
148
  @ivar GetDestClient: Callable returning new client for destination cluster
149

    
150
  """
151
  def __init__(self, options, src_cluster_name, dest_cluster_name):
152
    """Initializes this class.
153

    
154
    @param options: Program options
155
    @type src_cluster_name: string
156
    @param src_cluster_name: Source cluster name
157
    @type dest_cluster_name: string
158
    @param dest_cluster_name: Destination cluster name
159

    
160
    """
161
    self.src_cluster_name = src_cluster_name
162
    self.dest_cluster_name = dest_cluster_name
163

    
164
    # TODO: Implement timeouts for RAPI connections
165
    # TODO: Support for using system default paths for verifying SSL certificate
166
    logging.debug("Using '%s' as source CA", options.src_ca_file)
167
    src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)
168

    
169
    if options.dest_ca_file:
170
      logging.debug("Using '%s' as destination CA", options.dest_ca_file)
171
      dest_curl_config = \
172
        rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
173
    else:
174
      logging.debug("Using source CA for destination")
175
      dest_curl_config = src_curl_config
176

    
177
    logging.debug("Source RAPI server is %s:%s",
178
                  src_cluster_name, options.src_rapi_port)
179
    logging.debug("Source username is '%s'", options.src_username)
180

    
181
    if options.src_username is None:
182
      src_username = ""
183
    else:
184
      src_username = options.src_username
185

    
186
    if options.src_password_file:
187
      logging.debug("Reading '%s' for source password",
188
                    options.src_password_file)
189
      src_password = utils.ReadOneLineFile(options.src_password_file,
190
                                           strict=True)
191
    else:
192
      logging.debug("Source has no password")
193
      src_password = None
194

    
195
    self.GetSourceClient = lambda: \
196
      rapi.client.GanetiRapiClient(src_cluster_name,
197
                                   port=options.src_rapi_port,
198
                                   curl_config_fn=src_curl_config,
199
                                   username=src_username,
200
                                   password=src_password)
201

    
202
    if options.dest_rapi_port:
203
      dest_rapi_port = options.dest_rapi_port
204
    else:
205
      dest_rapi_port = options.src_rapi_port
206

    
207
    if options.dest_username is None:
208
      dest_username = src_username
209
    else:
210
      dest_username = options.dest_username
211

    
212
    logging.debug("Destination RAPI server is %s:%s",
213
                  dest_cluster_name, dest_rapi_port)
214
    logging.debug("Destination username is '%s'", dest_username)
215

    
216
    if options.dest_password_file:
217
      logging.debug("Reading '%s' for destination password",
218
                    options.dest_password_file)
219
      dest_password = utils.ReadOneLineFile(options.dest_password_file,
220
                                            strict=True)
221
    else:
222
      logging.debug("Using source password for destination")
223
      dest_password = src_password
224

    
225
    self.GetDestClient = lambda: \
226
      rapi.client.GanetiRapiClient(dest_cluster_name,
227
                                   port=dest_rapi_port,
228
                                   curl_config_fn=dest_curl_config,
229
                                   username=dest_username,
230
                                   password=dest_password)
231

    
232

    
233
class MoveJobPollReportCb(cli.JobPollReportCbBase):
234
  def __init__(self, abort_check_fn, remote_import_fn):
235
    """Initializes this class.
236

    
237
    @type abort_check_fn: callable
238
    @param abort_check_fn: Function to check whether move is aborted
239
    @type remote_import_fn: callable or None
240
    @param remote_import_fn: Callback for reporting received remote import
241
                             information
242

    
243
    """
244
    cli.JobPollReportCbBase.__init__(self)
245
    self._abort_check_fn = abort_check_fn
246
    self._remote_import_fn = remote_import_fn
247

    
248
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
249
    """Handles a log message.
250

    
251
    """
252
    if log_type == constants.ELOG_REMOTE_IMPORT:
253
      logging.debug("Received remote import information")
254

    
255
      if not self._remote_import_fn:
256
        raise RuntimeError("Received unexpected remote import information")
257

    
258
      assert "x509_ca" in log_msg
259
      assert "disks" in log_msg
260

    
261
      self._remote_import_fn(log_msg)
262

    
263
      return
264

    
265
    logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
266
                 cli.FormatLogMessage(log_type, log_msg))
267

    
268
  def ReportNotChanged(self, job_id, status):
269
    """Called if a job hasn't changed in a while.
270

    
271
    """
272
    try:
273
      # Check whether we were told to abort by the other thread
274
      self._abort_check_fn()
275
    except Abort:
276
      logging.warning("Aborting despite job %s still running", job_id)
277
      raise
278

    
279

    
280
class InstanceMove(object):
281
  """Status class for instance moves.
282

    
283
  """
284
  def __init__(self, src_instance_name, dest_instance_name,
285
               dest_pnode, dest_snode, compress, dest_iallocator,
286
               dest_disk_template, hvparams,
287
               beparams, osparams, nics):
288
    """Initializes this class.
289

    
290
    @type src_instance_name: string
291
    @param src_instance_name: Instance name on source cluster
292
    @type dest_instance_name: string
293
    @param dest_instance_name: Instance name on destination cluster
294
    @type dest_pnode: string or None
295
    @param dest_pnode: Name of primary node on destination cluster
296
    @type dest_snode: string or None
297
    @param dest_snode: Name of secondary node on destination cluster
298
    @type compress; string
299
    @param compress: Compression mode to use (has to be supported on both
300
                     clusters)
301
    @type dest_iallocator: string or None
302
    @param dest_iallocator: Name of iallocator to use
303
    @type dest_disk_template: string or None
304
    @param dest_disk_template: Disk template to use instead of the original one
305
    @type hvparams: dict or None
306
    @param hvparams: Hypervisor parameters to override
307
    @type beparams: dict or None
308
    @param beparams: Backend parameters to override
309
    @type osparams: dict or None
310
    @param osparams: OS parameters to override
311
    @type nics: dict or None
312
    @param nics: NICs to override
313

    
314
    """
315
    self.src_instance_name = src_instance_name
316
    self.dest_instance_name = dest_instance_name
317
    self.dest_pnode = dest_pnode
318
    self.dest_snode = dest_snode
319
    self.compress = compress
320
    self.dest_iallocator = dest_iallocator
321
    self.dest_disk_template = dest_disk_template
322
    self.hvparams = hvparams
323
    self.beparams = beparams
324
    self.osparams = osparams
325
    self.nics = nics
326

    
327
    self.error_message = None
328

    
329

    
330
class MoveRuntime(object):
331
  """Class to keep track of instance move.
332

    
333
  """
334
  def __init__(self, move):
335
    """Initializes this class.
336

    
337
    @type move: L{InstanceMove}
338

    
339
    """
340
    self.move = move
341

    
342
    # Thread synchronization
343
    self.lock = threading.Lock()
344
    self.source_to_dest = threading.Condition(self.lock)
345
    self.dest_to_source = threading.Condition(self.lock)
346

    
347
    # Source information
348
    self.src_error_message = None
349
    self.src_expinfo = None
350
    self.src_instinfo = None
351

    
352
    # Destination information
353
    self.dest_error_message = None
354
    self.dest_impinfo = None
355

    
356
  def HandleErrors(self, prefix, fn, *args):
357
    """Wrapper to catch errors and abort threads.
358

    
359
    @type prefix: string
360
    @param prefix: Variable name prefix ("src" or "dest")
361
    @type fn: callable
362
    @param fn: Function
363

    
364
    """
365
    assert prefix in ("dest", "src")
366

    
367
    try:
368
      # Call inner function
369
      fn(*args)
370

    
371
      errmsg = None
372
    except Abort:
373
      errmsg = "Aborted"
374
    except Exception, err:
375
      logging.exception("Caught unhandled exception")
376
      errmsg = str(err)
377

    
378
    setattr(self, "%s_error_message" % prefix, errmsg)
379

    
380
    self.lock.acquire()
381
    try:
382
      self.source_to_dest.notifyAll()
383
      self.dest_to_source.notifyAll()
384
    finally:
385
      self.lock.release()
386

    
387
  def CheckAbort(self):
388
    """Check whether thread should be aborted.
389

    
390
    @raise Abort: When thread should be aborted
391

    
392
    """
393
    if not (self.src_error_message is None and
394
            self.dest_error_message is None):
395
      logging.info("Aborting")
396
      raise Abort()
397

    
398
  def Wait(self, cond, check_fn):
399
    """Waits for a condition to become true.
400

    
401
    @type cond: threading.Condition
402
    @param cond: Threading condition
403
    @type check_fn: callable
404
    @param check_fn: Function to check whether condition is true
405

    
406
    """
407
    cond.acquire()
408
    try:
409
      while check_fn(self):
410
        self.CheckAbort()
411
        cond.wait()
412
    finally:
413
      cond.release()
414

    
415
  def PollJob(self, cl, job_id, remote_import_fn=None):
416
    """Wrapper for polling a job.
417

    
418
    @type cl: L{rapi.client.GanetiRapiClient}
419
    @param cl: RAPI client
420
    @type job_id: string
421
    @param job_id: Job ID
422
    @type remote_import_fn: callable or None
423
    @param remote_import_fn: Callback for reporting received remote import
424
                             information
425

    
426
    """
427
    return rapi.client_utils.PollJob(cl, job_id,
428
                                     MoveJobPollReportCb(self.CheckAbort,
429
                                                         remote_import_fn))
430

    
431

    
432
class MoveDestExecutor(object):
433
  def __init__(self, dest_client, mrt):
434
    """Destination side of an instance move.
435

    
436
    @type dest_client: L{rapi.client.GanetiRapiClient}
437
    @param dest_client: RAPI client
438
    @type mrt: L{MoveRuntime}
439
    @param mrt: Instance move runtime information
440

    
441
    """
442
    logging.debug("Waiting for instance information to become available")
443
    mrt.Wait(mrt.source_to_dest,
444
             lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
445

    
446
    logging.info("Creating instance %s in remote-import mode",
447
                 mrt.move.dest_instance_name)
448
    job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
449
                                  mrt.move.dest_pnode, mrt.move.dest_snode,
450
                                  mrt.move.compress,
451
                                  mrt.move.dest_iallocator,
452
                                  mrt.move.dest_disk_template,
453
                                  mrt.src_instinfo, mrt.src_expinfo,
454
                                  mrt.move.hvparams, mrt.move.beparams,
455
                                  mrt.move.beparams, mrt.move.nics)
456
    mrt.PollJob(dest_client, job_id,
457
                remote_import_fn=compat.partial(self._SetImportInfo, mrt))
458

    
459
    logging.info("Import successful")
460

    
461
  @staticmethod
462
  def _SetImportInfo(mrt, impinfo):
463
    """Sets the remote import information and notifies source thread.
464

    
465
    @type mrt: L{MoveRuntime}
466
    @param mrt: Instance move runtime information
467
    @param impinfo: Remote import information
468

    
469
    """
470
    mrt.dest_to_source.acquire()
471
    try:
472
      mrt.dest_impinfo = impinfo
473
      mrt.dest_to_source.notifyAll()
474
    finally:
475
      mrt.dest_to_source.release()
476

    
477
  @staticmethod
478
  def _CreateInstance(cl, name, pnode, snode, compress, iallocator,
479
                      dest_disk_template, instance, expinfo, override_hvparams,
480
                      override_beparams, override_osparams, override_nics):
481
    """Starts the instance creation in remote import mode.
482

    
483
    @type cl: L{rapi.client.GanetiRapiClient}
484
    @param cl: RAPI client
485
    @type name: string
486
    @param name: Instance name
487
    @type pnode: string or None
488
    @param pnode: Name of primary node on destination cluster
489
    @type snode: string or None
490
    @param snode: Name of secondary node on destination cluster
491
    @type compress: string
492
    @param compress: Compression mode to use
493
    @type iallocator: string or None
494
    @param iallocator: Name of iallocator to use
495
    @type dest_disk_template: string or None
496
    @param dest_disk_template: Disk template to use instead of the original one
497
    @type instance: dict
498
    @param instance: Instance details from source cluster
499
    @type expinfo: dict
500
    @param expinfo: Prepared export information from source cluster
501
    @type override_hvparams: dict or None
502
    @param override_hvparams: Hypervisor parameters to override
503
    @type override_beparams: dict or None
504
    @param override_beparams: Backend parameters to override
505
    @type override_osparams: dict or None
506
    @param override_osparams: OS parameters to override
507
    @type override_nics: dict or None
508
    @param override_nics: NICs to override
509
    @return: Job ID
510

    
511
    """
512
    if dest_disk_template:
513
      disk_template = dest_disk_template
514
    else:
515
      disk_template = instance["disk_template"]
516

    
517
    disks = []
518
    for idisk in instance["disks"]:
519
      odisk = {
520
        constants.IDISK_SIZE: idisk["size"],
521
        constants.IDISK_MODE: idisk["mode"],
522
        constants.IDISK_NAME: str(idisk.get("name")),
523
        }
524
      spindles = idisk.get("spindles")
525
      if spindles is not None:
526
        odisk[constants.IDISK_SPINDLES] = spindles
527
      disks.append(odisk)
528

    
529
    nics = [{
530
      constants.INIC_IP: ip,
531
      constants.INIC_MAC: mac,
532
      constants.INIC_MODE: mode,
533
      constants.INIC_LINK: link,
534
      constants.INIC_NAME: vlan,
535
      constants.INIC_NETWORK: network,
536
      constants.INIC_NAME: nic_name
537
      } for nic_name, _, ip, mac, mode, link, vlan, network, _
538
        in instance["nics"]]
539

    
540
    if len(override_nics) > len(nics):
541
      raise Error("Can not create new NICs")
542

    
543
    if override_nics:
544
      assert len(override_nics) <= len(nics)
545
      for idx, (nic, override) in enumerate(zip(nics, override_nics)):
546
        nics[idx] = objects.FillDict(nic, override)
547

    
548
    # TODO: Should this be the actual up/down status? (run_state)
549
    start = (instance["config_state"] == "up")
550

    
551
    assert len(disks) == len(instance["disks"])
552
    assert len(nics) == len(instance["nics"])
553

    
554
    inst_beparams = instance["be_instance"]
555
    if not inst_beparams:
556
      inst_beparams = {}
557

    
558
    inst_hvparams = instance["hv_instance"]
559
    if not inst_hvparams:
560
      inst_hvparams = {}
561

    
562
    inst_osparams = instance["os_instance"]
563
    if not inst_osparams:
564
      inst_osparams = {}
565

    
566
    return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
567
                             name, disk_template, disks, nics,
568
                             os=instance["os"],
569
                             pnode=pnode,
570
                             snode=snode,
571
                             start=start,
572
                             ip_check=False,
573
                             iallocator=iallocator,
574
                             hypervisor=instance["hypervisor"],
575
                             source_handshake=expinfo["handshake"],
576
                             source_x509_ca=expinfo["x509_ca"],
577
                             compress=compress,
578
                             source_instance_name=instance["name"],
579
                             beparams=objects.FillDict(inst_beparams,
580
                                                       override_beparams),
581
                             hvparams=objects.FillDict(inst_hvparams,
582
                                                       override_hvparams),
583
                             osparams=objects.FillDict(inst_osparams,
584
                                                       override_osparams))
585

    
586

    
587
class MoveSourceExecutor(object):
588
  def __init__(self, src_client, mrt):
589
    """Source side of an instance move.
590

    
591
    @type src_client: L{rapi.client.GanetiRapiClient}
592
    @param src_client: RAPI client
593
    @type mrt: L{MoveRuntime}
594
    @param mrt: Instance move runtime information
595

    
596
    """
597
    logging.info("Checking whether instance exists")
598
    self._CheckInstance(src_client, mrt.move.src_instance_name)
599

    
600
    logging.info("Retrieving instance information from source cluster")
601
    instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
602
                                     mrt.move.src_instance_name)
603
    if (instinfo["disk_template"] in constants.DTS_FILEBASED):
604
      raise Error("Inter-cluster move of file-based instances is not"
605
                  " supported.")
606

    
607
    logging.info("Preparing export on source cluster")
608
    expinfo = self._PrepareExport(src_client, mrt.PollJob,
609
                                  mrt.move.src_instance_name)
610
    assert "handshake" in expinfo
611
    assert "x509_key_name" in expinfo
612
    assert "x509_ca" in expinfo
613

    
614
    # Hand information to destination thread
615
    mrt.source_to_dest.acquire()
616
    try:
617
      mrt.src_instinfo = instinfo
618
      mrt.src_expinfo = expinfo
619
      mrt.source_to_dest.notifyAll()
620
    finally:
621
      mrt.source_to_dest.release()
622

    
623
    logging.info("Waiting for destination information to become available")
624
    mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
625

    
626
    logging.info("Starting remote export on source cluster")
627
    self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
628
                         expinfo["x509_key_name"], mrt.move.compress,
629
                         mrt.dest_impinfo)
630

    
631
    logging.info("Export successful")
632

    
633
  @staticmethod
634
  def _CheckInstance(cl, name):
635
    """Checks whether the instance exists on the source cluster.
636

    
637
    @type cl: L{rapi.client.GanetiRapiClient}
638
    @param cl: RAPI client
639
    @type name: string
640
    @param name: Instance name
641

    
642
    """
643
    try:
644
      cl.GetInstance(name)
645
    except rapi.client.GanetiApiError, err:
646
      if err.code == rapi.client.HTTP_NOT_FOUND:
647
        raise Error("Instance %s not found (%s)" % (name, str(err)))
648
      raise
649

    
650
  @staticmethod
651
  def _GetInstanceInfo(cl, poll_job_fn, name):
652
    """Retrieves detailed instance information from source cluster.
653

    
654
    @type cl: L{rapi.client.GanetiRapiClient}
655
    @param cl: RAPI client
656
    @type poll_job_fn: callable
657
    @param poll_job_fn: Function to poll for job result
658
    @type name: string
659
    @param name: Instance name
660

    
661
    """
662
    job_id = cl.GetInstanceInfo(name, static=True)
663
    result = poll_job_fn(cl, job_id)
664
    assert len(result[0].keys()) == 1
665
    return result[0][result[0].keys()[0]]
666

    
667
  @staticmethod
668
  def _PrepareExport(cl, poll_job_fn, name):
669
    """Prepares export on source cluster.
670

    
671
    @type cl: L{rapi.client.GanetiRapiClient}
672
    @param cl: RAPI client
673
    @type poll_job_fn: callable
674
    @param poll_job_fn: Function to poll for job result
675
    @type name: string
676
    @param name: Instance name
677

    
678
    """
679
    job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
680
    return poll_job_fn(cl, job_id)[0]
681

    
682
  @staticmethod
683
  def _ExportInstance(cl, poll_job_fn, name, x509_key_name, compress, impinfo):
684
    """Exports instance from source cluster.
685

    
686
    @type cl: L{rapi.client.GanetiRapiClient}
687
    @param cl: RAPI client
688
    @type poll_job_fn: callable
689
    @param poll_job_fn: Function to poll for job result
690
    @type name: string
691
    @param name: Instance name
692
    @param x509_key_name: Source X509 key
693
    @type compress: string
694
    @param compress: Compression mode to use
695
    @param impinfo: Import information from destination cluster
696

    
697
    """
698
    job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
699
                               impinfo["disks"], shutdown=True,
700
                               remove_instance=True,
701
                               x509_key_name=x509_key_name,
702
                               destination_x509_ca=impinfo["x509_ca"],
703
                               compress=compress)
704
    (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
705

    
706
    if not (fin_resu and compat.all(dresults)):
707
      raise Error("Export failed for disks %s" %
708
                  utils.CommaJoin(str(idx) for idx, result
709
                                  in enumerate(dresults) if not result))
710

    
711

    
712
class MoveSourceWorker(workerpool.BaseWorker):
713
  def RunTask(self, rapi_factory, move): # pylint: disable=W0221
714
    """Executes an instance move.
715

    
716
    @type rapi_factory: L{RapiClientFactory}
717
    @param rapi_factory: RAPI client factory
718
    @type move: L{InstanceMove}
719
    @param move: Instance move information
720

    
721
    """
722
    try:
723
      logging.info("Preparing to move %s from cluster %s to %s as %s",
724
                   move.src_instance_name, rapi_factory.src_cluster_name,
725
                   rapi_factory.dest_cluster_name, move.dest_instance_name)
726

    
727
      mrt = MoveRuntime(move)
728

    
729
      logging.debug("Starting destination thread")
730
      dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
731
                                     target=mrt.HandleErrors,
732
                                     args=("dest", MoveDestExecutor,
733
                                           rapi_factory.GetDestClient(),
734
                                           mrt, ))
735
      dest_thread.start()
736
      try:
737
        mrt.HandleErrors("src", MoveSourceExecutor,
738
                         rapi_factory.GetSourceClient(), mrt)
739
      finally:
740
        dest_thread.join()
741

    
742
      if mrt.src_error_message or mrt.dest_error_message:
743
        move.error_message = ("Source error: %s, destination error: %s" %
744
                              (mrt.src_error_message, mrt.dest_error_message))
745
      else:
746
        move.error_message = None
747
    except Exception, err: # pylint: disable=W0703
748
      logging.exception("Caught unhandled exception")
749
      move.error_message = str(err)
750

    
751

    
752
def CheckRapiSetup(rapi_factory):
753
  """Checks the RAPI setup by retrieving the version.
754

    
755
  @type rapi_factory: L{RapiClientFactory}
756
  @param rapi_factory: RAPI client factory
757

    
758
  """
759
  src_client = rapi_factory.GetSourceClient()
760
  logging.info("Connecting to source RAPI server")
761
  logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
762

    
763
  dest_client = rapi_factory.GetDestClient()
764
  logging.info("Connecting to destination RAPI server")
765
  logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())
766

    
767

    
768
def ParseOptions():
769
  """Parses options passed to program.
770

    
771
  """
772
  program = os.path.basename(sys.argv[0])
773

    
774
  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
775
                                        " <source-cluster> <dest-cluster>"
776
                                        " <instance...>"),
777
                                 prog=program)
778
  parser.add_option(cli.DEBUG_OPT)
779
  parser.add_option(cli.VERBOSE_OPT)
780
  parser.add_option(cli.IALLOCATOR_OPT)
781
  parser.add_option(cli.BACKEND_OPT)
782
  parser.add_option(cli.HVOPTS_OPT)
783
  parser.add_option(cli.OSPARAMS_OPT)
784
  parser.add_option(cli.NET_OPT)
785
  parser.add_option(SRC_RAPI_PORT_OPT)
786
  parser.add_option(SRC_CA_FILE_OPT)
787
  parser.add_option(SRC_USERNAME_OPT)
788
  parser.add_option(SRC_PASSWORD_FILE_OPT)
789
  parser.add_option(DEST_RAPI_PORT_OPT)
790
  parser.add_option(DEST_CA_FILE_OPT)
791
  parser.add_option(DEST_USERNAME_OPT)
792
  parser.add_option(DEST_PASSWORD_FILE_OPT)
793
  parser.add_option(DEST_INSTANCE_NAME_OPT)
794
  parser.add_option(DEST_PRIMARY_NODE_OPT)
795
  parser.add_option(DEST_SECONDARY_NODE_OPT)
796
  parser.add_option(DEST_DISK_TEMPLATE_OPT)
797
  parser.add_option(COMPRESS_OPT)
798
  parser.add_option(PARALLEL_OPT)
799

    
800
  (options, args) = parser.parse_args()
801

    
802
  return (parser, options, args)
803

    
804

    
805
def CheckOptions(parser, options, args):
806
  """Checks options and arguments for validity.
807

    
808
  """
809
  if len(args) < 3:
810
    parser.error("Not enough arguments")
811

    
812
  src_cluster_name = args.pop(0)
813
  dest_cluster_name = args.pop(0)
814
  instance_names = args
815

    
816
  assert len(instance_names) > 0
817

    
818
  # TODO: Remove once using system default paths for SSL certificate
819
  # verification is implemented
820
  if not options.src_ca_file:
821
    parser.error("Missing source cluster CA file")
822

    
823
  if options.parallel < 1:
824
    parser.error("Number of simultaneous moves must be >= 1")
825

    
826
  if (bool(options.iallocator) and
827
      bool(options.dest_primary_node or options.dest_secondary_node)):
828
    parser.error("Destination node and iallocator options exclude each other")
829

    
830
  if len(instance_names) == 1:
831
    # Moving one instance only
832
    if options.hvparams:
833
      utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)
834

    
835
    if options.beparams:
836
      utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)
837

    
838
    if options.nics:
839
      options.nics = cli.ParseNicOption(options.nics)
840
  else:
841
    # Moving more than one instance
842
    if (options.dest_instance_name or options.dest_primary_node or
843
        options.dest_secondary_node or options.hvparams or
844
        options.beparams or options.osparams or options.nics):
845
      parser.error("The options --dest-instance-name, --dest-primary-node,"
846
                   " --dest-secondary-node, --hypervisor-parameters,"
847
                   " --backend-parameters, --os-parameters and --net can"
848
                   " only be used when moving exactly one instance")
849

    
850
  return (src_cluster_name, dest_cluster_name, instance_names)
851

    
852

    
853
def DestClusterHasDefaultIAllocator(rapi_factory):
854
  """Determines if a given cluster has a default iallocator.
855

    
856
  """
857
  result = rapi_factory.GetDestClient().GetInfo()
858
  ia_name = "default_iallocator"
859
  return ia_name in result and result[ia_name]
860

    
861

    
862
def ExitWithError(message):
863
  """Exits after an error and shows a message.
864

    
865
  """
866
  sys.stderr.write("move-instance: error: " + message + "\n")
867
  sys.exit(constants.EXIT_FAILURE)
868

    
869

    
870
@UsesRapiClient
871
def main():
872
  """Main routine.
873

    
874
  """
875
  (parser, options, args) = ParseOptions()
876

    
877
  utils.SetupToolLogging(options.debug, options.verbose, threadname=True)
878

    
879
  (src_cluster_name, dest_cluster_name, instance_names) = \
880
    CheckOptions(parser, options, args)
881

    
882
  logging.info("Source cluster: %s", src_cluster_name)
883
  logging.info("Destination cluster: %s", dest_cluster_name)
884
  logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
885

    
886
  rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
887

    
888
  CheckRapiSetup(rapi_factory)
889

    
890
  has_iallocator = options.iallocator or \
891
                   DestClusterHasDefaultIAllocator(rapi_factory)
892

    
893
  if len(instance_names) > 1 and not has_iallocator:
894
    ExitWithError("When moving multiple nodes, an iallocator must be used. "
895
                  "None was provided and the target cluster does not have "
896
                  "a default iallocator.")
897
  if (len(instance_names) == 1 and not (has_iallocator or
898
      options.dest_primary_node or options.dest_secondary_node)):
899
    ExitWithError("Target cluster does not have a default iallocator, "
900
                  "please specify either destination nodes or an iallocator.")
901

    
902
  # Prepare list of instance moves
903
  moves = []
904
  for src_instance_name in instance_names:
905
    if options.dest_instance_name:
906
      assert len(instance_names) == 1
907
      # Rename instance
908
      dest_instance_name = options.dest_instance_name
909
    else:
910
      dest_instance_name = src_instance_name
911

    
912
    moves.append(InstanceMove(src_instance_name, dest_instance_name,
913
                              options.dest_primary_node,
914
                              options.dest_secondary_node,
915
                              options.compress,
916
                              options.iallocator,
917
                              options.dest_disk_template,
918
                              options.hvparams,
919
                              options.beparams,
920
                              options.osparams,
921
                              options.nics))
922

    
923
  assert len(moves) == len(instance_names)
924

    
925
  # Start workerpool
926
  wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
927
  try:
928
    # Add instance moves to workerpool
929
    for move in moves:
930
      wp.AddTask((rapi_factory, move))
931

    
932
    # Wait for all moves to finish
933
    wp.Quiesce()
934

    
935
  finally:
936
    wp.TerminateWorkers()
937

    
938
  # There should be no threads running at this point, hence not using locks
939
  # anymore
940

    
941
  logging.info("Instance move results:")
942

    
943
  for move in moves:
944
    if move.dest_instance_name == move.src_instance_name:
945
      name = move.src_instance_name
946
    else:
947
      name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
948

    
949
    if move.error_message:
950
      msg = "Failed (%s)" % move.error_message
951
    else:
952
      msg = "Success"
953

    
954
    logging.info("%s: %s", name, msg)
955

    
956
  if compat.any(move.error_message for move in moves):
957
    sys.exit(constants.EXIT_FAILURE)
958

    
959
  sys.exit(constants.EXIT_SUCCESS)
960

    
961

    
962
if __name__ == "__main__":
963
  main()