Statistics
| Branch: | Tag: | Revision:

root / tools / move-instance @ 4fe43605

History | View | Annotate | Download (36.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to move instances from one cluster to another.
22

    
23
"""
24

    
25
# pylint: disable=C0103
26
# C0103: Invalid name move-instance
27

    
28
import os
29
import sys
30
import time
31
import logging
32
import optparse
33
import random
34
import threading
35

    
36
from ganeti import cli
37
from ganeti import constants
38
from ganeti import utils
39
from ganeti import workerpool
40
from ganeti import objects
41
from ganeti import compat
42
from ganeti import rapi
43
from ganeti import errors
44

    
45
import ganeti.rapi.client # pylint: disable=W0611
46
import ganeti.rapi.client_utils
47
from ganeti.rapi.client import UsesRapiClient
48

    
49

    
50
# Options describing how to reach the source cluster
SRC_RAPI_PORT_OPT = \
  cli.cli_option("--src-rapi-port", action="store", type="int",
                 dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
                 help=("Source cluster RAPI port (defaults to %s)" %
                       constants.DEFAULT_RAPI_PORT))

SRC_CA_FILE_OPT = \
  cli.cli_option("--src-ca-file", action="store", type="string",
                 dest="src_ca_file",
                 help=("File containing source cluster Certificate"
                       " Authority (CA) in PEM format"))

SRC_USERNAME_OPT = \
  cli.cli_option("--src-username", action="store", type="string",
                 dest="src_username", default=None,
                 help="Source cluster username")

SRC_PASSWORD_FILE_OPT = \
  cli.cli_option("--src-password-file", action="store", type="string",
                 dest="src_password_file",
                 help="File containing source cluster password")

# Options describing how to reach the destination cluster; most of them fall
# back to the corresponding source cluster setting when not given

# Deliberately no numeric default: RapiClientFactory only falls back to the
# source cluster port (as promised by the help text) when this option is unset
DEST_RAPI_PORT_OPT = \
  cli.cli_option("--dest-rapi-port", action="store", type="int",
                 dest="dest_rapi_port", default=None,
                 help=("Destination cluster RAPI port (defaults to source"
                       " cluster RAPI port)"))

DEST_CA_FILE_OPT = \
  cli.cli_option("--dest-ca-file", action="store", type="string",
                 dest="dest_ca_file",
                 help=("File containing destination cluster Certificate"
                       " Authority (CA) in PEM format (defaults to source"
                       " cluster CA)"))

DEST_USERNAME_OPT = \
  cli.cli_option("--dest-username", action="store", type="string",
                 dest="dest_username", default=None,
                 help=("Destination cluster username (defaults to"
                       " source cluster username)"))

DEST_PASSWORD_FILE_OPT = \
  cli.cli_option("--dest-password-file", action="store", type="string",
                 dest="dest_password_file",
                 help=("File containing destination cluster password"
                       " (defaults to source cluster password)"))

# Options controlling where and how the instance is created on the
# destination cluster
DEST_INSTANCE_NAME_OPT = \
  cli.cli_option("--dest-instance-name", action="store", type="string",
                 dest="dest_instance_name",
                 help=("Instance name on destination cluster (only"
                       " when moving exactly one instance)"))

DEST_PRIMARY_NODE_OPT = \
  cli.cli_option("--dest-primary-node", action="store", type="string",
                 dest="dest_primary_node",
                 help=("Primary node on destination cluster (only"
                       " when moving exactly one instance)"))

DEST_SECONDARY_NODE_OPT = \
  cli.cli_option("--dest-secondary-node", action="store", type="string",
                 dest="dest_secondary_node",
                 help=("Secondary node on destination cluster (only"
                       " when moving exactly one instance)"))

DEST_DISK_TEMPLATE_OPT = \
  cli.cli_option("--dest-disk-template", action="store", type="string",
                 dest="dest_disk_template", default=None,
                 help="Disk template to use on destination cluster")

# Options controlling the transfer itself
COMPRESS_OPT = \
  cli.cli_option("--compress", action="store", type="string",
                 dest="compress", default="none",
                 help="Compression mode to use during the move (this mode has"
                      " to be supported by both clusters)")

PARALLEL_OPT = \
  cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
                 dest="parallel", metavar="<number>",
                 help="Number of instances to be moved simultaneously")

# Options controlling opportunistic instance creation on the destination
OPPORTUNISTIC_TRIES_OPT = \
  cli.cli_option("--opportunistic-tries", action="store", type="int",
                 dest="opportunistic_tries", metavar="<number>",
                 help="Number of opportunistic instance creation attempts"
                      " before a normal creation is performed. An opportunistic"
                      " attempt will use the iallocator with all the nodes"
                      " currently unlocked, failing if not enough nodes are"
                      " available. Even though it will succeed (or fail) more"
                      " quickly, it can result in suboptimal instance"
                      " placement")

OPPORTUNISTIC_DELAY_OPT = \
  cli.cli_option("--opportunistic-delay", action="store", type="int",
                 dest="opportunistic_delay", metavar="<number>",
                 help="The delay between successive opportunistic instance"
                      " creation attempts, in seconds")
147

    
148

    
149
class Error(Exception):
  """Base class for all errors raised by this tool.

  """
153

    
154

    
155
class Abort(Error):
  """Raised to abort an import/export that is in progress.

  """
159

    
160

    
161
class RapiClientFactory:
  """Builds RAPI clients for the two clusters taking part in a move.

  @ivar src_cluster_name: Source cluster name
  @ivar dest_cluster_name: Destination cluster name
  @ivar GetSourceClient: Callable returning new client for source cluster
  @ivar GetDestClient: Callable returning new client for destination cluster

  """
  def __init__(self, options, src_cluster_name, dest_cluster_name):
    """Works out the connection settings for both clusters.

    Destination settings which were not given (CA file, port, username,
    password) are taken from the source cluster settings.

    @param options: Program options
    @type src_cluster_name: string
    @param src_cluster_name: Source cluster name
    @type dest_cluster_name: string
    @param dest_cluster_name: Destination cluster name

    """
    self.src_cluster_name = src_cluster_name
    self.dest_cluster_name = dest_cluster_name

    # TODO: Implement timeouts for RAPI connections
    # TODO: Support for using system default paths for verifying SSL certificate
    logging.debug("Using '%s' as source CA", options.src_ca_file)
    src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)

    if not options.dest_ca_file:
      logging.debug("Using source CA for destination")
      dest_curl_config = src_curl_config
    else:
      logging.debug("Using '%s' as destination CA", options.dest_ca_file)
      dest_curl_config = \
        rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)

    logging.debug("Source RAPI server is %s:%s",
                  src_cluster_name, options.src_rapi_port)
    logging.debug("Source username is '%s'", options.src_username)

    # A missing source username is turned into an empty string
    src_username = options.src_username
    if src_username is None:
      src_username = ""

    if not options.src_password_file:
      logging.debug("Source has no password")
      src_password = None
    else:
      logging.debug("Reading '%s' for source password",
                    options.src_password_file)
      src_password = utils.ReadOneLineFile(options.src_password_file,
                                           strict=True)

    def _MakeSourceClient():
      # The port is read from the options each time a client is requested
      return rapi.client.GanetiRapiClient(src_cluster_name,
                                          port=options.src_rapi_port,
                                          curl_config_fn=src_curl_config,
                                          username=src_username,
                                          password=src_password)

    self.GetSourceClient = _MakeSourceClient

    dest_rapi_port = options.dest_rapi_port
    if not dest_rapi_port:
      dest_rapi_port = options.src_rapi_port

    dest_username = options.dest_username
    if dest_username is None:
      dest_username = src_username

    logging.debug("Destination RAPI server is %s:%s",
                  dest_cluster_name, dest_rapi_port)
    logging.debug("Destination username is '%s'", dest_username)

    if not options.dest_password_file:
      logging.debug("Using source password for destination")
      dest_password = src_password
    else:
      logging.debug("Reading '%s' for destination password",
                    options.dest_password_file)
      dest_password = utils.ReadOneLineFile(options.dest_password_file,
                                            strict=True)

    def _MakeDestClient():
      return rapi.client.GanetiRapiClient(dest_cluster_name,
                                          port=dest_rapi_port,
                                          curl_config_fn=dest_curl_config,
                                          username=dest_username,
                                          password=dest_password)

    self.GetDestClient = _MakeDestClient
250

    
251

    
252
class MoveJobPollReportCb(cli.JobPollReportCbBase):
  """Job polling callbacks used while an instance is being moved.

  """
  def __init__(self, abort_check_fn, remote_import_fn):
    """Stores the callbacks used while polling.

    @type abort_check_fn: callable
    @param abort_check_fn: Function to check whether move is aborted
    @type remote_import_fn: callable or None
    @param remote_import_fn: Callback for reporting received remote import
                             information

    """
    cli.JobPollReportCbBase.__init__(self)
    self._abort_check_fn = abort_check_fn
    self._remote_import_fn = remote_import_fn

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Processes one log message of the polled job.

    Remote import information is passed to the registered callback, any
    other message is written to the log.

    """
    if log_type != constants.ELOG_REMOTE_IMPORT:
      # Ordinary message, just log it
      logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
                   cli.FormatLogMessage(log_type, log_msg))
      return

    logging.debug("Received remote import information")

    import_cb = self._remote_import_fn
    if not import_cb:
      raise RuntimeError("Received unexpected remote import information")

    assert "x509_ca" in log_msg
    assert "disks" in log_msg

    import_cb(log_msg)

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    Runs the abort check; an L{Abort} raised by it is logged and passed on to
    the caller.

    """
    try:
      # The other thread may have asked us to stop in the meantime
      self._abort_check_fn()
    except Abort:
      logging.warning("Aborting despite job %s still running", job_id)
      raise
297

    
298

    
299
class InstanceMove(object):
  """Describes a single instance move and holds its final status.

  """
  def __init__(self, src_instance_name, dest_instance_name,
               dest_pnode, dest_snode, compress, dest_iallocator,
               dest_disk_template, hvparams,
               beparams, osparams, nics, opportunistic_tries,
               opportunistic_delay):
    """Stores the parameters of the move.

    @type src_instance_name: string
    @param src_instance_name: Instance name on source cluster
    @type dest_instance_name: string
    @param dest_instance_name: Instance name on destination cluster
    @type dest_pnode: string or None
    @param dest_pnode: Name of primary node on destination cluster
    @type dest_snode: string or None
    @param dest_snode: Name of secondary node on destination cluster
    @type compress: string
    @param compress: Compression mode to use (has to be supported on both
                     clusters)
    @type dest_iallocator: string or None
    @param dest_iallocator: Name of iallocator to use
    @type dest_disk_template: string or None
    @param dest_disk_template: Disk template to use instead of the original one
    @type hvparams: dict or None
    @param hvparams: Hypervisor parameters to override
    @type beparams: dict or None
    @param beparams: Backend parameters to override
    @type osparams: dict or None
    @param osparams: OS parameters to override
    @type nics: list of dicts or None
    @param nics: NICs to override, one entry per NIC
    @type opportunistic_tries: int or None
    @param opportunistic_tries: Number of opportunistic creation attempts to
                                perform; None means no such attempts
    @type opportunistic_delay: int or None
    @param opportunistic_delay: Delay between successive creation attempts, in
                                seconds; None selects the default interval

    """
    # Names on both clusters
    self.src_instance_name = src_instance_name
    self.dest_instance_name = dest_instance_name

    # Placement and transfer settings
    self.dest_pnode = dest_pnode
    self.dest_snode = dest_snode
    self.dest_iallocator = dest_iallocator
    self.dest_disk_template = dest_disk_template
    self.compress = compress

    # Parameter overrides
    self.hvparams = hvparams
    self.beparams = beparams
    self.osparams = osparams
    self.nics = nics

    self.opportunistic_tries = \
      0 if opportunistic_tries is None else opportunistic_tries

    self.opportunistic_delay = \
      (constants.DEFAULT_OPPORTUNISTIC_RETRY_INTERVAL
       if opportunistic_delay is None else opportunistic_delay)

    # No error has been recorded yet
    self.error_message = None
364

    
365

    
366
class MoveRuntime(object):
  """Shared state of the source and destination threads of one move.

  """
  def __init__(self, move):
    """Sets up synchronization primitives and empty result slots.

    @type move: L{InstanceMove}

    """
    self.move = move

    # Both conditions share a single lock
    self.lock = threading.Lock()
    self.source_to_dest = threading.Condition(self.lock)
    self.dest_to_source = threading.Condition(self.lock)

    # Filled in by the source side
    self.src_error_message = None
    self.src_expinfo = None
    self.src_instinfo = None

    # Filled in by the destination side
    self.dest_error_message = None
    self.dest_impinfo = None

  def HandleErrors(self, prefix, fn, *args):
    """Runs a function, records its outcome and wakes up waiting threads.

    The outcome is stored in the attribute C{<prefix>_error_message}: None on
    success, C{"Aborted"} for L{Abort} and the exception text for any other
    exception. Exceptions are not passed on to the caller.

    @type prefix: string
    @param prefix: Variable name prefix ("src" or "dest")
    @type fn: callable
    @param fn: Function, called with the remaining positional arguments

    """
    assert prefix in ("dest", "src")

    try:
      fn(*args)
    except Abort:
      outcome = "Aborted"
    except Exception as err: # pylint: disable=W0703
      logging.exception("Caught unhandled exception")
      outcome = str(err)
    else:
      outcome = None

    setattr(self, "%s_error_message" % prefix, outcome)

    # Wake up both sides so that they can notice the new state
    with self.lock:
      self.source_to_dest.notifyAll()
      self.dest_to_source.notifyAll()

  def CheckAbort(self):
    """Raises an exception if either side has recorded an error.

    @raise Abort: When thread should be aborted

    """
    if (self.src_error_message is not None or
        self.dest_error_message is not None):
      logging.info("Aborting")
      raise Abort()

  def Wait(self, cond, check_fn):
    """Blocks for as long as the check function returns a true value.

    @type cond: threading.Condition
    @param cond: Threading condition
    @type check_fn: callable
    @param check_fn: Called with this object; waiting continues while it
                     returns a true value
    @raise Abort: When an error was recorded while waiting

    """
    with cond:
      while check_fn(self):
        self.CheckAbort()
        cond.wait()

  def PollJob(self, cl, job_id, remote_import_fn=None):
    """Polls a job, checking for aborts while doing so.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type job_id: string
    @param job_id: Job ID
    @type remote_import_fn: callable or None
    @param remote_import_fn: Callback for reporting received remote import
                             information
    @return: Whatever L{rapi.client_utils.PollJob} returns

    """
    report_cb = MoveJobPollReportCb(self.CheckAbort, remote_import_fn)
    return rapi.client_utils.PollJob(cl, job_id, report_cb)
466

    
467

    
468
class MoveDestExecutor(object):
  """Destination side of an instance move.

  All work is done by the constructor.

  """
  def __init__(self, dest_client, mrt):
    """Creates the instance on the destination in remote-import mode.

    Waits for the source side to publish instance and export information,
    then makes C{mrt.move.opportunistic_tries} opportunistic creation attempts
    followed by one non-opportunistic attempt.

    @type dest_client: L{rapi.client.GanetiRapiClient}
    @param dest_client: RAPI client
    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information

    """
    logging.debug("Waiting for instance information to become available")
    mrt.Wait(mrt.source_to_dest,
             lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)

    logging.info("Creating instance %s in remote-import mode",
                 mrt.move.dest_instance_name)

    # Depending on whether opportunistic tries are enabled, we may have to
    # make multiple creation attempts
    creation_attempts = [True] * mrt.move.opportunistic_tries

    # But the last one is never opportunistic, and will block until completion
    # or failure
    creation_attempts.append(False)

    # Initiate the RNG for the variations
    random.seed()

    for is_attempt_opportunistic in creation_attempts:
      # The OS parameter overrides must be passed in the osparams slot;
      # passing the backend parameters twice would drop them silently
      job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
                                    mrt.move.dest_pnode, mrt.move.dest_snode,
                                    mrt.move.compress,
                                    mrt.move.dest_iallocator,
                                    mrt.move.dest_disk_template,
                                    mrt.src_instinfo, mrt.src_expinfo,
                                    mrt.move.hvparams, mrt.move.beparams,
                                    mrt.move.osparams, mrt.move.nics,
                                    is_attempt_opportunistic)

      try:
        # The completion of this block signifies that the import has been
        # completed successfully
        mrt.PollJob(dest_client, job_id,
                    remote_import_fn=(lambda impinfo:
                                      self._SetImportInfo(mrt, impinfo)))
        logging.info("Import successful")
        return
      except errors.OpPrereqError as err:
        # Any exception in the non-opportunistic creation is to be passed on,
        # as well as exceptions apart from resources temporarily unavailable
        if not is_attempt_opportunistic or \
           err.args[1] != rapi.client.ECODE_TEMP_NORES:
          raise

      delay_to_use = MoveDestExecutor._VaryDelay(mrt)
      logging.info("Opportunistic attempt unsuccessful, waiting %.2f seconds"
                   " before another creation attempt is made",
                   delay_to_use)
      time.sleep(delay_to_use)

  @staticmethod
  def _VaryDelay(mrt):
    """Varies the opportunistic delay by a small amount.

    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information
    @rtype: float
    @return: C{mrt.move.opportunistic_delay} scaled by a random factor
             between 0.85 and 1.15

    """
    MAX_VARIATION = 0.15
    variation_factor = (1.0 + random.uniform(-MAX_VARIATION, MAX_VARIATION))
    return mrt.move.opportunistic_delay * variation_factor

  @staticmethod
  def _SetImportInfo(mrt, impinfo):
    """Sets the remote import information and notifies source thread.

    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information
    @param impinfo: Remote import information

    """
    mrt.dest_to_source.acquire()
    try:
      mrt.dest_impinfo = impinfo
      mrt.dest_to_source.notifyAll()
    finally:
      mrt.dest_to_source.release()

  @staticmethod
  def _CreateInstance(cl, name, pnode, snode, compress, iallocator,
                      dest_disk_template, instance, expinfo, override_hvparams,
                      override_beparams, override_osparams, override_nics,
                      is_attempt_opportunistic):
    """Starts the instance creation in remote import mode.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type name: string
    @param name: Instance name
    @type pnode: string or None
    @param pnode: Name of primary node on destination cluster
    @type snode: string or None
    @param snode: Name of secondary node on destination cluster
    @type compress: string
    @param compress: Compression mode to use
    @type iallocator: string or None
    @param iallocator: Name of iallocator to use
    @type dest_disk_template: string or None
    @param dest_disk_template: Disk template to use instead of the original one
    @type instance: dict
    @param instance: Instance details from source cluster
    @type expinfo: dict
    @param expinfo: Prepared export information from source cluster
    @type override_hvparams: dict or None
    @param override_hvparams: Hypervisor parameters to override
    @type override_beparams: dict or None
    @param override_beparams: Backend parameters to override
    @type override_osparams: dict or None
    @param override_osparams: OS parameters to override
    @type override_nics: list of dicts or None
    @param override_nics: NICs to override, matched to the source NICs by
                          position
    @type is_attempt_opportunistic: bool
    @param is_attempt_opportunistic: Whether to use opportunistic locking or not
    @raise Error: When the NIC information has an unexpected format or more
                  NIC overrides than source NICs were given
    @return: Job ID

    """
    if dest_disk_template:
      disk_template = dest_disk_template
    else:
      disk_template = instance["disk_template"]

    disks = []
    for idisk in instance["disks"]:
      odisk = {
        constants.IDISK_SIZE: idisk["size"],
        constants.IDISK_MODE: idisk["mode"],
        constants.IDISK_NAME: str(idisk.get("name")),
        }
      # Spindles are only passed on when the source reported them
      spindles = idisk.get("spindles")
      if spindles is not None:
        odisk[constants.IDISK_SPINDLES] = spindles
      disks.append(odisk)

    try:
      # Each source NIC must unpack into exactly nine fields
      nics = [{
        constants.INIC_IP: ip,
        constants.INIC_MAC: mac,
        constants.INIC_MODE: mode,
        constants.INIC_LINK: link,
        constants.INIC_VLAN: vlan,
        constants.INIC_NETWORK: network,
        constants.INIC_NAME: nic_name
        } for nic_name, _, ip, mac, mode, link, vlan, network, _
          in instance["nics"]]
    except ValueError:
      raise Error("Received NIC information does not match expected format; "
                  "Do the versions of this tool and the source cluster match?")

    # No overrides at all (None or empty) is fine, hence the truth test before
    # looking at the length
    if override_nics:
      if len(override_nics) > len(nics):
        raise Error("Can not create new NICs")

      for idx, (nic, override) in enumerate(zip(nics, override_nics)):
        nics[idx] = objects.FillDict(nic, override)

    # TODO: Should this be the actual up/down status? (run_state)
    start = (instance["config_state"] == "up")

    assert len(disks) == len(instance["disks"])
    assert len(nics) == len(instance["nics"])

    inst_beparams = instance["be_instance"]
    if not inst_beparams:
      inst_beparams = {}

    inst_hvparams = instance["hv_instance"]
    if not inst_hvparams:
      inst_hvparams = {}

    inst_osparams = instance["os_instance"]
    if not inst_osparams:
      inst_osparams = {}

    return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
                             name, disk_template, disks, nics,
                             os=instance["os"],
                             pnode=pnode,
                             snode=snode,
                             start=start,
                             ip_check=False,
                             iallocator=iallocator,
                             hypervisor=instance["hypervisor"],
                             source_handshake=expinfo["handshake"],
                             source_x509_ca=expinfo["x509_ca"],
                             compress=compress,
                             source_instance_name=instance["name"],
                             beparams=objects.FillDict(inst_beparams,
                                                       override_beparams),
                             hvparams=objects.FillDict(inst_hvparams,
                                                       override_hvparams),
                             osparams=objects.FillDict(inst_osparams,
                                                       override_osparams),
                             opportunistic_locking=is_attempt_opportunistic
                             )
670

    
671

    
672
class MoveSourceExecutor(object):
  """Source side of an instance move.

  All work is done by the constructor.

  """
  def __init__(self, src_client, mrt):
    """Exports the instance from the source cluster.

    Checks the instance, publishes instance and export information for the
    destination thread, waits for the import information and then starts the
    remote export.

    @type src_client: L{rapi.client.GanetiRapiClient}
    @param src_client: RAPI client
    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information

    """
    instance_name = mrt.move.src_instance_name

    logging.info("Checking whether instance exists")
    self._CheckInstance(src_client, instance_name)

    logging.info("Retrieving instance information from source cluster")
    instinfo = self._GetInstanceInfo(src_client, mrt.PollJob, instance_name)
    if instinfo["disk_template"] in constants.DTS_FILEBASED:
      raise Error("Inter-cluster move of file-based instances is not"
                  " supported.")

    logging.info("Preparing export on source cluster")
    expinfo = self._PrepareExport(src_client, mrt.PollJob, instance_name)
    assert "handshake" in expinfo
    assert "x509_key_name" in expinfo
    assert "x509_ca" in expinfo

    # Publish both pieces of information and wake up the destination thread
    with mrt.source_to_dest:
      mrt.src_instinfo = instinfo
      mrt.src_expinfo = expinfo
      mrt.source_to_dest.notifyAll()

    logging.info("Waiting for destination information to become available")
    mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)

    logging.info("Starting remote export on source cluster")
    self._ExportInstance(src_client, mrt.PollJob, instance_name,
                         expinfo["x509_key_name"], mrt.move.compress,
                         mrt.dest_impinfo)

    logging.info("Export successful")

  @staticmethod
  def _CheckInstance(cl, name):
    """Makes sure the instance can be queried on the source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type name: string
    @param name: Instance name
    @raise Error: When the query fails with HTTP "not found"; other API
                  errors are passed on unchanged

    """
    try:
      cl.GetInstance(name)
    except rapi.client.GanetiApiError as err:
      if err.code != rapi.client.HTTP_NOT_FOUND:
        raise
      raise Error("Instance %s not found (%s)" % (name, str(err)))

  @staticmethod
  def _GetInstanceInfo(cl, poll_job_fn, name):
    """Retrieves detailed instance information from source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name
    @return: The single value of the dictionary found in the first job result

    """
    job_id = cl.GetInstanceInfo(name, static=True)
    details = poll_job_fn(cl, job_id)[0]
    # Exactly one instance was asked for
    assert len(details.keys()) == 1
    return list(details.values())[0]

  @staticmethod
  def _PrepareExport(cl, poll_job_fn, name):
    """Prepares export on source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name
    @return: First result of the export preparation job

    """
    prepare_job = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
    job_results = poll_job_fn(cl, prepare_job)
    return job_results[0]

  @staticmethod
  def _ExportInstance(cl, poll_job_fn, name, x509_key_name, compress, impinfo):
    """Exports instance from source cluster.

    The instance is shut down for the export and removed from the source
    cluster as part of the export job.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name
    @param x509_key_name: Source X509 key
    @type compress: string
    @param compress: Compression mode to use
    @param impinfo: Import information from destination cluster
    @raise Error: When the overall result or any per-disk result is false

    """
    job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
                               impinfo["disks"], shutdown=True,
                               remove_instance=True,
                               x509_key_name=x509_key_name,
                               destination_x509_ca=impinfo["x509_ca"],
                               compress=compress)
    (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]

    if fin_resu and compat.all(dresults):
      return

    # Report the indices of all disks whose export did not succeed
    raise Error("Export failed for disks %s" %
                utils.CommaJoin(str(idx) for idx, result
                                in enumerate(dresults) if not result))
795

    
796

    
797
class MoveSourceWorker(workerpool.BaseWorker):
  """Worker pool worker executing one instance move per task.

  """
  def RunTask(self, rapi_factory, move): # pylint: disable=W0221
    """Executes an instance move.

    The destination side runs in a separate thread while the source side
    runs in the calling thread; the destination thread is always joined
    before the outcome is evaluated. The outcome is stored in
    C{move.error_message}, which is set to C{None} if neither side recorded
    an error message.

    @type rapi_factory: L{RapiClientFactory}
    @param rapi_factory: RAPI client factory
    @type move: L{InstanceMove}
    @param move: Instance move information

    """
    try:
      logging.info("Preparing to move %s from cluster %s to %s as %s",
                   move.src_instance_name, rapi_factory.src_cluster_name,
                   rapi_factory.dest_cluster_name, move.dest_instance_name)

      mrt = MoveRuntime(move)

      logging.debug("Starting destination thread")
      dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
                                     target=mrt.HandleErrors,
                                     args=("dest", MoveDestExecutor,
                                           rapi_factory.GetDestClient(),
                                           mrt, ))
      dest_thread.start()
      try:
        mrt.HandleErrors("src", MoveSourceExecutor,
                         rapi_factory.GetSourceClient(), mrt)
      finally:
        # Wait for the destination side even if the source side raised
        dest_thread.join()

      if mrt.src_error_message or mrt.dest_error_message:
        move.error_message = ("Source error: %s, destination error: %s" %
                              (mrt.src_error_message, mrt.dest_error_message))
      else:
        move.error_message = None
    except Exception as err: # pylint: disable=W0703
      logging.exception("Caught unhandled exception")
      # Some exceptions stringify to "" (e.g. a bare AssertionError); an
      # empty message is falsy and would make the failed move look
      # successful, hence the fallback to the exception's representation
      move.error_message = str(err) or repr(err)
835

    
836

    
837
def CheckRapiSetup(rapi_factory):
  """Verifies that both RAPI servers answer a version request.

  For the source cluster first and the destination cluster second, a client
  is obtained from the factory and the version it reports is logged.

  @type rapi_factory: L{RapiClientFactory}
  @param rapi_factory: RAPI client factory

  """
  checks = (
    (rapi_factory.GetSourceClient,
     "Connecting to source RAPI server",
     "Source cluster RAPI version: %s"),
    (rapi_factory.GetDestClient,
     "Connecting to destination RAPI server",
     "Destination cluster RAPI version: %s"),
    )

  for (get_client_fn, connect_msg, version_msg) in checks:
    client = get_client_fn()
    logging.info(connect_msg)
    logging.info(version_msg, client.GetVersion())
851

    
852

    
853
def ParseOptions():
  """Builds the option parser and parses the command line.

  @rtype: tuple
  @return: Tuple of (option parser, parsed options, positional arguments)

  """
  usage = ("%prog [--debug|--verbose]"
           " <source-cluster> <dest-cluster>"
           " <instance...>")
  parser = optparse.OptionParser(usage=usage,
                                 prog=os.path.basename(sys.argv[0]))

  # Options are registered in exactly the order listed here
  all_options = (
    cli.DEBUG_OPT,
    cli.VERBOSE_OPT,
    cli.IALLOCATOR_OPT,
    cli.BACKEND_OPT,
    cli.HVOPTS_OPT,
    cli.OSPARAMS_OPT,
    cli.NET_OPT,
    SRC_RAPI_PORT_OPT,
    SRC_CA_FILE_OPT,
    SRC_USERNAME_OPT,
    SRC_PASSWORD_FILE_OPT,
    DEST_RAPI_PORT_OPT,
    DEST_CA_FILE_OPT,
    DEST_USERNAME_OPT,
    DEST_PASSWORD_FILE_OPT,
    DEST_INSTANCE_NAME_OPT,
    DEST_PRIMARY_NODE_OPT,
    DEST_SECONDARY_NODE_OPT,
    DEST_DISK_TEMPLATE_OPT,
    COMPRESS_OPT,
    PARALLEL_OPT,
    OPPORTUNISTIC_TRIES_OPT,
    OPPORTUNISTIC_DELAY_OPT,
    )
  for opt in all_options:
    parser.add_option(opt)

  (options, args) = parser.parse_args()

  return (parser, options, args)
890

    
891

    
892
def CheckOptions(parser, options, args):
  """Checks options and arguments for validity.

  Every problem found is reported through C{parser.error}.

  @param parser: Option parser, used for reporting errors
  @param options: Parsed options. When exactly one instance is moved,
    C{hvparams} and C{beparams} are passed to L{utils.ForceDictType} and
    C{nics} is replaced by the result of L{cli.ParseNicOption}.
  @type args: list
  @param args: Positional arguments. The two cluster names are removed from
    this list; the very same list is then returned as the instance names.
  @rtype: tuple
  @return: Tuple of (source cluster name, destination cluster name, list of
    instance names)

  """
  if len(args) < 3:
    parser.error("Not enough arguments")

  src_cluster_name = args.pop(0)
  dest_cluster_name = args.pop(0)
  instance_names = args

  assert len(instance_names) > 0

  # TODO: Remove once using system default paths for SSL certificate
  # verification is implemented
  if not options.src_ca_file:
    parser.error("Missing source cluster CA file")

  if options.parallel < 1:
    parser.error("Number of simultaneous moves must be >= 1")

  if (bool(options.iallocator) and
      bool(options.dest_primary_node or options.dest_secondary_node)):
    parser.error("Destination node and iallocator options exclude each other")

  tries_specified = options.opportunistic_tries is not None
  delay_specified = options.opportunistic_delay is not None

  # The number of tries is only compared once it is known not to be None;
  # ordering None against an integer is arbitrary in Python 2 and an error
  # in Python 3
  if (tries_specified and options.opportunistic_tries > 0 and
      not options.iallocator):
    parser.error("Opportunistic instance creation can only be used with an"
                 " iallocator")

  if tries_specified:
    if options.opportunistic_tries < 0:
      parser.error("Number of opportunistic creation attempts must be >= 0")
    if delay_specified and options.opportunistic_delay <= 0:
      parser.error("The delay between two successive creation attempts must"
                   " be greater than zero")
  elif delay_specified:
    parser.error("Opportunistic delay can only be specified when opportunistic"
                 " tries are used")
  # If neither was specified, the default values will be provided later

  if len(instance_names) == 1:
    # Moving one instance only
    if options.hvparams:
      utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)

    if options.beparams:
      utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)

    if options.nics:
      options.nics = cli.ParseNicOption(options.nics)
  else:
    # Moving more than one instance
    if (options.dest_instance_name or options.dest_primary_node or
        options.dest_secondary_node or options.hvparams or
        options.beparams or options.osparams or options.nics):
      parser.error("The options --dest-instance-name, --dest-primary-node,"
                   " --dest-secondary-node, --hypervisor-parameters,"
                   " --backend-parameters, --os-parameters and --net can"
                   " only be used when moving exactly one instance")

  return (src_cluster_name, dest_cluster_name, instance_names)
958

    
959

    
960
def DestClusterHasDefaultIAllocator(rapi_factory):
  """Tells whether the destination cluster reports a default iallocator.

  @type rapi_factory: L{RapiClientFactory}
  @param rapi_factory: RAPI client factory
  @return: C{False} if the cluster information has no C{default_iallocator}
    entry, otherwise the value of that entry (which may itself be false)

  """
  cluster_info = rapi_factory.GetDestClient().GetInfo()
  key = "default_iallocator"

  if key not in cluster_info:
    return False

  return cluster_info[key]
967

    
968

    
969
def ExitWithError(message):
  """Prints an error message and terminates the program.

  The message is written to standard error, prefixed with
  C{"move-instance: error: "} and followed by a newline; the process then
  exits with L{constants.EXIT_FAILURE}.

  @type message: string
  @param message: Text describing the error

  """
  text = "".join(("move-instance: error: ", message, "\n"))
  sys.stderr.write(text)
  sys.exit(constants.EXIT_FAILURE)
975

    
976

    
977
@UsesRapiClient
def main():
  """Main routine.

  Parses and checks the command line, verifies that both RAPI servers can be
  queried, runs all requested instance moves through a worker pool and logs
  the result of each move. The process exits with L{constants.EXIT_FAILURE}
  if any move recorded an error message and with L{constants.EXIT_SUCCESS}
  otherwise.

  """
  (parser, options, args) = ParseOptions()

  utils.SetupToolLogging(options.debug, options.verbose, threadname=True)

  (src_cluster_name, dest_cluster_name, instance_names) = \
    CheckOptions(parser, options, args)

  logging.info("Source cluster: %s", src_cluster_name)
  logging.info("Destination cluster: %s", dest_cluster_name)
  logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))

  rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)

  CheckRapiSetup(rapi_factory)

  # The destination cluster is only queried if no iallocator was given on
  # the command line
  has_iallocator = options.iallocator or \
                   DestClusterHasDefaultIAllocator(rapi_factory)

  if len(instance_names) > 1 and not has_iallocator:
    ExitWithError("When moving multiple instances, an iallocator must be"
                  " used. None was provided and the target cluster does not"
                  " have a default iallocator.")
  if (len(instance_names) == 1 and not (has_iallocator or
      options.dest_primary_node or options.dest_secondary_node)):
    ExitWithError("Target cluster does not have a default iallocator, "
                  "please specify either destination nodes or an iallocator.")

  # Prepare list of instance moves
  moves = []
  for src_instance_name in instance_names:
    if options.dest_instance_name:
      assert len(instance_names) == 1
      # Rename instance
      dest_instance_name = options.dest_instance_name
    else:
      dest_instance_name = src_instance_name

    moves.append(InstanceMove(src_instance_name, dest_instance_name,
                              options.dest_primary_node,
                              options.dest_secondary_node,
                              options.compress,
                              options.iallocator,
                              options.dest_disk_template,
                              options.hvparams,
                              options.beparams,
                              options.osparams,
                              options.nics,
                              options.opportunistic_tries,
                              options.opportunistic_delay))

  assert len(moves) == len(instance_names)

  # Start workerpool
  wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
  try:
    # Add instance moves to workerpool
    for move in moves:
      wp.AddTask((rapi_factory, move))

    # Wait for all moves to finish
    wp.Quiesce()

  finally:
    wp.TerminateWorkers()

  # There should be no threads running at this point, hence not using locks
  # anymore

  logging.info("Instance move results:")

  for move in moves:
    if move.dest_instance_name == move.src_instance_name:
      name = move.src_instance_name
    else:
      name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)

    if move.error_message:
      msg = "Failed (%s)" % move.error_message
    else:
      msg = "Success"

    logging.info("%s: %s", name, msg)

  if compat.any(move.error_message for move in moves):
    sys.exit(constants.EXIT_FAILURE)

  sys.exit(constants.EXIT_SUCCESS)
1069

    
1070

    
1071
# Script entry point: main() is only called when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
1072
  main()