Statistics
| Branch: | Tag: | Revision:

root / tools / move-instance @ a396b2d6

History | View | Annotate | Download (36.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to move instances from one cluster to another.
22

    
23
"""
24

    
25
# pylint: disable=C0103
26
# C0103: Invalid name move-instance
27

    
28
import os
29
import sys
30
import time
31
import logging
32
import optparse
33
import threading
34

    
35
from ganeti import cli
36
from ganeti import constants
37
from ganeti import utils
38
from ganeti import workerpool
39
from ganeti import objects
40
from ganeti import compat
41
from ganeti import rapi
42
from ganeti import errors
43

    
44
import ganeti.rapi.client # pylint: disable=W0611
45
import ganeti.rapi.client_utils
46
from ganeti.rapi.client import UsesRapiClient
47

    
48

    
49
SRC_RAPI_PORT_OPT = \
50
  cli.cli_option("--src-rapi-port", action="store", type="int",
51
                 dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
52
                 help=("Source cluster RAPI port (defaults to %s)" %
53
                       constants.DEFAULT_RAPI_PORT))
54

    
55
SRC_CA_FILE_OPT = \
56
  cli.cli_option("--src-ca-file", action="store", type="string",
57
                 dest="src_ca_file",
58
                 help=("File containing source cluster Certificate"
59
                       " Authority (CA) in PEM format"))
60

    
61
SRC_USERNAME_OPT = \
62
  cli.cli_option("--src-username", action="store", type="string",
63
                 dest="src_username", default=None,
64
                 help="Source cluster username")
65

    
66
SRC_PASSWORD_FILE_OPT = \
67
  cli.cli_option("--src-password-file", action="store", type="string",
68
                 dest="src_password_file",
69
                 help="File containing source cluster password")
70

    
71
DEST_RAPI_PORT_OPT = \
72
  cli.cli_option("--dest-rapi-port", action="store", type="int",
73
                 dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT,
74
                 help=("Destination cluster RAPI port (defaults to source"
75
                       " cluster RAPI port)"))
76

    
77
DEST_CA_FILE_OPT = \
78
  cli.cli_option("--dest-ca-file", action="store", type="string",
79
                 dest="dest_ca_file",
80
                 help=("File containing destination cluster Certificate"
81
                       " Authority (CA) in PEM format (defaults to source"
82
                       " cluster CA)"))
83

    
84
DEST_USERNAME_OPT = \
85
  cli.cli_option("--dest-username", action="store", type="string",
86
                 dest="dest_username", default=None,
87
                 help=("Destination cluster username (defaults to"
88
                       " source cluster username)"))
89

    
90
DEST_PASSWORD_FILE_OPT = \
91
  cli.cli_option("--dest-password-file", action="store", type="string",
92
                 dest="dest_password_file",
93
                 help=("File containing destination cluster password"
94
                       " (defaults to source cluster password)"))
95

    
96
DEST_INSTANCE_NAME_OPT = \
97
  cli.cli_option("--dest-instance-name", action="store", type="string",
98
                 dest="dest_instance_name",
99
                 help=("Instance name on destination cluster (only"
100
                       " when moving exactly one instance)"))
101

    
102
DEST_PRIMARY_NODE_OPT = \
103
  cli.cli_option("--dest-primary-node", action="store", type="string",
104
                 dest="dest_primary_node",
105
                 help=("Primary node on destination cluster (only"
106
                       " when moving exactly one instance)"))
107

    
108
DEST_SECONDARY_NODE_OPT = \
109
  cli.cli_option("--dest-secondary-node", action="store", type="string",
110
                 dest="dest_secondary_node",
111
                 help=("Secondary node on destination cluster (only"
112
                       " when moving exactly one instance)"))
113

    
114
DEST_DISK_TEMPLATE_OPT = \
115
  cli.cli_option("--dest-disk-template", action="store", type="string",
116
                 dest="dest_disk_template", default=None,
117
                 help="Disk template to use on destination cluster")
118

    
119
COMPRESS_OPT = \
120
  cli.cli_option("--compress", action="store", type="string",
121
                 dest="compress", default="none",
122
                 help="Compression mode to use during the move (this mode has"
123
                      " to be supported by both clusters)")
124

    
125
PARALLEL_OPT = \
126
  cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
127
                 dest="parallel", metavar="<number>",
128
                 help="Number of instances to be moved simultaneously")
129

    
130
OPPORTUNISTIC_TRIES_OPT = \
131
  cli.cli_option("--opportunistic-tries", action="store", type="int",
132
                 dest="opportunistic_tries", metavar="<number>",
133
                 help="Number of opportunistic instance creation attempts"
134
                      " before a normal creation is performed. An opportunistic"
135
                      " attempt will use the iallocator with all the nodes"
136
                      " currently unlocked, failing if not enough nodes are"
137
                      " available. Even though it will succeed (or fail) more"
138
                      " quickly, it can result in suboptimal instance"
139
                      " placement")
140

    
141
OPPORTUNISTIC_DELAY_OPT = \
142
  cli.cli_option("--opportunistic-delay", action="store", type="int",
143
                 dest="opportunistic_delay", metavar="<number>",
144
                 help="The delay between successive opportunistic instance"
145
                      " creation attempts, in seconds")
146

    
147

    
148
class Error(Exception):
149
  """Generic error.
150

    
151
  """
152

    
153

    
154
class Abort(Error):
155
  """Special exception for aborting import/export.
156

    
157
  """
158

    
159

    
160
class RapiClientFactory:
161
  """Factory class for creating RAPI clients.
162

    
163
  @ivar src_cluster_name: Source cluster name
164
  @ivar dest_cluster_name: Destination cluster name
165
  @ivar GetSourceClient: Callable returning new client for source cluster
166
  @ivar GetDestClient: Callable returning new client for destination cluster
167

    
168
  """
169
  def __init__(self, options, src_cluster_name, dest_cluster_name):
170
    """Initializes this class.
171

    
172
    @param options: Program options
173
    @type src_cluster_name: string
174
    @param src_cluster_name: Source cluster name
175
    @type dest_cluster_name: string
176
    @param dest_cluster_name: Destination cluster name
177

    
178
    """
179
    self.src_cluster_name = src_cluster_name
180
    self.dest_cluster_name = dest_cluster_name
181

    
182
    # TODO: Implement timeouts for RAPI connections
183
    # TODO: Support for using system default paths for verifying SSL certificate
184
    logging.debug("Using '%s' as source CA", options.src_ca_file)
185
    src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)
186

    
187
    if options.dest_ca_file:
188
      logging.debug("Using '%s' as destination CA", options.dest_ca_file)
189
      dest_curl_config = \
190
        rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
191
    else:
192
      logging.debug("Using source CA for destination")
193
      dest_curl_config = src_curl_config
194

    
195
    logging.debug("Source RAPI server is %s:%s",
196
                  src_cluster_name, options.src_rapi_port)
197
    logging.debug("Source username is '%s'", options.src_username)
198

    
199
    if options.src_username is None:
200
      src_username = ""
201
    else:
202
      src_username = options.src_username
203

    
204
    if options.src_password_file:
205
      logging.debug("Reading '%s' for source password",
206
                    options.src_password_file)
207
      src_password = utils.ReadOneLineFile(options.src_password_file,
208
                                           strict=True)
209
    else:
210
      logging.debug("Source has no password")
211
      src_password = None
212

    
213
    self.GetSourceClient = lambda: \
214
      rapi.client.GanetiRapiClient(src_cluster_name,
215
                                   port=options.src_rapi_port,
216
                                   curl_config_fn=src_curl_config,
217
                                   username=src_username,
218
                                   password=src_password)
219

    
220
    if options.dest_rapi_port:
221
      dest_rapi_port = options.dest_rapi_port
222
    else:
223
      dest_rapi_port = options.src_rapi_port
224

    
225
    if options.dest_username is None:
226
      dest_username = src_username
227
    else:
228
      dest_username = options.dest_username
229

    
230
    logging.debug("Destination RAPI server is %s:%s",
231
                  dest_cluster_name, dest_rapi_port)
232
    logging.debug("Destination username is '%s'", dest_username)
233

    
234
    if options.dest_password_file:
235
      logging.debug("Reading '%s' for destination password",
236
                    options.dest_password_file)
237
      dest_password = utils.ReadOneLineFile(options.dest_password_file,
238
                                            strict=True)
239
    else:
240
      logging.debug("Using source password for destination")
241
      dest_password = src_password
242

    
243
    self.GetDestClient = lambda: \
244
      rapi.client.GanetiRapiClient(dest_cluster_name,
245
                                   port=dest_rapi_port,
246
                                   curl_config_fn=dest_curl_config,
247
                                   username=dest_username,
248
                                   password=dest_password)
249

    
250

    
251
class MoveJobPollReportCb(cli.JobPollReportCbBase):
252
  def __init__(self, abort_check_fn, remote_import_fn):
253
    """Initializes this class.
254

    
255
    @type abort_check_fn: callable
256
    @param abort_check_fn: Function to check whether move is aborted
257
    @type remote_import_fn: callable or None
258
    @param remote_import_fn: Callback for reporting received remote import
259
                             information
260

    
261
    """
262
    cli.JobPollReportCbBase.__init__(self)
263
    self._abort_check_fn = abort_check_fn
264
    self._remote_import_fn = remote_import_fn
265

    
266
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
267
    """Handles a log message.
268

    
269
    """
270
    if log_type == constants.ELOG_REMOTE_IMPORT:
271
      logging.debug("Received remote import information")
272

    
273
      if not self._remote_import_fn:
274
        raise RuntimeError("Received unexpected remote import information")
275

    
276
      assert "x509_ca" in log_msg
277
      assert "disks" in log_msg
278

    
279
      self._remote_import_fn(log_msg)
280

    
281
      return
282

    
283
    logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
284
                 cli.FormatLogMessage(log_type, log_msg))
285

    
286
  def ReportNotChanged(self, job_id, status):
287
    """Called if a job hasn't changed in a while.
288

    
289
    """
290
    try:
291
      # Check whether we were told to abort by the other thread
292
      self._abort_check_fn()
293
    except Abort:
294
      logging.warning("Aborting despite job %s still running", job_id)
295
      raise
296

    
297

    
298
class InstanceMove(object):
299
  """Status class for instance moves.
300

    
301
  """
302
  def __init__(self, src_instance_name, dest_instance_name,
303
               dest_pnode, dest_snode, compress, dest_iallocator,
304
               dest_disk_template, hvparams,
305
               beparams, osparams, nics, opportunistic_tries,
306
               opportunistic_delay):
307
    """Initializes this class.
308

    
309
    @type src_instance_name: string
310
    @param src_instance_name: Instance name on source cluster
311
    @type dest_instance_name: string
312
    @param dest_instance_name: Instance name on destination cluster
313
    @type dest_pnode: string or None
314
    @param dest_pnode: Name of primary node on destination cluster
315
    @type dest_snode: string or None
316
    @param dest_snode: Name of secondary node on destination cluster
317
    @type compress; string
318
    @param compress: Compression mode to use (has to be supported on both
319
                     clusters)
320
    @type dest_iallocator: string or None
321
    @param dest_iallocator: Name of iallocator to use
322
    @type dest_disk_template: string or None
323
    @param dest_disk_template: Disk template to use instead of the original one
324
    @type hvparams: dict or None
325
    @param hvparams: Hypervisor parameters to override
326
    @type beparams: dict or None
327
    @param beparams: Backend parameters to override
328
    @type osparams: dict or None
329
    @param osparams: OS parameters to override
330
    @type nics: dict or None
331
    @param nics: NICs to override
332
    @type opportunistic_tries: int or None
333
    @param opportunistic_tries: Number of opportunistic creation attempts to
334
                                perform
335
    @type opportunistic_delay: int or None
336
    @param opportunistic_delay: Delay between successive creation attempts, in
337
                                seconds
338

    
339
    """
340
    self.src_instance_name = src_instance_name
341
    self.dest_instance_name = dest_instance_name
342
    self.dest_pnode = dest_pnode
343
    self.dest_snode = dest_snode
344
    self.compress = compress
345
    self.dest_iallocator = dest_iallocator
346
    self.dest_disk_template = dest_disk_template
347
    self.hvparams = hvparams
348
    self.beparams = beparams
349
    self.osparams = osparams
350
    self.nics = nics
351

    
352
    if opportunistic_tries is not None:
353
      self.opportunistic_tries = opportunistic_tries
354
    else:
355
      self.opportunistic_tries = 0
356

    
357
    if opportunistic_delay is not None:
358
      self.opportunistic_delay = opportunistic_delay
359
    else:
360
      self.opportunistic_delay = constants.DEFAULT_OPPORTUNISTIC_RETRY_INTERVAL
361

    
362
    self.error_message = None
363

    
364

    
365
class MoveRuntime(object):
366
  """Class to keep track of instance move.
367

    
368
  """
369
  def __init__(self, move):
370
    """Initializes this class.
371

    
372
    @type move: L{InstanceMove}
373

    
374
    """
375
    self.move = move
376

    
377
    # Thread synchronization
378
    self.lock = threading.Lock()
379
    self.source_to_dest = threading.Condition(self.lock)
380
    self.dest_to_source = threading.Condition(self.lock)
381

    
382
    # Source information
383
    self.src_error_message = None
384
    self.src_expinfo = None
385
    self.src_instinfo = None
386

    
387
    # Destination information
388
    self.dest_error_message = None
389
    self.dest_impinfo = None
390

    
391
  def HandleErrors(self, prefix, fn, *args):
392
    """Wrapper to catch errors and abort threads.
393

    
394
    @type prefix: string
395
    @param prefix: Variable name prefix ("src" or "dest")
396
    @type fn: callable
397
    @param fn: Function
398

    
399
    """
400
    assert prefix in ("dest", "src")
401

    
402
    try:
403
      # Call inner function
404
      fn(*args)
405

    
406
      errmsg = None
407
    except Abort:
408
      errmsg = "Aborted"
409
    except Exception, err:
410
      logging.exception("Caught unhandled exception")
411
      errmsg = str(err)
412

    
413
    setattr(self, "%s_error_message" % prefix, errmsg)
414

    
415
    self.lock.acquire()
416
    try:
417
      self.source_to_dest.notifyAll()
418
      self.dest_to_source.notifyAll()
419
    finally:
420
      self.lock.release()
421

    
422
  def CheckAbort(self):
423
    """Check whether thread should be aborted.
424

    
425
    @raise Abort: When thread should be aborted
426

    
427
    """
428
    if not (self.src_error_message is None and
429
            self.dest_error_message is None):
430
      logging.info("Aborting")
431
      raise Abort()
432

    
433
  def Wait(self, cond, check_fn):
434
    """Waits for a condition to become true.
435

    
436
    @type cond: threading.Condition
437
    @param cond: Threading condition
438
    @type check_fn: callable
439
    @param check_fn: Function to check whether condition is true
440

    
441
    """
442
    cond.acquire()
443
    try:
444
      while check_fn(self):
445
        self.CheckAbort()
446
        cond.wait()
447
    finally:
448
      cond.release()
449

    
450
  def PollJob(self, cl, job_id, remote_import_fn=None):
451
    """Wrapper for polling a job.
452

    
453
    @type cl: L{rapi.client.GanetiRapiClient}
454
    @param cl: RAPI client
455
    @type job_id: string
456
    @param job_id: Job ID
457
    @type remote_import_fn: callable or None
458
    @param remote_import_fn: Callback for reporting received remote import
459
                             information
460

    
461
    """
462
    return rapi.client_utils.PollJob(cl, job_id,
463
                                     MoveJobPollReportCb(self.CheckAbort,
464
                                                         remote_import_fn))
465

    
466

    
467
class MoveDestExecutor(object):
468
  def __init__(self, dest_client, mrt):
469
    """Destination side of an instance move.
470

    
471
    @type dest_client: L{rapi.client.GanetiRapiClient}
472
    @param dest_client: RAPI client
473
    @type mrt: L{MoveRuntime}
474
    @param mrt: Instance move runtime information
475

    
476
    """
477
    logging.debug("Waiting for instance information to become available")
478
    mrt.Wait(mrt.source_to_dest,
479
             lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
480

    
481
    logging.info("Creating instance %s in remote-import mode",
482
                 mrt.move.dest_instance_name)
483

    
484
    # Depending on whether opportunistic tries are enabled, we may have to
485
    # make multiple creation attempts
486
    creation_attempts = [True] * mrt.move.opportunistic_tries
487

    
488
    # But the last one is never opportunistic, and will block until completion
489
    # or failure
490
    creation_attempts.append(False)
491

    
492
    for is_attempt_opportunistic in creation_attempts:
493
      job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
494
                                    mrt.move.dest_pnode, mrt.move.dest_snode,
495
                                    mrt.move.compress,
496
                                    mrt.move.dest_iallocator,
497
                                    mrt.move.dest_disk_template,
498
                                    mrt.src_instinfo, mrt.src_expinfo,
499
                                    mrt.move.hvparams, mrt.move.beparams,
500
                                    mrt.move.beparams, mrt.move.nics,
501
                                    is_attempt_opportunistic
502
                                    )
503

    
504
      try:
505
        # The completion of this block signifies that the import has been
506
        # completed successfullly
507
        mrt.PollJob(dest_client, job_id,
508
                    remote_import_fn=compat.partial(self._SetImportInfo, mrt))
509
        logging.info("Import successful")
510
        return
511
      except errors.OpPrereqError, err:
512
        # Any exception in the non-opportunistic creation is to be passed on,
513
        # as well as exceptions apart from resources temporarily unavailable
514
        if not is_attempt_opportunistic or \
515
           err.args[1] != rapi.client.ECODE_TEMP_NORES:
516
          raise
517

    
518
      logging.info("Opportunistic attempt unsuccessful, waiting %d seconds"
519
                   " before another creation attempt is made",
520
                   mrt.move.opportunistic_delay)
521
      time.sleep(mrt.move.opportunistic_delay)
522

    
523
  @staticmethod
524
  def _SetImportInfo(mrt, impinfo):
525
    """Sets the remote import information and notifies source thread.
526

    
527
    @type mrt: L{MoveRuntime}
528
    @param mrt: Instance move runtime information
529
    @param impinfo: Remote import information
530

    
531
    """
532
    mrt.dest_to_source.acquire()
533
    try:
534
      mrt.dest_impinfo = impinfo
535
      mrt.dest_to_source.notifyAll()
536
    finally:
537
      mrt.dest_to_source.release()
538

    
539
  @staticmethod
540
  def _CreateInstance(cl, name, pnode, snode, compress, iallocator,
541
                      dest_disk_template, instance, expinfo, override_hvparams,
542
                      override_beparams, override_osparams, override_nics,
543
                      is_attempt_opportunistic):
544
    """Starts the instance creation in remote import mode.
545

    
546
    @type cl: L{rapi.client.GanetiRapiClient}
547
    @param cl: RAPI client
548
    @type name: string
549
    @param name: Instance name
550
    @type pnode: string or None
551
    @param pnode: Name of primary node on destination cluster
552
    @type snode: string or None
553
    @param snode: Name of secondary node on destination cluster
554
    @type compress: string
555
    @param compress: Compression mode to use
556
    @type iallocator: string or None
557
    @param iallocator: Name of iallocator to use
558
    @type dest_disk_template: string or None
559
    @param dest_disk_template: Disk template to use instead of the original one
560
    @type instance: dict
561
    @param instance: Instance details from source cluster
562
    @type expinfo: dict
563
    @param expinfo: Prepared export information from source cluster
564
    @type override_hvparams: dict or None
565
    @param override_hvparams: Hypervisor parameters to override
566
    @type override_beparams: dict or None
567
    @param override_beparams: Backend parameters to override
568
    @type override_osparams: dict or None
569
    @param override_osparams: OS parameters to override
570
    @type override_nics: dict or None
571
    @param override_nics: NICs to override
572
    @type is_attempt_opportunistic: bool
573
    @param is_attempt_opportunistic: Whether to use opportunistic locking or not
574
    @return: Job ID
575

    
576
    """
577
    if dest_disk_template:
578
      disk_template = dest_disk_template
579
    else:
580
      disk_template = instance["disk_template"]
581

    
582
    disks = []
583
    for idisk in instance["disks"]:
584
      odisk = {
585
        constants.IDISK_SIZE: idisk["size"],
586
        constants.IDISK_MODE: idisk["mode"],
587
        constants.IDISK_NAME: str(idisk.get("name")),
588
        }
589
      spindles = idisk.get("spindles")
590
      if spindles is not None:
591
        odisk[constants.IDISK_SPINDLES] = spindles
592
      disks.append(odisk)
593

    
594
    try:
595
      nics = [{
596
        constants.INIC_IP: ip,
597
        constants.INIC_MAC: mac,
598
        constants.INIC_MODE: mode,
599
        constants.INIC_LINK: link,
600
        constants.INIC_VLAN: vlan,
601
        constants.INIC_NETWORK: network,
602
        constants.INIC_NAME: nic_name
603
        } for nic_name, _, ip, mac, mode, link, vlan, network, _
604
          in instance["nics"]]
605
    except ValueError:
606
      raise Error("Received NIC information does not match expected format; "
607
                  "Do the versions of this tool and the source cluster match?")
608

    
609
    if len(override_nics) > len(nics):
610
      raise Error("Can not create new NICs")
611

    
612
    if override_nics:
613
      assert len(override_nics) <= len(nics)
614
      for idx, (nic, override) in enumerate(zip(nics, override_nics)):
615
        nics[idx] = objects.FillDict(nic, override)
616

    
617
    # TODO: Should this be the actual up/down status? (run_state)
618
    start = (instance["config_state"] == "up")
619

    
620
    assert len(disks) == len(instance["disks"])
621
    assert len(nics) == len(instance["nics"])
622

    
623
    inst_beparams = instance["be_instance"]
624
    if not inst_beparams:
625
      inst_beparams = {}
626

    
627
    inst_hvparams = instance["hv_instance"]
628
    if not inst_hvparams:
629
      inst_hvparams = {}
630

    
631
    inst_osparams = instance["os_instance"]
632
    if not inst_osparams:
633
      inst_osparams = {}
634

    
635
    return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
636
                             name, disk_template, disks, nics,
637
                             os=instance["os"],
638
                             pnode=pnode,
639
                             snode=snode,
640
                             start=start,
641
                             ip_check=False,
642
                             iallocator=iallocator,
643
                             hypervisor=instance["hypervisor"],
644
                             source_handshake=expinfo["handshake"],
645
                             source_x509_ca=expinfo["x509_ca"],
646
                             compress=compress,
647
                             source_instance_name=instance["name"],
648
                             beparams=objects.FillDict(inst_beparams,
649
                                                       override_beparams),
650
                             hvparams=objects.FillDict(inst_hvparams,
651
                                                       override_hvparams),
652
                             osparams=objects.FillDict(inst_osparams,
653
                                                       override_osparams),
654
                             opportunistic_locking=is_attempt_opportunistic
655
                             )
656

    
657

    
658
class MoveSourceExecutor(object):
659
  def __init__(self, src_client, mrt):
660
    """Source side of an instance move.
661

    
662
    @type src_client: L{rapi.client.GanetiRapiClient}
663
    @param src_client: RAPI client
664
    @type mrt: L{MoveRuntime}
665
    @param mrt: Instance move runtime information
666

    
667
    """
668
    logging.info("Checking whether instance exists")
669
    self._CheckInstance(src_client, mrt.move.src_instance_name)
670

    
671
    logging.info("Retrieving instance information from source cluster")
672
    instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
673
                                     mrt.move.src_instance_name)
674
    if (instinfo["disk_template"] in constants.DTS_FILEBASED):
675
      raise Error("Inter-cluster move of file-based instances is not"
676
                  " supported.")
677

    
678
    logging.info("Preparing export on source cluster")
679
    expinfo = self._PrepareExport(src_client, mrt.PollJob,
680
                                  mrt.move.src_instance_name)
681
    assert "handshake" in expinfo
682
    assert "x509_key_name" in expinfo
683
    assert "x509_ca" in expinfo
684

    
685
    # Hand information to destination thread
686
    mrt.source_to_dest.acquire()
687
    try:
688
      mrt.src_instinfo = instinfo
689
      mrt.src_expinfo = expinfo
690
      mrt.source_to_dest.notifyAll()
691
    finally:
692
      mrt.source_to_dest.release()
693

    
694
    logging.info("Waiting for destination information to become available")
695
    mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
696

    
697
    logging.info("Starting remote export on source cluster")
698
    self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
699
                         expinfo["x509_key_name"], mrt.move.compress,
700
                         mrt.dest_impinfo)
701

    
702
    logging.info("Export successful")
703

    
704
  @staticmethod
705
  def _CheckInstance(cl, name):
706
    """Checks whether the instance exists on the source cluster.
707

    
708
    @type cl: L{rapi.client.GanetiRapiClient}
709
    @param cl: RAPI client
710
    @type name: string
711
    @param name: Instance name
712

    
713
    """
714
    try:
715
      cl.GetInstance(name)
716
    except rapi.client.GanetiApiError, err:
717
      if err.code == rapi.client.HTTP_NOT_FOUND:
718
        raise Error("Instance %s not found (%s)" % (name, str(err)))
719
      raise
720

    
721
  @staticmethod
722
  def _GetInstanceInfo(cl, poll_job_fn, name):
723
    """Retrieves detailed instance information from source cluster.
724

    
725
    @type cl: L{rapi.client.GanetiRapiClient}
726
    @param cl: RAPI client
727
    @type poll_job_fn: callable
728
    @param poll_job_fn: Function to poll for job result
729
    @type name: string
730
    @param name: Instance name
731

    
732
    """
733
    job_id = cl.GetInstanceInfo(name, static=True)
734
    result = poll_job_fn(cl, job_id)
735
    assert len(result[0].keys()) == 1
736
    return result[0][result[0].keys()[0]]
737

    
738
  @staticmethod
739
  def _PrepareExport(cl, poll_job_fn, name):
740
    """Prepares export on source cluster.
741

    
742
    @type cl: L{rapi.client.GanetiRapiClient}
743
    @param cl: RAPI client
744
    @type poll_job_fn: callable
745
    @param poll_job_fn: Function to poll for job result
746
    @type name: string
747
    @param name: Instance name
748

    
749
    """
750
    job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
751
    return poll_job_fn(cl, job_id)[0]
752

    
753
  @staticmethod
754
  def _ExportInstance(cl, poll_job_fn, name, x509_key_name, compress, impinfo):
755
    """Exports instance from source cluster.
756

    
757
    @type cl: L{rapi.client.GanetiRapiClient}
758
    @param cl: RAPI client
759
    @type poll_job_fn: callable
760
    @param poll_job_fn: Function to poll for job result
761
    @type name: string
762
    @param name: Instance name
763
    @param x509_key_name: Source X509 key
764
    @type compress: string
765
    @param compress: Compression mode to use
766
    @param impinfo: Import information from destination cluster
767

    
768
    """
769
    job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
770
                               impinfo["disks"], shutdown=True,
771
                               remove_instance=True,
772
                               x509_key_name=x509_key_name,
773
                               destination_x509_ca=impinfo["x509_ca"],
774
                               compress=compress)
775
    (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
776

    
777
    if not (fin_resu and compat.all(dresults)):
778
      raise Error("Export failed for disks %s" %
779
                  utils.CommaJoin(str(idx) for idx, result
780
                                  in enumerate(dresults) if not result))
781

    
782

    
783
class MoveSourceWorker(workerpool.BaseWorker):
784
  def RunTask(self, rapi_factory, move): # pylint: disable=W0221
785
    """Executes an instance move.
786

    
787
    @type rapi_factory: L{RapiClientFactory}
788
    @param rapi_factory: RAPI client factory
789
    @type move: L{InstanceMove}
790
    @param move: Instance move information
791

    
792
    """
793
    try:
794
      logging.info("Preparing to move %s from cluster %s to %s as %s",
795
                   move.src_instance_name, rapi_factory.src_cluster_name,
796
                   rapi_factory.dest_cluster_name, move.dest_instance_name)
797

    
798
      mrt = MoveRuntime(move)
799

    
800
      logging.debug("Starting destination thread")
801
      dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
802
                                     target=mrt.HandleErrors,
803
                                     args=("dest", MoveDestExecutor,
804
                                           rapi_factory.GetDestClient(),
805
                                           mrt, ))
806
      dest_thread.start()
807
      try:
808
        mrt.HandleErrors("src", MoveSourceExecutor,
809
                         rapi_factory.GetSourceClient(), mrt)
810
      finally:
811
        dest_thread.join()
812

    
813
      if mrt.src_error_message or mrt.dest_error_message:
814
        move.error_message = ("Source error: %s, destination error: %s" %
815
                              (mrt.src_error_message, mrt.dest_error_message))
816
      else:
817
        move.error_message = None
818
    except Exception, err: # pylint: disable=W0703
819
      logging.exception("Caught unhandled exception")
820
      move.error_message = str(err)
821

    
822

    
823
def CheckRapiSetup(rapi_factory):
824
  """Checks the RAPI setup by retrieving the version.
825

    
826
  @type rapi_factory: L{RapiClientFactory}
827
  @param rapi_factory: RAPI client factory
828

    
829
  """
830
  src_client = rapi_factory.GetSourceClient()
831
  logging.info("Connecting to source RAPI server")
832
  logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
833

    
834
  dest_client = rapi_factory.GetDestClient()
835
  logging.info("Connecting to destination RAPI server")
836
  logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())
837

    
838

    
839
def ParseOptions():
840
  """Parses options passed to program.
841

    
842
  """
843
  program = os.path.basename(sys.argv[0])
844

    
845
  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
846
                                        " <source-cluster> <dest-cluster>"
847
                                        " <instance...>"),
848
                                 prog=program)
849
  parser.add_option(cli.DEBUG_OPT)
850
  parser.add_option(cli.VERBOSE_OPT)
851
  parser.add_option(cli.IALLOCATOR_OPT)
852
  parser.add_option(cli.BACKEND_OPT)
853
  parser.add_option(cli.HVOPTS_OPT)
854
  parser.add_option(cli.OSPARAMS_OPT)
855
  parser.add_option(cli.NET_OPT)
856
  parser.add_option(SRC_RAPI_PORT_OPT)
857
  parser.add_option(SRC_CA_FILE_OPT)
858
  parser.add_option(SRC_USERNAME_OPT)
859
  parser.add_option(SRC_PASSWORD_FILE_OPT)
860
  parser.add_option(DEST_RAPI_PORT_OPT)
861
  parser.add_option(DEST_CA_FILE_OPT)
862
  parser.add_option(DEST_USERNAME_OPT)
863
  parser.add_option(DEST_PASSWORD_FILE_OPT)
864
  parser.add_option(DEST_INSTANCE_NAME_OPT)
865
  parser.add_option(DEST_PRIMARY_NODE_OPT)
866
  parser.add_option(DEST_SECONDARY_NODE_OPT)
867
  parser.add_option(DEST_DISK_TEMPLATE_OPT)
868
  parser.add_option(COMPRESS_OPT)
869
  parser.add_option(PARALLEL_OPT)
870
  parser.add_option(OPPORTUNISTIC_TRIES_OPT)
871
  parser.add_option(OPPORTUNISTIC_DELAY_OPT)
872

    
873
  (options, args) = parser.parse_args()
874

    
875
  return (parser, options, args)
876

    
877

    
878
def CheckOptions(parser, options, args):
879
  """Checks options and arguments for validity.
880

    
881
  """
882
  if len(args) < 3:
883
    parser.error("Not enough arguments")
884

    
885
  src_cluster_name = args.pop(0)
886
  dest_cluster_name = args.pop(0)
887
  instance_names = args
888

    
889
  assert len(instance_names) > 0
890

    
891
  # TODO: Remove once using system default paths for SSL certificate
892
  # verification is implemented
893
  if not options.src_ca_file:
894
    parser.error("Missing source cluster CA file")
895

    
896
  if options.parallel < 1:
897
    parser.error("Number of simultaneous moves must be >= 1")
898

    
899
  if (bool(options.iallocator) and
900
      bool(options.dest_primary_node or options.dest_secondary_node)):
901
    parser.error("Destination node and iallocator options exclude each other")
902

    
903
  if (not options.iallocator and (options.opportunistic_tries > 0)):
904
    parser.error("Opportunistic instance creation can only be used with an"
905
                 " iallocator")
906

    
907
  tries_specified = options.opportunistic_tries is not None
908
  delay_specified = options.opportunistic_delay is not None
909
  if tries_specified:
910
    if options.opportunistic_tries < 0:
911
      parser.error("Number of opportunistic creation attempts must be >= 0")
912
    if delay_specified:
913
      if options.opportunistic_delay <= 0:
914
        parser.error("The delay between two successive creation attempts must"
915
                     " be greater than zero")
916
  elif delay_specified:
917
    parser.error("Opportunistic delay can only be specified when opportunistic"
918
                 " tries are used")
919
  else:
920
    # The default values will be provided later
921
    pass
922

    
923
  if len(instance_names) == 1:
924
    # Moving one instance only
925
    if options.hvparams:
926
      utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)
927

    
928
    if options.beparams:
929
      utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)
930

    
931
    if options.nics:
932
      options.nics = cli.ParseNicOption(options.nics)
933
  else:
934
    # Moving more than one instance
935
    if (options.dest_instance_name or options.dest_primary_node or
936
        options.dest_secondary_node or options.hvparams or
937
        options.beparams or options.osparams or options.nics):
938
      parser.error("The options --dest-instance-name, --dest-primary-node,"
939
                   " --dest-secondary-node, --hypervisor-parameters,"
940
                   " --backend-parameters, --os-parameters and --net can"
941
                   " only be used when moving exactly one instance")
942

    
943
  return (src_cluster_name, dest_cluster_name, instance_names)
944

    
945

    
946
def DestClusterHasDefaultIAllocator(rapi_factory):
947
  """Determines if a given cluster has a default iallocator.
948

    
949
  """
950
  result = rapi_factory.GetDestClient().GetInfo()
951
  ia_name = "default_iallocator"
952
  return ia_name in result and result[ia_name]
953

    
954

    
955
def ExitWithError(message):
956
  """Exits after an error and shows a message.
957

    
958
  """
959
  sys.stderr.write("move-instance: error: " + message + "\n")
960
  sys.exit(constants.EXIT_FAILURE)
961

    
962

    
963
@UsesRapiClient
964
def main():
965
  """Main routine.
966

    
967
  """
968
  (parser, options, args) = ParseOptions()
969

    
970
  utils.SetupToolLogging(options.debug, options.verbose, threadname=True)
971

    
972
  (src_cluster_name, dest_cluster_name, instance_names) = \
973
    CheckOptions(parser, options, args)
974

    
975
  logging.info("Source cluster: %s", src_cluster_name)
976
  logging.info("Destination cluster: %s", dest_cluster_name)
977
  logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
978

    
979
  rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
980

    
981
  CheckRapiSetup(rapi_factory)
982

    
983
  has_iallocator = options.iallocator or \
984
                   DestClusterHasDefaultIAllocator(rapi_factory)
985

    
986
  if len(instance_names) > 1 and not has_iallocator:
987
    ExitWithError("When moving multiple nodes, an iallocator must be used. "
988
                  "None was provided and the target cluster does not have "
989
                  "a default iallocator.")
990
  if (len(instance_names) == 1 and not (has_iallocator or
991
      options.dest_primary_node or options.dest_secondary_node)):
992
    ExitWithError("Target cluster does not have a default iallocator, "
993
                  "please specify either destination nodes or an iallocator.")
994

    
995
  # Prepare list of instance moves
996
  moves = []
997
  for src_instance_name in instance_names:
998
    if options.dest_instance_name:
999
      assert len(instance_names) == 1
1000
      # Rename instance
1001
      dest_instance_name = options.dest_instance_name
1002
    else:
1003
      dest_instance_name = src_instance_name
1004

    
1005
    moves.append(InstanceMove(src_instance_name, dest_instance_name,
1006
                              options.dest_primary_node,
1007
                              options.dest_secondary_node,
1008
                              options.compress,
1009
                              options.iallocator,
1010
                              options.dest_disk_template,
1011
                              options.hvparams,
1012
                              options.beparams,
1013
                              options.osparams,
1014
                              options.nics,
1015
                              options.opportunistic_tries,
1016
                              options.opportunistic_delay))
1017

    
1018
  assert len(moves) == len(instance_names)
1019

    
1020
  # Start workerpool
1021
  wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
1022
  try:
1023
    # Add instance moves to workerpool
1024
    for move in moves:
1025
      wp.AddTask((rapi_factory, move))
1026

    
1027
    # Wait for all moves to finish
1028
    wp.Quiesce()
1029

    
1030
  finally:
1031
    wp.TerminateWorkers()
1032

    
1033
  # There should be no threads running at this point, hence not using locks
1034
  # anymore
1035

    
1036
  logging.info("Instance move results:")
1037

    
1038
  for move in moves:
1039
    if move.dest_instance_name == move.src_instance_name:
1040
      name = move.src_instance_name
1041
    else:
1042
      name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
1043

    
1044
    if move.error_message:
1045
      msg = "Failed (%s)" % move.error_message
1046
    else:
1047
      msg = "Success"
1048

    
1049
    logging.info("%s: %s", name, msg)
1050

    
1051
  if compat.any(move.error_message for move in moves):
1052
    sys.exit(constants.EXIT_FAILURE)
1053

    
1054
  sys.exit(constants.EXIT_SUCCESS)
1055

    
1056

    
1057
if __name__ == "__main__":
1058
  main()