Statistics
| Branch: | Tag: | Revision:

root / tools / move-instance @ a111ebde

History | View | Annotate | Download (30.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to move instances from one cluster to another.
22

    
23
"""
24

    
25
# pylint: disable-msg=C0103
26
# C0103: Invalid name move-instance
27

    
28
import os
29
import sys
30
import time
31
import logging
32
import optparse
33
import threading
34

    
35
from ganeti import cli
36
from ganeti import constants
37
from ganeti import utils
38
from ganeti import workerpool
39
from ganeti import objects
40
from ganeti import compat
41
from ganeti import rapi
42

    
43
import ganeti.rapi.client # pylint: disable-msg=W0611
44
import ganeti.rapi.client_utils
45

    
46

    
47
SRC_RAPI_PORT_OPT = \
48
  cli.cli_option("--src-rapi-port", action="store", type="int",
49
                 dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
50
                 help=("Source cluster RAPI port (defaults to %s)" %
51
                       constants.DEFAULT_RAPI_PORT))
52

    
53
SRC_CA_FILE_OPT = \
54
  cli.cli_option("--src-ca-file", action="store", type="string",
55
                 dest="src_ca_file",
56
                 help=("File containing source cluster Certificate"
57
                       " Authority (CA) in PEM format"))
58

    
59
SRC_USERNAME_OPT = \
60
  cli.cli_option("--src-username", action="store", type="string",
61
                 dest="src_username", default=None,
62
                 help="Source cluster username")
63

    
64
SRC_PASSWORD_FILE_OPT = \
65
  cli.cli_option("--src-password-file", action="store", type="string",
66
                 dest="src_password_file",
67
                 help="File containing source cluster password")
68

    
69
DEST_RAPI_PORT_OPT = \
70
  cli.cli_option("--dest-rapi-port", action="store", type="int",
71
                 dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT,
72
                 help=("Destination cluster RAPI port (defaults to source"
73
                       " cluster RAPI port)"))
74

    
75
DEST_CA_FILE_OPT = \
76
  cli.cli_option("--dest-ca-file", action="store", type="string",
77
                 dest="dest_ca_file",
78
                 help=("File containing destination cluster Certificate"
79
                       " Authority (CA) in PEM format (defaults to source"
80
                       " cluster CA)"))
81

    
82
DEST_USERNAME_OPT = \
83
  cli.cli_option("--dest-username", action="store", type="string",
84
                 dest="dest_username", default=None,
85
                 help=("Destination cluster username (defaults to"
86
                       " source cluster username)"))
87

    
88
DEST_PASSWORD_FILE_OPT = \
89
  cli.cli_option("--dest-password-file", action="store", type="string",
90
                 dest="dest_password_file",
91
                 help=("File containing destination cluster password"
92
                       " (defaults to source cluster password)"))
93

    
94
DEST_INSTANCE_NAME_OPT = \
95
  cli.cli_option("--dest-instance-name", action="store", type="string",
96
                 dest="dest_instance_name",
97
                 help=("Instance name on destination cluster (only"
98
                       " when moving exactly one instance)"))
99

    
100
DEST_PRIMARY_NODE_OPT = \
101
  cli.cli_option("--dest-primary-node", action="store", type="string",
102
                 dest="dest_primary_node",
103
                 help=("Primary node on destination cluster (only"
104
                       " when moving exactly one instance)"))
105

    
106
DEST_SECONDARY_NODE_OPT = \
107
  cli.cli_option("--dest-secondary-node", action="store", type="string",
108
                 dest="dest_secondary_node",
109
                 help=("Secondary node on destination cluster (only"
110
                       " when moving exactly one instance)"))
111

    
112
PARALLEL_OPT = \
113
  cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
114
                 dest="parallel", metavar="<number>",
115
                 help="Number of instances to be moved simultaneously")
116

    
117

    
118
class Error(Exception):
119
  """Generic error.
120

    
121
  """
122

    
123

    
124
class Abort(Error):
125
  """Special exception for aborting import/export.
126

    
127
  """
128

    
129

    
130
class RapiClientFactory:
131
  """Factory class for creating RAPI clients.
132

    
133
  @ivar src_cluster_name: Source cluster name
134
  @ivar dest_cluster_name: Destination cluster name
135
  @ivar GetSourceClient: Callable returning new client for source cluster
136
  @ivar GetDestClient: Callable returning new client for destination cluster
137

    
138
  """
139
  def __init__(self, options, src_cluster_name, dest_cluster_name):
140
    """Initializes this class.
141

    
142
    @param options: Program options
143
    @type src_cluster_name: string
144
    @param src_cluster_name: Source cluster name
145
    @type dest_cluster_name: string
146
    @param dest_cluster_name: Destination cluster name
147

    
148
    """
149
    self.src_cluster_name = src_cluster_name
150
    self.dest_cluster_name = dest_cluster_name
151

    
152
    # TODO: Implement timeouts for RAPI connections
153
    # TODO: Support for using system default paths for verifying SSL certificate
154
    logging.debug("Using '%s' as source CA", options.src_ca_file)
155
    src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)
156

    
157
    if options.dest_ca_file:
158
      logging.debug("Using '%s' as destination CA", options.dest_ca_file)
159
      dest_curl_config = \
160
        rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
161
    else:
162
      logging.debug("Using source CA for destination")
163
      dest_curl_config = src_curl_config
164

    
165
    logging.debug("Source RAPI server is %s:%s",
166
                  src_cluster_name, options.src_rapi_port)
167
    logging.debug("Source username is '%s'", options.src_username)
168

    
169
    if options.src_username is None:
170
      src_username = ""
171
    else:
172
      src_username = options.src_username
173

    
174
    if options.src_password_file:
175
      logging.debug("Reading '%s' for source password",
176
                    options.src_password_file)
177
      src_password = utils.ReadOneLineFile(options.src_password_file,
178
                                           strict=True)
179
    else:
180
      logging.debug("Source has no password")
181
      src_password = None
182

    
183
    self.GetSourceClient = lambda: \
184
      rapi.client.GanetiRapiClient(src_cluster_name,
185
                                   port=options.src_rapi_port,
186
                                   curl_config_fn=src_curl_config,
187
                                   username=src_username,
188
                                   password=src_password)
189

    
190
    if options.dest_rapi_port:
191
      dest_rapi_port = options.dest_rapi_port
192
    else:
193
      dest_rapi_port = options.src_rapi_port
194

    
195
    if options.dest_username is None:
196
      dest_username = src_username
197
    else:
198
      dest_username = options.dest_username
199

    
200
    logging.debug("Destination RAPI server is %s:%s",
201
                  dest_cluster_name, dest_rapi_port)
202
    logging.debug("Destination username is '%s'", dest_username)
203

    
204
    if options.dest_password_file:
205
      logging.debug("Reading '%s' for destination password",
206
                    options.dest_password_file)
207
      dest_password = utils.ReadOneLineFile(options.dest_password_file,
208
                                            strict=True)
209
    else:
210
      logging.debug("Using source password for destination")
211
      dest_password = src_password
212

    
213
    self.GetDestClient = lambda: \
214
      rapi.client.GanetiRapiClient(dest_cluster_name,
215
                                   port=dest_rapi_port,
216
                                   curl_config_fn=dest_curl_config,
217
                                   username=dest_username,
218
                                   password=dest_password)
219

    
220

    
221
class MoveJobPollReportCb(cli.JobPollReportCbBase):
222
  def __init__(self, abort_check_fn, remote_import_fn):
223
    """Initializes this class.
224

    
225
    @type abort_check_fn: callable
226
    @param abort_check_fn: Function to check whether move is aborted
227
    @type remote_import_fn: callable or None
228
    @param remote_import_fn: Callback for reporting received remote import
229
                             information
230

    
231
    """
232
    cli.JobPollReportCbBase.__init__(self)
233
    self._abort_check_fn = abort_check_fn
234
    self._remote_import_fn = remote_import_fn
235

    
236
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
237
    """Handles a log message.
238

    
239
    """
240
    if log_type == constants.ELOG_REMOTE_IMPORT:
241
      logging.debug("Received remote import information")
242

    
243
      if not self._remote_import_fn:
244
        raise RuntimeError("Received unexpected remote import information")
245

    
246
      assert "x509_ca" in log_msg
247
      assert "disks" in log_msg
248

    
249
      self._remote_import_fn(log_msg)
250

    
251
      return
252

    
253
    logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
254
                 cli.FormatLogMessage(log_type, log_msg))
255

    
256
  def ReportNotChanged(self, job_id, status):
257
    """Called if a job hasn't changed in a while.
258

    
259
    """
260
    try:
261
      # Check whether we were told to abort by the other thread
262
      self._abort_check_fn()
263
    except Abort:
264
      logging.warning("Aborting despite job %s still running", job_id)
265
      raise
266

    
267

    
268
class InstanceMove(object):
269
  """Status class for instance moves.
270

    
271
  """
272
  def __init__(self, src_instance_name, dest_instance_name,
273
               dest_pnode, dest_snode, dest_iallocator,
274
               hvparams, beparams, osparams, nics):
275
    """Initializes this class.
276

    
277
    @type src_instance_name: string
278
    @param src_instance_name: Instance name on source cluster
279
    @type dest_instance_name: string
280
    @param dest_instance_name: Instance name on destination cluster
281
    @type dest_pnode: string or None
282
    @param dest_pnode: Name of primary node on destination cluster
283
    @type dest_snode: string or None
284
    @param dest_snode: Name of secondary node on destination cluster
285
    @type dest_iallocator: string or None
286
    @param dest_iallocator: Name of iallocator to use
287
    @type hvparams: dict or None
288
    @param hvparams: Hypervisor parameters to override
289
    @type beparams: dict or None
290
    @param beparams: Backend parameters to override
291
    @type osparams: dict or None
292
    @param osparams: OS parameters to override
293
    @type nics: dict or None
294
    @param nics: NICs to override
295

    
296
    """
297
    self.src_instance_name = src_instance_name
298
    self.dest_instance_name = dest_instance_name
299
    self.dest_pnode = dest_pnode
300
    self.dest_snode = dest_snode
301
    self.dest_iallocator = dest_iallocator
302
    self.hvparams = hvparams
303
    self.beparams = beparams
304
    self.osparams = osparams
305
    self.nics = nics
306

    
307
    self.error_message = None
308

    
309

    
310
class MoveRuntime(object):
311
  """Class to keep track of instance move.
312

    
313
  """
314
  def __init__(self, move):
315
    """Initializes this class.
316

    
317
    @type move: L{InstanceMove}
318

    
319
    """
320
    self.move = move
321

    
322
    # Thread synchronization
323
    self.lock = threading.Lock()
324
    self.source_to_dest = threading.Condition(self.lock)
325
    self.dest_to_source = threading.Condition(self.lock)
326

    
327
    # Source information
328
    self.src_error_message = None
329
    self.src_expinfo = None
330
    self.src_instinfo = None
331

    
332
    # Destination information
333
    self.dest_error_message = None
334
    self.dest_impinfo = None
335

    
336
  def HandleErrors(self, prefix, fn, *args):
337
    """Wrapper to catch errors and abort threads.
338

    
339
    @type prefix: string
340
    @param prefix: Variable name prefix ("src" or "dest")
341
    @type fn: callable
342
    @param fn: Function
343

    
344
    """
345
    assert prefix in ("dest", "src")
346

    
347
    try:
348
      # Call inner function
349
      fn(*args)
350

    
351
      errmsg = None
352
    except Abort:
353
      errmsg = "Aborted"
354
    except Exception, err:
355
      logging.exception("Caught unhandled exception")
356
      errmsg = str(err)
357

    
358
    setattr(self, "%s_error_message" % prefix, errmsg)
359

    
360
    self.lock.acquire()
361
    try:
362
      self.source_to_dest.notifyAll()
363
      self.dest_to_source.notifyAll()
364
    finally:
365
      self.lock.release()
366

    
367
  def CheckAbort(self):
368
    """Check whether thread should be aborted.
369

    
370
    @raise Abort: When thread should be aborted
371

    
372
    """
373
    if not (self.src_error_message is None and
374
            self.dest_error_message is None):
375
      logging.info("Aborting")
376
      raise Abort()
377

    
378
  def Wait(self, cond, check_fn):
379
    """Waits for a condition to become true.
380

    
381
    @type cond: threading.Condition
382
    @param cond: Threading condition
383
    @type check_fn: callable
384
    @param check_fn: Function to check whether condition is true
385

    
386
    """
387
    cond.acquire()
388
    try:
389
      while check_fn(self):
390
        self.CheckAbort()
391
        cond.wait()
392
    finally:
393
      cond.release()
394

    
395
  def PollJob(self, cl, job_id, remote_import_fn=None):
396
    """Wrapper for polling a job.
397

    
398
    @type cl: L{rapi.client.GanetiRapiClient}
399
    @param cl: RAPI client
400
    @type job_id: string
401
    @param job_id: Job ID
402
    @type remote_import_fn: callable or None
403
    @param remote_import_fn: Callback for reporting received remote import
404
                             information
405

    
406
    """
407
    return rapi.client_utils.PollJob(cl, job_id,
408
                                     MoveJobPollReportCb(self.CheckAbort,
409
                                                         remote_import_fn))
410

    
411

    
412
class MoveDestExecutor(object):
413
  def __init__(self, dest_client, mrt):
414
    """Destination side of an instance move.
415

    
416
    @type dest_client: L{rapi.client.GanetiRapiClient}
417
    @param dest_client: RAPI client
418
    @type mrt: L{MoveRuntime}
419
    @param mrt: Instance move runtime information
420

    
421
    """
422
    logging.debug("Waiting for instance information to become available")
423
    mrt.Wait(mrt.source_to_dest,
424
             lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
425

    
426
    logging.info("Creating instance %s in remote-import mode",
427
                 mrt.move.dest_instance_name)
428
    job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
429
                                  mrt.move.dest_pnode, mrt.move.dest_snode,
430
                                  mrt.move.dest_iallocator,
431
                                  mrt.src_instinfo, mrt.src_expinfo,
432
                                  mrt.move.hvparams, mrt.move.beparams,
433
                                  mrt.move.beparams, mrt.move.nics)
434
    mrt.PollJob(dest_client, job_id,
435
                remote_import_fn=compat.partial(self._SetImportInfo, mrt))
436

    
437
    logging.info("Import successful")
438

    
439
  @staticmethod
440
  def _SetImportInfo(mrt, impinfo):
441
    """Sets the remote import information and notifies source thread.
442

    
443
    @type mrt: L{MoveRuntime}
444
    @param mrt: Instance move runtime information
445
    @param impinfo: Remote import information
446

    
447
    """
448
    mrt.dest_to_source.acquire()
449
    try:
450
      mrt.dest_impinfo = impinfo
451
      mrt.dest_to_source.notifyAll()
452
    finally:
453
      mrt.dest_to_source.release()
454

    
455
  @staticmethod
456
  def _CreateInstance(cl, name, pnode, snode, iallocator, instance, expinfo,
457
                      override_hvparams, override_beparams, override_osparams,
458
                      override_nics):
459
    """Starts the instance creation in remote import mode.
460

    
461
    @type cl: L{rapi.client.GanetiRapiClient}
462
    @param cl: RAPI client
463
    @type name: string
464
    @param name: Instance name
465
    @type pnode: string or None
466
    @param pnode: Name of primary node on destination cluster
467
    @type snode: string or None
468
    @param snode: Name of secondary node on destination cluster
469
    @type iallocator: string or None
470
    @param iallocator: Name of iallocator to use
471
    @type instance: dict
472
    @param instance: Instance details from source cluster
473
    @type expinfo: dict
474
    @param expinfo: Prepared export information from source cluster
475
    @type override_hvparams: dict or None
476
    @param override_hvparams: Hypervisor parameters to override
477
    @type override_beparams: dict or None
478
    @param override_beparams: Backend parameters to override
479
    @type override_osparams: dict or None
480
    @param override_osparams: OS parameters to override
481
    @type override_nics: dict or None
482
    @param override_nics: NICs to override
483
    @return: Job ID
484

    
485
    """
486
    disk_template = instance["disk_template"]
487

    
488
    disks = [{
489
      constants.IDISK_SIZE: i["size"],
490
      constants.IDISK_MODE: i["mode"],
491
      } for i in instance["disks"]]
492

    
493
    nics = [{
494
      constants.INIC_IP: ip,
495
      constants.INIC_MAC: mac,
496
      constants.INIC_MODE: mode,
497
      constants.INIC_LINK: link,
498
      } for ip, mac, mode, link in instance["nics"]]
499

    
500
    if len(override_nics) > len(nics):
501
      raise Error("Can not create new NICs")
502

    
503
    if override_nics:
504
      assert len(override_nics) <= len(nics)
505
      for idx, (nic, override) in enumerate(zip(nics, override_nics)):
506
        nics[idx] = objects.FillDict(nic, override)
507

    
508
    # TODO: Should this be the actual up/down status? (run_state)
509
    start = (instance["config_state"] == "up")
510

    
511
    assert len(disks) == len(instance["disks"])
512
    assert len(nics) == len(instance["nics"])
513

    
514
    inst_beparams = instance["be_instance"]
515
    if not inst_beparams:
516
      inst_beparams = {}
517

    
518
    inst_hvparams = instance["hv_instance"]
519
    if not inst_hvparams:
520
      inst_hvparams = {}
521

    
522
    inst_osparams = instance["os_instance"]
523
    if not inst_osparams:
524
      inst_osparams = {}
525

    
526
    return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
527
                             name, disk_template, disks, nics,
528
                             os=instance["os"],
529
                             pnode=pnode,
530
                             snode=snode,
531
                             start=start,
532
                             ip_check=False,
533
                             iallocator=iallocator,
534
                             hypervisor=instance["hypervisor"],
535
                             source_handshake=expinfo["handshake"],
536
                             source_x509_ca=expinfo["x509_ca"],
537
                             source_instance_name=instance["name"],
538
                             beparams=objects.FillDict(inst_beparams,
539
                                                       override_beparams),
540
                             hvparams=objects.FillDict(inst_hvparams,
541
                                                       override_hvparams),
542
                             osparams=objects.FillDict(inst_osparams,
543
                                                       override_osparams))
544

    
545

    
546
class MoveSourceExecutor(object):
547
  def __init__(self, src_client, mrt):
548
    """Source side of an instance move.
549

    
550
    @type src_client: L{rapi.client.GanetiRapiClient}
551
    @param src_client: RAPI client
552
    @type mrt: L{MoveRuntime}
553
    @param mrt: Instance move runtime information
554

    
555
    """
556
    logging.info("Checking whether instance exists")
557
    self._CheckInstance(src_client, mrt.move.src_instance_name)
558

    
559
    logging.info("Retrieving instance information from source cluster")
560
    instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
561
                                     mrt.move.src_instance_name)
562

    
563
    logging.info("Preparing export on source cluster")
564
    expinfo = self._PrepareExport(src_client, mrt.PollJob,
565
                                  mrt.move.src_instance_name)
566
    assert "handshake" in expinfo
567
    assert "x509_key_name" in expinfo
568
    assert "x509_ca" in expinfo
569

    
570
    # Hand information to destination thread
571
    mrt.source_to_dest.acquire()
572
    try:
573
      mrt.src_instinfo = instinfo
574
      mrt.src_expinfo = expinfo
575
      mrt.source_to_dest.notifyAll()
576
    finally:
577
      mrt.source_to_dest.release()
578

    
579
    logging.info("Waiting for destination information to become available")
580
    mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
581

    
582
    logging.info("Starting remote export on source cluster")
583
    self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
584
                         expinfo["x509_key_name"], mrt.dest_impinfo)
585

    
586
    logging.info("Export successful")
587

    
588
  @staticmethod
589
  def _CheckInstance(cl, name):
590
    """Checks whether the instance exists on the source cluster.
591

    
592
    @type cl: L{rapi.client.GanetiRapiClient}
593
    @param cl: RAPI client
594
    @type name: string
595
    @param name: Instance name
596

    
597
    """
598
    try:
599
      cl.GetInstance(name)
600
    except rapi.client.GanetiApiError, err:
601
      if err.code == rapi.client.HTTP_NOT_FOUND:
602
        raise Error("Instance %s not found (%s)" % (name, str(err)))
603
      raise
604

    
605
  @staticmethod
606
  def _GetInstanceInfo(cl, poll_job_fn, name):
607
    """Retrieves detailed instance information from source cluster.
608

    
609
    @type cl: L{rapi.client.GanetiRapiClient}
610
    @param cl: RAPI client
611
    @type poll_job_fn: callable
612
    @param poll_job_fn: Function to poll for job result
613
    @type name: string
614
    @param name: Instance name
615

    
616
    """
617
    job_id = cl.GetInstanceInfo(name, static=True)
618
    result = poll_job_fn(cl, job_id)
619
    assert len(result[0].keys()) == 1
620
    return result[0][result[0].keys()[0]]
621

    
622
  @staticmethod
623
  def _PrepareExport(cl, poll_job_fn, name):
624
    """Prepares export on source cluster.
625

    
626
    @type cl: L{rapi.client.GanetiRapiClient}
627
    @param cl: RAPI client
628
    @type poll_job_fn: callable
629
    @param poll_job_fn: Function to poll for job result
630
    @type name: string
631
    @param name: Instance name
632

    
633
    """
634
    job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
635
    return poll_job_fn(cl, job_id)[0]
636

    
637
  @staticmethod
638
  def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
639
    """Exports instance from source cluster.
640

    
641
    @type cl: L{rapi.client.GanetiRapiClient}
642
    @param cl: RAPI client
643
    @type poll_job_fn: callable
644
    @param poll_job_fn: Function to poll for job result
645
    @type name: string
646
    @param name: Instance name
647
    @param x509_key_name: Source X509 key
648
    @param impinfo: Import information from destination cluster
649

    
650
    """
651
    job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
652
                               impinfo["disks"], shutdown=True,
653
                               remove_instance=True,
654
                               x509_key_name=x509_key_name,
655
                               destination_x509_ca=impinfo["x509_ca"])
656
    (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
657

    
658
    if not (fin_resu and compat.all(dresults)):
659
      raise Error("Export failed for disks %s" %
660
                  utils.CommaJoin(str(idx) for idx, result
661
                                  in enumerate(dresults) if not result))
662

    
663

    
664
class MoveSourceWorker(workerpool.BaseWorker):
665
  def RunTask(self, rapi_factory, move): # pylint: disable-msg=W0221
666
    """Executes an instance move.
667

    
668
    @type rapi_factory: L{RapiClientFactory}
669
    @param rapi_factory: RAPI client factory
670
    @type move: L{InstanceMove}
671
    @param move: Instance move information
672

    
673
    """
674
    try:
675
      logging.info("Preparing to move %s from cluster %s to %s as %s",
676
                   move.src_instance_name, rapi_factory.src_cluster_name,
677
                   rapi_factory.dest_cluster_name, move.dest_instance_name)
678

    
679
      mrt = MoveRuntime(move)
680

    
681
      logging.debug("Starting destination thread")
682
      dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
683
                                     target=mrt.HandleErrors,
684
                                     args=("dest", MoveDestExecutor,
685
                                           rapi_factory.GetDestClient(),
686
                                           mrt, ))
687
      dest_thread.start()
688
      try:
689
        mrt.HandleErrors("src", MoveSourceExecutor,
690
                         rapi_factory.GetSourceClient(), mrt)
691
      finally:
692
        dest_thread.join()
693

    
694
      if mrt.src_error_message or mrt.dest_error_message:
695
        move.error_message = ("Source error: %s, destination error: %s" %
696
                              (mrt.src_error_message, mrt.dest_error_message))
697
      else:
698
        move.error_message = None
699
    except Exception, err: # pylint: disable-msg=W0703
700
      logging.exception("Caught unhandled exception")
701
      move.error_message = str(err)
702

    
703

    
704
def CheckRapiSetup(rapi_factory):
705
  """Checks the RAPI setup by retrieving the version.
706

    
707
  @type rapi_factory: L{RapiClientFactory}
708
  @param rapi_factory: RAPI client factory
709

    
710
  """
711
  src_client = rapi_factory.GetSourceClient()
712
  logging.info("Connecting to source RAPI server")
713
  logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
714

    
715
  dest_client = rapi_factory.GetDestClient()
716
  logging.info("Connecting to destination RAPI server")
717
  logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())
718

    
719

    
720
def SetupLogging(options):
721
  """Setting up logging infrastructure.
722

    
723
  @param options: Parsed command line options
724

    
725
  """
726
  fmt = "%(asctime)s: %(threadName)s "
727
  if options.debug or options.verbose:
728
    fmt += "%(levelname)s "
729
  fmt += "%(message)s"
730

    
731
  formatter = logging.Formatter(fmt)
732

    
733
  stderr_handler = logging.StreamHandler()
734
  stderr_handler.setFormatter(formatter)
735
  if options.debug:
736
    stderr_handler.setLevel(logging.NOTSET)
737
  elif options.verbose:
738
    stderr_handler.setLevel(logging.INFO)
739
  else:
740
    stderr_handler.setLevel(logging.ERROR)
741

    
742
  root_logger = logging.getLogger("")
743
  root_logger.setLevel(logging.NOTSET)
744
  root_logger.addHandler(stderr_handler)
745

    
746

    
747
def ParseOptions():
748
  """Parses options passed to program.
749

    
750
  """
751
  program = os.path.basename(sys.argv[0])
752

    
753
  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
754
                                        " <source-cluster> <dest-cluster>"
755
                                        " <instance...>"),
756
                                 prog=program)
757
  parser.add_option(cli.DEBUG_OPT)
758
  parser.add_option(cli.VERBOSE_OPT)
759
  parser.add_option(cli.IALLOCATOR_OPT)
760
  parser.add_option(cli.BACKEND_OPT)
761
  parser.add_option(cli.HVOPTS_OPT)
762
  parser.add_option(cli.OSPARAMS_OPT)
763
  parser.add_option(cli.NET_OPT)
764
  parser.add_option(SRC_RAPI_PORT_OPT)
765
  parser.add_option(SRC_CA_FILE_OPT)
766
  parser.add_option(SRC_USERNAME_OPT)
767
  parser.add_option(SRC_PASSWORD_FILE_OPT)
768
  parser.add_option(DEST_RAPI_PORT_OPT)
769
  parser.add_option(DEST_CA_FILE_OPT)
770
  parser.add_option(DEST_USERNAME_OPT)
771
  parser.add_option(DEST_PASSWORD_FILE_OPT)
772
  parser.add_option(DEST_INSTANCE_NAME_OPT)
773
  parser.add_option(DEST_PRIMARY_NODE_OPT)
774
  parser.add_option(DEST_SECONDARY_NODE_OPT)
775
  parser.add_option(PARALLEL_OPT)
776

    
777
  (options, args) = parser.parse_args()
778

    
779
  return (parser, options, args)
780

    
781

    
782
def CheckOptions(parser, options, args):
783
  """Checks options and arguments for validity.
784

    
785
  """
786
  if len(args) < 3:
787
    parser.error("Not enough arguments")
788

    
789
  src_cluster_name = args.pop(0)
790
  dest_cluster_name = args.pop(0)
791
  instance_names = args
792

    
793
  assert len(instance_names) > 0
794

    
795
  # TODO: Remove once using system default paths for SSL certificate
796
  # verification is implemented
797
  if not options.src_ca_file:
798
    parser.error("Missing source cluster CA file")
799

    
800
  if options.parallel < 1:
801
    parser.error("Number of simultaneous moves must be >= 1")
802

    
803
  if not (bool(options.iallocator) ^
804
          bool(options.dest_primary_node or options.dest_secondary_node)):
805
    parser.error("Destination node and iallocator options exclude each other")
806

    
807
  if len(instance_names) == 1:
808
    # Moving one instance only
809
    if not (options.iallocator or
810
            options.dest_primary_node or
811
            options.dest_secondary_node):
812
      parser.error("An iallocator or the destination node is required")
813

    
814
    if options.hvparams:
815
      utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)
816

    
817
    if options.beparams:
818
      utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)
819

    
820
    if options.nics:
821
      options.nics = cli.ParseNicOption(options.nics)
822
  else:
823
    # Moving more than one instance
824
    if (options.dest_instance_name or options.dest_primary_node or
825
        options.dest_secondary_node or options.hvparams or
826
        options.beparams or options.osparams or options.nics):
827
      parser.error("The options --dest-instance-name, --dest-primary-node,"
828
                   " --dest-secondary-node, --hypervisor-parameters,"
829
                   " --backend-parameters, --os-parameters and --net can"
830
                   " only be used when moving exactly one instance")
831

    
832
    if not options.iallocator:
833
      parser.error("An iallocator must be specified for moving more than one"
834
                   " instance")
835

    
836
  return (src_cluster_name, dest_cluster_name, instance_names)
837

    
838

    
839
@rapi.client.UsesRapiClient
840
def main():
841
  """Main routine.
842

    
843
  """
844
  (parser, options, args) = ParseOptions()
845

    
846
  SetupLogging(options)
847

    
848
  (src_cluster_name, dest_cluster_name, instance_names) = \
849
    CheckOptions(parser, options, args)
850

    
851
  logging.info("Source cluster: %s", src_cluster_name)
852
  logging.info("Destination cluster: %s", dest_cluster_name)
853
  logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
854

    
855
  rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
856

    
857
  CheckRapiSetup(rapi_factory)
858

    
859
  assert (len(instance_names) == 1 or
860
          not (options.dest_primary_node or options.dest_secondary_node))
861
  assert len(instance_names) == 1 or options.iallocator
862
  assert (len(instance_names) > 1 or options.iallocator or
863
          options.dest_primary_node or options.dest_secondary_node)
864
  assert (len(instance_names) == 1 or
865
          not (options.hvparams or options.beparams or options.osparams or
866
               options.nics))
867

    
868
  # Prepare list of instance moves
869
  moves = []
870
  for src_instance_name in instance_names:
871
    if options.dest_instance_name:
872
      assert len(instance_names) == 1
873
      # Rename instance
874
      dest_instance_name = options.dest_instance_name
875
    else:
876
      dest_instance_name = src_instance_name
877

    
878
    moves.append(InstanceMove(src_instance_name, dest_instance_name,
879
                              options.dest_primary_node,
880
                              options.dest_secondary_node,
881
                              options.iallocator, options.hvparams,
882
                              options.beparams, options.osparams,
883
                              options.nics))
884

    
885
  assert len(moves) == len(instance_names)
886

    
887
  # Start workerpool
888
  wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
889
  try:
890
    # Add instance moves to workerpool
891
    for move in moves:
892
      wp.AddTask((rapi_factory, move))
893

    
894
    # Wait for all moves to finish
895
    wp.Quiesce()
896

    
897
  finally:
898
    wp.TerminateWorkers()
899

    
900
  # There should be no threads running at this point, hence not using locks
901
  # anymore
902

    
903
  logging.info("Instance move results:")
904

    
905
  for move in moves:
906
    if move.dest_instance_name == move.src_instance_name:
907
      name = move.src_instance_name
908
    else:
909
      name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
910

    
911
    if move.error_message:
912
      msg = "Failed (%s)" % move.error_message
913
    else:
914
      msg = "Success"
915

    
916
    logging.info("%s: %s", name, msg)
917

    
918
  if compat.any(move.error_message for move in moves):
919
    sys.exit(constants.EXIT_FAILURE)
920

    
921
  sys.exit(constants.EXIT_SUCCESS)
922

    
923

    
924
if __name__ == "__main__":
925
  main()