Statistics
| Branch: | Tag: | Revision:

root / tools / move-instance @ 2c4ec08e

History | View | Annotate | Download (34.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to move instances from one cluster to another.
22

    
23
"""
24

    
25
# pylint: disable=C0103
26
# C0103: Invalid name move-instance
27

    
28
import os
29
import sys
30
import time
31
import logging
32
import optparse
33
import threading
34

    
35
from ganeti import cli
36
from ganeti import constants
37
from ganeti import utils
38
from ganeti import workerpool
39
from ganeti import objects
40
from ganeti import compat
41
from ganeti import rapi
42

    
43
import ganeti.rapi.client # pylint: disable=W0611
44
import ganeti.rapi.client_utils
45
from ganeti.rapi.client import UsesRapiClient
46

    
47

    
48
SRC_RAPI_PORT_OPT = \
49
  cli.cli_option("--src-rapi-port", action="store", type="int",
50
                 dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
51
                 help=("Source cluster RAPI port (defaults to %s)" %
52
                       constants.DEFAULT_RAPI_PORT))
53

    
54
SRC_CA_FILE_OPT = \
55
  cli.cli_option("--src-ca-file", action="store", type="string",
56
                 dest="src_ca_file",
57
                 help=("File containing source cluster Certificate"
58
                       " Authority (CA) in PEM format"))
59

    
60
SRC_USERNAME_OPT = \
61
  cli.cli_option("--src-username", action="store", type="string",
62
                 dest="src_username", default=None,
63
                 help="Source cluster username")
64

    
65
SRC_PASSWORD_FILE_OPT = \
66
  cli.cli_option("--src-password-file", action="store", type="string",
67
                 dest="src_password_file",
68
                 help="File containing source cluster password")
69

    
70
DEST_RAPI_PORT_OPT = \
71
  cli.cli_option("--dest-rapi-port", action="store", type="int",
72
                 dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT,
73
                 help=("Destination cluster RAPI port (defaults to source"
74
                       " cluster RAPI port)"))
75

    
76
DEST_CA_FILE_OPT = \
77
  cli.cli_option("--dest-ca-file", action="store", type="string",
78
                 dest="dest_ca_file",
79
                 help=("File containing destination cluster Certificate"
80
                       " Authority (CA) in PEM format (defaults to source"
81
                       " cluster CA)"))
82

    
83
DEST_USERNAME_OPT = \
84
  cli.cli_option("--dest-username", action="store", type="string",
85
                 dest="dest_username", default=None,
86
                 help=("Destination cluster username (defaults to"
87
                       " source cluster username)"))
88

    
89
DEST_PASSWORD_FILE_OPT = \
90
  cli.cli_option("--dest-password-file", action="store", type="string",
91
                 dest="dest_password_file",
92
                 help=("File containing destination cluster password"
93
                       " (defaults to source cluster password)"))
94

    
95
DEST_INSTANCE_NAME_OPT = \
96
  cli.cli_option("--dest-instance-name", action="store", type="string",
97
                 dest="dest_instance_name",
98
                 help=("Instance name on destination cluster (only"
99
                       " when moving exactly one instance)"))
100

    
101
DEST_PRIMARY_NODE_OPT = \
102
  cli.cli_option("--dest-primary-node", action="store", type="string",
103
                 dest="dest_primary_node",
104
                 help=("Primary node on destination cluster (only"
105
                       " when moving exactly one instance)"))
106

    
107
DEST_SECONDARY_NODE_OPT = \
108
  cli.cli_option("--dest-secondary-node", action="store", type="string",
109
                 dest="dest_secondary_node",
110
                 help=("Secondary node on destination cluster (only"
111
                       " when moving exactly one instance)"))
112

    
113
DEST_DISK_TEMPLATE_OPT = \
114
  cli.cli_option("--dest-disk-template", action="store", type="string",
115
                 dest="dest_disk_template", default=None,
116
                 help="Disk template to use on destination cluster")
117

    
118
COMPRESS_OPT = \
119
  cli.cli_option("--compress", action="store", type="string",
120
                 dest="compress", default="none",
121
                 help="Compression mode to use during the move (this mode has"
122
                      " to be supported by both clusters)")
123

    
124
PARALLEL_OPT = \
125
  cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
126
                 dest="parallel", metavar="<number>",
127
                 help="Number of instances to be moved simultaneously")
128

    
129
OPPORTUNISTIC_TRIES_OPT = \
130
  cli.cli_option("--opportunistic-tries", action="store", type="int",
131
                 dest="opportunistic_tries", metavar="<number>",
132
                 help="Number of opportunistic instance creation attempts"
133
                      " before a normal creation is performed. An opportunistic"
134
                      " attempt will use the iallocator with all the nodes"
135
                      " currently unlocked, failing if not enough nodes are"
136
                      " available. Even though it will succeed (or fail) more"
137
                      " quickly, it can result in suboptimal instance"
138
                      " placement")
139

    
140
OPPORTUNISTIC_DELAY_OPT = \
141
  cli.cli_option("--opportunistic-delay", action="store", type="int",
142
                 dest="opportunistic_delay", metavar="<number>",
143
                 help="The delay between successive opportunistic instance"
144
                      " creation attempts, in seconds")
145

    
146

    
147
class Error(Exception):
148
  """Generic error.
149

    
150
  """
151

    
152

    
153
class Abort(Error):
154
  """Special exception for aborting import/export.
155

    
156
  """
157

    
158

    
159
class RapiClientFactory:
160
  """Factory class for creating RAPI clients.
161

    
162
  @ivar src_cluster_name: Source cluster name
163
  @ivar dest_cluster_name: Destination cluster name
164
  @ivar GetSourceClient: Callable returning new client for source cluster
165
  @ivar GetDestClient: Callable returning new client for destination cluster
166

    
167
  """
168
  def __init__(self, options, src_cluster_name, dest_cluster_name):
169
    """Initializes this class.
170

    
171
    @param options: Program options
172
    @type src_cluster_name: string
173
    @param src_cluster_name: Source cluster name
174
    @type dest_cluster_name: string
175
    @param dest_cluster_name: Destination cluster name
176

    
177
    """
178
    self.src_cluster_name = src_cluster_name
179
    self.dest_cluster_name = dest_cluster_name
180

    
181
    # TODO: Implement timeouts for RAPI connections
182
    # TODO: Support for using system default paths for verifying SSL certificate
183
    logging.debug("Using '%s' as source CA", options.src_ca_file)
184
    src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)
185

    
186
    if options.dest_ca_file:
187
      logging.debug("Using '%s' as destination CA", options.dest_ca_file)
188
      dest_curl_config = \
189
        rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
190
    else:
191
      logging.debug("Using source CA for destination")
192
      dest_curl_config = src_curl_config
193

    
194
    logging.debug("Source RAPI server is %s:%s",
195
                  src_cluster_name, options.src_rapi_port)
196
    logging.debug("Source username is '%s'", options.src_username)
197

    
198
    if options.src_username is None:
199
      src_username = ""
200
    else:
201
      src_username = options.src_username
202

    
203
    if options.src_password_file:
204
      logging.debug("Reading '%s' for source password",
205
                    options.src_password_file)
206
      src_password = utils.ReadOneLineFile(options.src_password_file,
207
                                           strict=True)
208
    else:
209
      logging.debug("Source has no password")
210
      src_password = None
211

    
212
    self.GetSourceClient = lambda: \
213
      rapi.client.GanetiRapiClient(src_cluster_name,
214
                                   port=options.src_rapi_port,
215
                                   curl_config_fn=src_curl_config,
216
                                   username=src_username,
217
                                   password=src_password)
218

    
219
    if options.dest_rapi_port:
220
      dest_rapi_port = options.dest_rapi_port
221
    else:
222
      dest_rapi_port = options.src_rapi_port
223

    
224
    if options.dest_username is None:
225
      dest_username = src_username
226
    else:
227
      dest_username = options.dest_username
228

    
229
    logging.debug("Destination RAPI server is %s:%s",
230
                  dest_cluster_name, dest_rapi_port)
231
    logging.debug("Destination username is '%s'", dest_username)
232

    
233
    if options.dest_password_file:
234
      logging.debug("Reading '%s' for destination password",
235
                    options.dest_password_file)
236
      dest_password = utils.ReadOneLineFile(options.dest_password_file,
237
                                            strict=True)
238
    else:
239
      logging.debug("Using source password for destination")
240
      dest_password = src_password
241

    
242
    self.GetDestClient = lambda: \
243
      rapi.client.GanetiRapiClient(dest_cluster_name,
244
                                   port=dest_rapi_port,
245
                                   curl_config_fn=dest_curl_config,
246
                                   username=dest_username,
247
                                   password=dest_password)
248

    
249

    
250
class MoveJobPollReportCb(cli.JobPollReportCbBase):
251
  def __init__(self, abort_check_fn, remote_import_fn):
252
    """Initializes this class.
253

    
254
    @type abort_check_fn: callable
255
    @param abort_check_fn: Function to check whether move is aborted
256
    @type remote_import_fn: callable or None
257
    @param remote_import_fn: Callback for reporting received remote import
258
                             information
259

    
260
    """
261
    cli.JobPollReportCbBase.__init__(self)
262
    self._abort_check_fn = abort_check_fn
263
    self._remote_import_fn = remote_import_fn
264

    
265
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
266
    """Handles a log message.
267

    
268
    """
269
    if log_type == constants.ELOG_REMOTE_IMPORT:
270
      logging.debug("Received remote import information")
271

    
272
      if not self._remote_import_fn:
273
        raise RuntimeError("Received unexpected remote import information")
274

    
275
      assert "x509_ca" in log_msg
276
      assert "disks" in log_msg
277

    
278
      self._remote_import_fn(log_msg)
279

    
280
      return
281

    
282
    logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
283
                 cli.FormatLogMessage(log_type, log_msg))
284

    
285
  def ReportNotChanged(self, job_id, status):
286
    """Called if a job hasn't changed in a while.
287

    
288
    """
289
    try:
290
      # Check whether we were told to abort by the other thread
291
      self._abort_check_fn()
292
    except Abort:
293
      logging.warning("Aborting despite job %s still running", job_id)
294
      raise
295

    
296

    
297
class InstanceMove(object):
298
  """Status class for instance moves.
299

    
300
  """
301
  def __init__(self, src_instance_name, dest_instance_name,
302
               dest_pnode, dest_snode, compress, dest_iallocator,
303
               dest_disk_template, hvparams,
304
               beparams, osparams, nics):
305
    """Initializes this class.
306

    
307
    @type src_instance_name: string
308
    @param src_instance_name: Instance name on source cluster
309
    @type dest_instance_name: string
310
    @param dest_instance_name: Instance name on destination cluster
311
    @type dest_pnode: string or None
312
    @param dest_pnode: Name of primary node on destination cluster
313
    @type dest_snode: string or None
314
    @param dest_snode: Name of secondary node on destination cluster
315
    @type compress; string
316
    @param compress: Compression mode to use (has to be supported on both
317
                     clusters)
318
    @type dest_iallocator: string or None
319
    @param dest_iallocator: Name of iallocator to use
320
    @type dest_disk_template: string or None
321
    @param dest_disk_template: Disk template to use instead of the original one
322
    @type hvparams: dict or None
323
    @param hvparams: Hypervisor parameters to override
324
    @type beparams: dict or None
325
    @param beparams: Backend parameters to override
326
    @type osparams: dict or None
327
    @param osparams: OS parameters to override
328
    @type nics: dict or None
329
    @param nics: NICs to override
330

    
331
    """
332
    self.src_instance_name = src_instance_name
333
    self.dest_instance_name = dest_instance_name
334
    self.dest_pnode = dest_pnode
335
    self.dest_snode = dest_snode
336
    self.compress = compress
337
    self.dest_iallocator = dest_iallocator
338
    self.dest_disk_template = dest_disk_template
339
    self.hvparams = hvparams
340
    self.beparams = beparams
341
    self.osparams = osparams
342
    self.nics = nics
343

    
344
    self.error_message = None
345

    
346

    
347
class MoveRuntime(object):
348
  """Class to keep track of instance move.
349

    
350
  """
351
  def __init__(self, move):
352
    """Initializes this class.
353

    
354
    @type move: L{InstanceMove}
355

    
356
    """
357
    self.move = move
358

    
359
    # Thread synchronization
360
    self.lock = threading.Lock()
361
    self.source_to_dest = threading.Condition(self.lock)
362
    self.dest_to_source = threading.Condition(self.lock)
363

    
364
    # Source information
365
    self.src_error_message = None
366
    self.src_expinfo = None
367
    self.src_instinfo = None
368

    
369
    # Destination information
370
    self.dest_error_message = None
371
    self.dest_impinfo = None
372

    
373
  def HandleErrors(self, prefix, fn, *args):
374
    """Wrapper to catch errors and abort threads.
375

    
376
    @type prefix: string
377
    @param prefix: Variable name prefix ("src" or "dest")
378
    @type fn: callable
379
    @param fn: Function
380

    
381
    """
382
    assert prefix in ("dest", "src")
383

    
384
    try:
385
      # Call inner function
386
      fn(*args)
387

    
388
      errmsg = None
389
    except Abort:
390
      errmsg = "Aborted"
391
    except Exception, err:
392
      logging.exception("Caught unhandled exception")
393
      errmsg = str(err)
394

    
395
    setattr(self, "%s_error_message" % prefix, errmsg)
396

    
397
    self.lock.acquire()
398
    try:
399
      self.source_to_dest.notifyAll()
400
      self.dest_to_source.notifyAll()
401
    finally:
402
      self.lock.release()
403

    
404
  def CheckAbort(self):
405
    """Check whether thread should be aborted.
406

    
407
    @raise Abort: When thread should be aborted
408

    
409
    """
410
    if not (self.src_error_message is None and
411
            self.dest_error_message is None):
412
      logging.info("Aborting")
413
      raise Abort()
414

    
415
  def Wait(self, cond, check_fn):
416
    """Waits for a condition to become true.
417

    
418
    @type cond: threading.Condition
419
    @param cond: Threading condition
420
    @type check_fn: callable
421
    @param check_fn: Function to check whether condition is true
422

    
423
    """
424
    cond.acquire()
425
    try:
426
      while check_fn(self):
427
        self.CheckAbort()
428
        cond.wait()
429
    finally:
430
      cond.release()
431

    
432
  def PollJob(self, cl, job_id, remote_import_fn=None):
433
    """Wrapper for polling a job.
434

    
435
    @type cl: L{rapi.client.GanetiRapiClient}
436
    @param cl: RAPI client
437
    @type job_id: string
438
    @param job_id: Job ID
439
    @type remote_import_fn: callable or None
440
    @param remote_import_fn: Callback for reporting received remote import
441
                             information
442

    
443
    """
444
    return rapi.client_utils.PollJob(cl, job_id,
445
                                     MoveJobPollReportCb(self.CheckAbort,
446
                                                         remote_import_fn))
447

    
448

    
449
class MoveDestExecutor(object):
450
  def __init__(self, dest_client, mrt):
451
    """Destination side of an instance move.
452

    
453
    @type dest_client: L{rapi.client.GanetiRapiClient}
454
    @param dest_client: RAPI client
455
    @type mrt: L{MoveRuntime}
456
    @param mrt: Instance move runtime information
457

    
458
    """
459
    logging.debug("Waiting for instance information to become available")
460
    mrt.Wait(mrt.source_to_dest,
461
             lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
462

    
463
    logging.info("Creating instance %s in remote-import mode",
464
                 mrt.move.dest_instance_name)
465
    job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
466
                                  mrt.move.dest_pnode, mrt.move.dest_snode,
467
                                  mrt.move.compress,
468
                                  mrt.move.dest_iallocator,
469
                                  mrt.move.dest_disk_template,
470
                                  mrt.src_instinfo, mrt.src_expinfo,
471
                                  mrt.move.hvparams, mrt.move.beparams,
472
                                  mrt.move.beparams, mrt.move.nics)
473
    mrt.PollJob(dest_client, job_id,
474
                remote_import_fn=compat.partial(self._SetImportInfo, mrt))
475

    
476
    logging.info("Import successful")
477

    
478
  @staticmethod
479
  def _SetImportInfo(mrt, impinfo):
480
    """Sets the remote import information and notifies source thread.
481

    
482
    @type mrt: L{MoveRuntime}
483
    @param mrt: Instance move runtime information
484
    @param impinfo: Remote import information
485

    
486
    """
487
    mrt.dest_to_source.acquire()
488
    try:
489
      mrt.dest_impinfo = impinfo
490
      mrt.dest_to_source.notifyAll()
491
    finally:
492
      mrt.dest_to_source.release()
493

    
494
  @staticmethod
495
  def _CreateInstance(cl, name, pnode, snode, compress, iallocator,
496
                      dest_disk_template, instance, expinfo, override_hvparams,
497
                      override_beparams, override_osparams, override_nics):
498
    """Starts the instance creation in remote import mode.
499

    
500
    @type cl: L{rapi.client.GanetiRapiClient}
501
    @param cl: RAPI client
502
    @type name: string
503
    @param name: Instance name
504
    @type pnode: string or None
505
    @param pnode: Name of primary node on destination cluster
506
    @type snode: string or None
507
    @param snode: Name of secondary node on destination cluster
508
    @type compress: string
509
    @param compress: Compression mode to use
510
    @type iallocator: string or None
511
    @param iallocator: Name of iallocator to use
512
    @type dest_disk_template: string or None
513
    @param dest_disk_template: Disk template to use instead of the original one
514
    @type instance: dict
515
    @param instance: Instance details from source cluster
516
    @type expinfo: dict
517
    @param expinfo: Prepared export information from source cluster
518
    @type override_hvparams: dict or None
519
    @param override_hvparams: Hypervisor parameters to override
520
    @type override_beparams: dict or None
521
    @param override_beparams: Backend parameters to override
522
    @type override_osparams: dict or None
523
    @param override_osparams: OS parameters to override
524
    @type override_nics: dict or None
525
    @param override_nics: NICs to override
526
    @return: Job ID
527

    
528
    """
529
    if dest_disk_template:
530
      disk_template = dest_disk_template
531
    else:
532
      disk_template = instance["disk_template"]
533

    
534
    disks = []
535
    for idisk in instance["disks"]:
536
      odisk = {
537
        constants.IDISK_SIZE: idisk["size"],
538
        constants.IDISK_MODE: idisk["mode"],
539
        constants.IDISK_NAME: str(idisk.get("name")),
540
        }
541
      spindles = idisk.get("spindles")
542
      if spindles is not None:
543
        odisk[constants.IDISK_SPINDLES] = spindles
544
      disks.append(odisk)
545

    
546
    try:
547
      nics = [{
548
        constants.INIC_IP: ip,
549
        constants.INIC_MAC: mac,
550
        constants.INIC_MODE: mode,
551
        constants.INIC_LINK: link,
552
        constants.INIC_VLAN: vlan,
553
        constants.INIC_NETWORK: network,
554
        constants.INIC_NAME: nic_name
555
        } for nic_name, _, ip, mac, mode, link, vlan, network, _
556
          in instance["nics"]]
557
    except ValueError:
558
      raise Error("Received NIC information does not match expected format; "
559
                  "Do the versions of this tool and the source cluster match?")
560

    
561
    if len(override_nics) > len(nics):
562
      raise Error("Can not create new NICs")
563

    
564
    if override_nics:
565
      assert len(override_nics) <= len(nics)
566
      for idx, (nic, override) in enumerate(zip(nics, override_nics)):
567
        nics[idx] = objects.FillDict(nic, override)
568

    
569
    # TODO: Should this be the actual up/down status? (run_state)
570
    start = (instance["config_state"] == "up")
571

    
572
    assert len(disks) == len(instance["disks"])
573
    assert len(nics) == len(instance["nics"])
574

    
575
    inst_beparams = instance["be_instance"]
576
    if not inst_beparams:
577
      inst_beparams = {}
578

    
579
    inst_hvparams = instance["hv_instance"]
580
    if not inst_hvparams:
581
      inst_hvparams = {}
582

    
583
    inst_osparams = instance["os_instance"]
584
    if not inst_osparams:
585
      inst_osparams = {}
586

    
587
    return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
588
                             name, disk_template, disks, nics,
589
                             os=instance["os"],
590
                             pnode=pnode,
591
                             snode=snode,
592
                             start=start,
593
                             ip_check=False,
594
                             iallocator=iallocator,
595
                             hypervisor=instance["hypervisor"],
596
                             source_handshake=expinfo["handshake"],
597
                             source_x509_ca=expinfo["x509_ca"],
598
                             compress=compress,
599
                             source_instance_name=instance["name"],
600
                             beparams=objects.FillDict(inst_beparams,
601
                                                       override_beparams),
602
                             hvparams=objects.FillDict(inst_hvparams,
603
                                                       override_hvparams),
604
                             osparams=objects.FillDict(inst_osparams,
605
                                                       override_osparams))
606

    
607

    
608
class MoveSourceExecutor(object):
609
  def __init__(self, src_client, mrt):
610
    """Source side of an instance move.
611

    
612
    @type src_client: L{rapi.client.GanetiRapiClient}
613
    @param src_client: RAPI client
614
    @type mrt: L{MoveRuntime}
615
    @param mrt: Instance move runtime information
616

    
617
    """
618
    logging.info("Checking whether instance exists")
619
    self._CheckInstance(src_client, mrt.move.src_instance_name)
620

    
621
    logging.info("Retrieving instance information from source cluster")
622
    instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
623
                                     mrt.move.src_instance_name)
624
    if (instinfo["disk_template"] in constants.DTS_FILEBASED):
625
      raise Error("Inter-cluster move of file-based instances is not"
626
                  " supported.")
627

    
628
    logging.info("Preparing export on source cluster")
629
    expinfo = self._PrepareExport(src_client, mrt.PollJob,
630
                                  mrt.move.src_instance_name)
631
    assert "handshake" in expinfo
632
    assert "x509_key_name" in expinfo
633
    assert "x509_ca" in expinfo
634

    
635
    # Hand information to destination thread
636
    mrt.source_to_dest.acquire()
637
    try:
638
      mrt.src_instinfo = instinfo
639
      mrt.src_expinfo = expinfo
640
      mrt.source_to_dest.notifyAll()
641
    finally:
642
      mrt.source_to_dest.release()
643

    
644
    logging.info("Waiting for destination information to become available")
645
    mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
646

    
647
    logging.info("Starting remote export on source cluster")
648
    self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
649
                         expinfo["x509_key_name"], mrt.move.compress,
650
                         mrt.dest_impinfo)
651

    
652
    logging.info("Export successful")
653

    
654
  @staticmethod
655
  def _CheckInstance(cl, name):
656
    """Checks whether the instance exists on the source cluster.
657

    
658
    @type cl: L{rapi.client.GanetiRapiClient}
659
    @param cl: RAPI client
660
    @type name: string
661
    @param name: Instance name
662

    
663
    """
664
    try:
665
      cl.GetInstance(name)
666
    except rapi.client.GanetiApiError, err:
667
      if err.code == rapi.client.HTTP_NOT_FOUND:
668
        raise Error("Instance %s not found (%s)" % (name, str(err)))
669
      raise
670

    
671
  @staticmethod
672
  def _GetInstanceInfo(cl, poll_job_fn, name):
673
    """Retrieves detailed instance information from source cluster.
674

    
675
    @type cl: L{rapi.client.GanetiRapiClient}
676
    @param cl: RAPI client
677
    @type poll_job_fn: callable
678
    @param poll_job_fn: Function to poll for job result
679
    @type name: string
680
    @param name: Instance name
681

    
682
    """
683
    job_id = cl.GetInstanceInfo(name, static=True)
684
    result = poll_job_fn(cl, job_id)
685
    assert len(result[0].keys()) == 1
686
    return result[0][result[0].keys()[0]]
687

    
688
  @staticmethod
689
  def _PrepareExport(cl, poll_job_fn, name):
690
    """Prepares export on source cluster.
691

    
692
    @type cl: L{rapi.client.GanetiRapiClient}
693
    @param cl: RAPI client
694
    @type poll_job_fn: callable
695
    @param poll_job_fn: Function to poll for job result
696
    @type name: string
697
    @param name: Instance name
698

    
699
    """
700
    job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
701
    return poll_job_fn(cl, job_id)[0]
702

    
703
  @staticmethod
704
  def _ExportInstance(cl, poll_job_fn, name, x509_key_name, compress, impinfo):
705
    """Exports instance from source cluster.
706

    
707
    @type cl: L{rapi.client.GanetiRapiClient}
708
    @param cl: RAPI client
709
    @type poll_job_fn: callable
710
    @param poll_job_fn: Function to poll for job result
711
    @type name: string
712
    @param name: Instance name
713
    @param x509_key_name: Source X509 key
714
    @type compress: string
715
    @param compress: Compression mode to use
716
    @param impinfo: Import information from destination cluster
717

    
718
    """
719
    job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
720
                               impinfo["disks"], shutdown=True,
721
                               remove_instance=True,
722
                               x509_key_name=x509_key_name,
723
                               destination_x509_ca=impinfo["x509_ca"],
724
                               compress=compress)
725
    (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
726

    
727
    if not (fin_resu and compat.all(dresults)):
728
      raise Error("Export failed for disks %s" %
729
                  utils.CommaJoin(str(idx) for idx, result
730
                                  in enumerate(dresults) if not result))
731

    
732

    
733
class MoveSourceWorker(workerpool.BaseWorker):
734
  def RunTask(self, rapi_factory, move): # pylint: disable=W0221
735
    """Executes an instance move.
736

    
737
    @type rapi_factory: L{RapiClientFactory}
738
    @param rapi_factory: RAPI client factory
739
    @type move: L{InstanceMove}
740
    @param move: Instance move information
741

    
742
    """
743
    try:
744
      logging.info("Preparing to move %s from cluster %s to %s as %s",
745
                   move.src_instance_name, rapi_factory.src_cluster_name,
746
                   rapi_factory.dest_cluster_name, move.dest_instance_name)
747

    
748
      mrt = MoveRuntime(move)
749

    
750
      logging.debug("Starting destination thread")
751
      dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
752
                                     target=mrt.HandleErrors,
753
                                     args=("dest", MoveDestExecutor,
754
                                           rapi_factory.GetDestClient(),
755
                                           mrt, ))
756
      dest_thread.start()
757
      try:
758
        mrt.HandleErrors("src", MoveSourceExecutor,
759
                         rapi_factory.GetSourceClient(), mrt)
760
      finally:
761
        dest_thread.join()
762

    
763
      if mrt.src_error_message or mrt.dest_error_message:
764
        move.error_message = ("Source error: %s, destination error: %s" %
765
                              (mrt.src_error_message, mrt.dest_error_message))
766
      else:
767
        move.error_message = None
768
    except Exception, err: # pylint: disable=W0703
769
      logging.exception("Caught unhandled exception")
770
      move.error_message = str(err)
771

    
772

    
773
def CheckRapiSetup(rapi_factory):
774
  """Checks the RAPI setup by retrieving the version.
775

    
776
  @type rapi_factory: L{RapiClientFactory}
777
  @param rapi_factory: RAPI client factory
778

    
779
  """
780
  src_client = rapi_factory.GetSourceClient()
781
  logging.info("Connecting to source RAPI server")
782
  logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
783

    
784
  dest_client = rapi_factory.GetDestClient()
785
  logging.info("Connecting to destination RAPI server")
786
  logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())
787

    
788

    
789
def ParseOptions():
790
  """Parses options passed to program.
791

    
792
  """
793
  program = os.path.basename(sys.argv[0])
794

    
795
  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
796
                                        " <source-cluster> <dest-cluster>"
797
                                        " <instance...>"),
798
                                 prog=program)
799
  parser.add_option(cli.DEBUG_OPT)
800
  parser.add_option(cli.VERBOSE_OPT)
801
  parser.add_option(cli.IALLOCATOR_OPT)
802
  parser.add_option(cli.BACKEND_OPT)
803
  parser.add_option(cli.HVOPTS_OPT)
804
  parser.add_option(cli.OSPARAMS_OPT)
805
  parser.add_option(cli.NET_OPT)
806
  parser.add_option(SRC_RAPI_PORT_OPT)
807
  parser.add_option(SRC_CA_FILE_OPT)
808
  parser.add_option(SRC_USERNAME_OPT)
809
  parser.add_option(SRC_PASSWORD_FILE_OPT)
810
  parser.add_option(DEST_RAPI_PORT_OPT)
811
  parser.add_option(DEST_CA_FILE_OPT)
812
  parser.add_option(DEST_USERNAME_OPT)
813
  parser.add_option(DEST_PASSWORD_FILE_OPT)
814
  parser.add_option(DEST_INSTANCE_NAME_OPT)
815
  parser.add_option(DEST_PRIMARY_NODE_OPT)
816
  parser.add_option(DEST_SECONDARY_NODE_OPT)
817
  parser.add_option(DEST_DISK_TEMPLATE_OPT)
818
  parser.add_option(COMPRESS_OPT)
819
  parser.add_option(PARALLEL_OPT)
820
  parser.add_option(OPPORTUNISTIC_TRIES_OPT)
821
  parser.add_option(OPPORTUNISTIC_DELAY_OPT)
822

    
823
  (options, args) = parser.parse_args()
824

    
825
  return (parser, options, args)
826

    
827

    
828
def CheckOptions(parser, options, args):
829
  """Checks options and arguments for validity.
830

    
831
  """
832
  if len(args) < 3:
833
    parser.error("Not enough arguments")
834

    
835
  src_cluster_name = args.pop(0)
836
  dest_cluster_name = args.pop(0)
837
  instance_names = args
838

    
839
  assert len(instance_names) > 0
840

    
841
  # TODO: Remove once using system default paths for SSL certificate
842
  # verification is implemented
843
  if not options.src_ca_file:
844
    parser.error("Missing source cluster CA file")
845

    
846
  if options.parallel < 1:
847
    parser.error("Number of simultaneous moves must be >= 1")
848

    
849
  if (bool(options.iallocator) and
850
      bool(options.dest_primary_node or options.dest_secondary_node)):
851
    parser.error("Destination node and iallocator options exclude each other")
852

    
853
  if (not options.iallocator and (options.opportunistic_tries > 0)):
854
    parser.error("Opportunistic instance creation can only be used with an"
855
                 " iallocator")
856

    
857
  tries_specified = options.opportunistic_tries is not None
858
  delay_specified = options.opportunistic_delay is not None
859
  if tries_specified:
860
    if options.opportunistic_tries < 0:
861
      parser.error("Number of opportunistic creation attempts must be >= 0")
862
    if delay_specified:
863
      if options.opportunistic_delay <= 0:
864
        parser.error("The delay between two successive creation attempts must"
865
                     " be greater than zero")
866
  elif delay_specified:
867
    parser.error("Opportunistic delay can only be specified when opportunistic"
868
                 " tries are used")
869
  else:
870
    # The default values will be provided later
871
    pass
872

    
873
  if len(instance_names) == 1:
874
    # Moving one instance only
875
    if options.hvparams:
876
      utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)
877

    
878
    if options.beparams:
879
      utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)
880

    
881
    if options.nics:
882
      options.nics = cli.ParseNicOption(options.nics)
883
  else:
884
    # Moving more than one instance
885
    if (options.dest_instance_name or options.dest_primary_node or
886
        options.dest_secondary_node or options.hvparams or
887
        options.beparams or options.osparams or options.nics):
888
      parser.error("The options --dest-instance-name, --dest-primary-node,"
889
                   " --dest-secondary-node, --hypervisor-parameters,"
890
                   " --backend-parameters, --os-parameters and --net can"
891
                   " only be used when moving exactly one instance")
892

    
893
  return (src_cluster_name, dest_cluster_name, instance_names)
894

    
895

    
896
def DestClusterHasDefaultIAllocator(rapi_factory):
897
  """Determines if a given cluster has a default iallocator.
898

    
899
  """
900
  result = rapi_factory.GetDestClient().GetInfo()
901
  ia_name = "default_iallocator"
902
  return ia_name in result and result[ia_name]
903

    
904

    
905
def ExitWithError(message):
906
  """Exits after an error and shows a message.
907

    
908
  """
909
  sys.stderr.write("move-instance: error: " + message + "\n")
910
  sys.exit(constants.EXIT_FAILURE)
911

    
912

    
913
@UsesRapiClient
914
def main():
915
  """Main routine.
916

    
917
  """
918
  (parser, options, args) = ParseOptions()
919

    
920
  utils.SetupToolLogging(options.debug, options.verbose, threadname=True)
921

    
922
  (src_cluster_name, dest_cluster_name, instance_names) = \
923
    CheckOptions(parser, options, args)
924

    
925
  logging.info("Source cluster: %s", src_cluster_name)
926
  logging.info("Destination cluster: %s", dest_cluster_name)
927
  logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
928

    
929
  rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
930

    
931
  CheckRapiSetup(rapi_factory)
932

    
933
  has_iallocator = options.iallocator or \
934
                   DestClusterHasDefaultIAllocator(rapi_factory)
935

    
936
  if len(instance_names) > 1 and not has_iallocator:
937
    ExitWithError("When moving multiple nodes, an iallocator must be used. "
938
                  "None was provided and the target cluster does not have "
939
                  "a default iallocator.")
940
  if (len(instance_names) == 1 and not (has_iallocator or
941
      options.dest_primary_node or options.dest_secondary_node)):
942
    ExitWithError("Target cluster does not have a default iallocator, "
943
                  "please specify either destination nodes or an iallocator.")
944

    
945
  # Prepare list of instance moves
946
  moves = []
947
  for src_instance_name in instance_names:
948
    if options.dest_instance_name:
949
      assert len(instance_names) == 1
950
      # Rename instance
951
      dest_instance_name = options.dest_instance_name
952
    else:
953
      dest_instance_name = src_instance_name
954

    
955
    moves.append(InstanceMove(src_instance_name, dest_instance_name,
956
                              options.dest_primary_node,
957
                              options.dest_secondary_node,
958
                              options.compress,
959
                              options.iallocator,
960
                              options.dest_disk_template,
961
                              options.hvparams,
962
                              options.beparams,
963
                              options.osparams,
964
                              options.nics))
965

    
966
  assert len(moves) == len(instance_names)
967

    
968
  # Start workerpool
969
  wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
970
  try:
971
    # Add instance moves to workerpool
972
    for move in moves:
973
      wp.AddTask((rapi_factory, move))
974

    
975
    # Wait for all moves to finish
976
    wp.Quiesce()
977

    
978
  finally:
979
    wp.TerminateWorkers()
980

    
981
  # There should be no threads running at this point, hence not using locks
982
  # anymore
983

    
984
  logging.info("Instance move results:")
985

    
986
  for move in moves:
987
    if move.dest_instance_name == move.src_instance_name:
988
      name = move.src_instance_name
989
    else:
990
      name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
991

    
992
    if move.error_message:
993
      msg = "Failed (%s)" % move.error_message
994
    else:
995
      msg = "Success"
996

    
997
    logging.info("%s: %s", name, msg)
998

    
999
  if compat.any(move.error_message for move in moves):
1000
    sys.exit(constants.EXIT_FAILURE)
1001

    
1002
  sys.exit(constants.EXIT_SUCCESS)
1003

    
1004

    
1005
if __name__ == "__main__":
1006
  main()