Statistics
| Branch: | Tag: | Revision:

root / tools / move-instance @ 6bf273d5

History | View | Annotate | Download (27.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to move instances from one cluster to another.
22

    
23
"""
24

    
25
# pylint: disable-msg=C0103
26
# C0103: Invalid name move-instance
27

    
28
import os
29
import sys
30
import time
31
import logging
32
import optparse
33
import threading
34

    
35
from ganeti import cli
36
from ganeti import constants
37
from ganeti import utils
38
from ganeti import workerpool
39
from ganeti import compat
40
from ganeti import rapi
41

    
42
import ganeti.rapi.client # pylint: disable-msg=W0611
43
import ganeti.rapi.client_utils
44

    
45

    
46
SRC_RAPI_PORT_OPT = \
47
  cli.cli_option("--src-rapi-port", action="store", type="int",
48
                 dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
49
                 help=("Source cluster RAPI port (defaults to %s)" %
50
                       constants.DEFAULT_RAPI_PORT))
51

    
52
SRC_CA_FILE_OPT = \
53
  cli.cli_option("--src-ca-file", action="store", type="string",
54
                 dest="src_ca_file",
55
                 help=("File containing source cluster Certificate"
56
                       " Authority (CA) in PEM format"))
57

    
58
SRC_USERNAME_OPT = \
59
  cli.cli_option("--src-username", action="store", type="string",
60
                 dest="src_username", default=None,
61
                 help="Source cluster username")
62

    
63
SRC_PASSWORD_FILE_OPT = \
64
  cli.cli_option("--src-password-file", action="store", type="string",
65
                 dest="src_password_file",
66
                 help="File containing source cluster password")
67

    
68
DEST_RAPI_PORT_OPT = \
69
  cli.cli_option("--dest-rapi-port", action="store", type="int",
70
                 dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT,
71
                 help=("Destination cluster RAPI port (defaults to source"
72
                       " cluster RAPI port)"))
73

    
74
DEST_CA_FILE_OPT = \
75
  cli.cli_option("--dest-ca-file", action="store", type="string",
76
                 dest="dest_ca_file",
77
                 help=("File containing destination cluster Certificate"
78
                       " Authority (CA) in PEM format (defaults to source"
79
                       " cluster CA)"))
80

    
81
DEST_USERNAME_OPT = \
82
  cli.cli_option("--dest-username", action="store", type="string",
83
                 dest="dest_username", default=None,
84
                 help=("Destination cluster username (defaults to"
85
                       " source cluster username)"))
86

    
87
DEST_PASSWORD_FILE_OPT = \
88
  cli.cli_option("--dest-password-file", action="store", type="string",
89
                 dest="dest_password_file",
90
                 help=("File containing destination cluster password"
91
                       " (defaults to source cluster password)"))
92

    
93
DEST_INSTANCE_NAME_OPT = \
94
  cli.cli_option("--dest-instance-name", action="store", type="string",
95
                 dest="dest_instance_name",
96
                 help=("Instance name on destination cluster (only"
97
                       " when moving exactly one instance)"))
98

    
99
DEST_PRIMARY_NODE_OPT = \
100
  cli.cli_option("--dest-primary-node", action="store", type="string",
101
                 dest="dest_primary_node",
102
                 help=("Primary node on destination cluster (only"
103
                       " when moving exactly one instance)"))
104

    
105
DEST_SECONDARY_NODE_OPT = \
106
  cli.cli_option("--dest-secondary-node", action="store", type="string",
107
                 dest="dest_secondary_node",
108
                 help=("Secondary node on destination cluster (only"
109
                       " when moving exactly one instance)"))
110

    
111
PARALLEL_OPT = \
112
  cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
113
                 dest="parallel", metavar="<number>",
114
                 help="Number of instances to be moved simultaneously")
115

    
116

    
117
class Error(Exception):
118
  """Generic error.
119

    
120
  """
121

    
122

    
123
class Abort(Error):
124
  """Special exception for aborting import/export.
125

    
126
  """
127

    
128

    
129
class RapiClientFactory:
130
  """Factory class for creating RAPI clients.
131

    
132
  @ivar src_cluster_name: Source cluster name
133
  @ivar dest_cluster_name: Destination cluster name
134
  @ivar GetSourceClient: Callable returning new client for source cluster
135
  @ivar GetDestClient: Callable returning new client for destination cluster
136

    
137
  """
138
  def __init__(self, options, src_cluster_name, dest_cluster_name):
139
    """Initializes this class.
140

    
141
    @param options: Program options
142
    @type src_cluster_name: string
143
    @param src_cluster_name: Source cluster name
144
    @type dest_cluster_name: string
145
    @param dest_cluster_name: Destination cluster name
146

    
147
    """
148
    self.src_cluster_name = src_cluster_name
149
    self.dest_cluster_name = dest_cluster_name
150

    
151
    # TODO: Support for using system default paths for verifying SSL certificate
152
    # (already implemented in CertAuthorityVerify)
153
    logging.debug("Using '%s' as source CA", options.src_ca_file)
154
    src_ssl_config = rapi.client.CertAuthorityVerify(cafile=options.src_ca_file)
155

    
156
    if options.dest_ca_file:
157
      logging.debug("Using '%s' as destination CA", options.dest_ca_file)
158
      dest_ssl_config = \
159
        rapi.client.CertAuthorityVerify(cafile=options.dest_ca_file)
160
    else:
161
      logging.debug("Using source CA for destination")
162
      dest_ssl_config = src_ssl_config
163

    
164
    logging.debug("Source RAPI server is %s:%s",
165
                  src_cluster_name, options.src_rapi_port)
166
    logging.debug("Source username is '%s'", options.src_username)
167

    
168
    if options.src_username is None:
169
      src_username = ""
170
    else:
171
      src_username = options.src_username
172

    
173
    if options.src_password_file:
174
      logging.debug("Reading '%s' for source password",
175
                    options.src_password_file)
176
      src_password = utils.ReadOneLineFile(options.src_password_file,
177
                                           strict=True)
178
    else:
179
      logging.debug("Source has no password")
180
      src_password = None
181

    
182
    self.GetSourceClient = lambda: \
183
      rapi.client.GanetiRapiClient(src_cluster_name,
184
                                   port=options.src_rapi_port,
185
                                   config_ssl_verification=src_ssl_config,
186
                                   username=src_username,
187
                                   password=src_password)
188

    
189
    if options.dest_rapi_port:
190
      dest_rapi_port = options.dest_rapi_port
191
    else:
192
      dest_rapi_port = options.src_rapi_port
193

    
194
    if options.dest_username is None:
195
      dest_username = src_username
196
    else:
197
      dest_username = options.dest_username
198

    
199
    logging.debug("Destination RAPI server is %s:%s",
200
                  dest_cluster_name, dest_rapi_port)
201
    logging.debug("Destination username is '%s'", dest_username)
202

    
203
    if options.dest_password_file:
204
      logging.debug("Reading '%s' for destination password",
205
                    options.dest_password_file)
206
      dest_password = utils.ReadOneLineFile(options.dest_password_file,
207
                                            strict=True)
208
    else:
209
      logging.debug("Using source password for destination")
210
      dest_password = src_password
211

    
212
    self.GetDestClient = lambda: \
213
      rapi.client.GanetiRapiClient(dest_cluster_name,
214
                                   port=dest_rapi_port,
215
                                   config_ssl_verification=dest_ssl_config,
216
                                   username=dest_username,
217
                                   password=dest_password)
218

    
219

    
220
class MoveJobPollReportCb(cli.JobPollReportCbBase):
221
  def __init__(self, abort_check_fn, remote_import_fn):
222
    """Initializes this class.
223

    
224
    @type abort_check_fn: callable
225
    @param abort_check_fn: Function to check whether move is aborted
226
    @type remote_import_fn: callable or None
227
    @param remote_import_fn: Callback for reporting received remote import
228
                             information
229

    
230
    """
231
    cli.JobPollReportCbBase.__init__(self)
232
    self._abort_check_fn = abort_check_fn
233
    self._remote_import_fn = remote_import_fn
234

    
235
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
236
    """Handles a log message.
237

    
238
    """
239
    if log_type == constants.ELOG_REMOTE_IMPORT:
240
      logging.debug("Received remote import information")
241

    
242
      if not self._remote_import_fn:
243
        raise RuntimeError("Received unexpected remote import information")
244

    
245
      assert "x509_ca" in log_msg
246
      assert "disks" in log_msg
247

    
248
      self._remote_import_fn(log_msg)
249

    
250
      return
251

    
252
    logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
253
                 utils.SafeEncode(log_msg))
254

    
255
  def ReportNotChanged(self, job_id, status):
256
    """Called if a job hasn't changed in a while.
257

    
258
    """
259
    try:
260
      # Check whether we were told to abort by the other thread
261
      self._abort_check_fn()
262
    except Abort:
263
      logging.warning("Aborting despite job %s still running", job_id)
264
      raise
265

    
266

    
267
class InstanceMove(object):
268
  """Status class for instance moves.
269

    
270
  """
271
  def __init__(self, src_instance_name, dest_instance_name,
272
               dest_pnode, dest_snode, dest_iallocator):
273
    """Initializes this class.
274

    
275
    @type src_instance_name: string
276
    @param src_instance_name: Instance name on source cluster
277
    @type dest_instance_name: string
278
    @param dest_instance_name: Instance name on destination cluster
279
    @type dest_pnode: string or None
280
    @param dest_pnode: Name of primary node on destination cluster
281
    @type dest_snode: string or None
282
    @param dest_snode: Name of secondary node on destination cluster
283
    @type dest_iallocator: string or None
284
    @param dest_iallocator: Name of iallocator to use
285

    
286
    """
287
    self.src_instance_name = src_instance_name
288
    self.dest_instance_name = dest_instance_name
289
    self.dest_pnode = dest_pnode
290
    self.dest_snode = dest_snode
291
    self.dest_iallocator = dest_iallocator
292

    
293
    self.success = None
294
    self.error_message = None
295

    
296

    
297
class MoveRuntime(object):
298
  """Class to keep track of instance move.
299

    
300
  """
301
  def __init__(self, move):
302
    """Initializes this class.
303

    
304
    @type move: L{InstanceMove}
305

    
306
    """
307
    self.move = move
308

    
309
    # Thread synchronization
310
    self.lock = threading.Lock()
311
    self.source_to_dest = threading.Condition(self.lock)
312
    self.dest_to_source = threading.Condition(self.lock)
313

    
314
    # Set when threads should abort
315
    self.abort = None
316

    
317
    # Source information
318
    self.src_success = None
319
    self.src_error_message = None
320
    self.src_expinfo = None
321
    self.src_instinfo = None
322

    
323
    # Destination information
324
    self.dest_success = None
325
    self.dest_error_message = None
326
    self.dest_impinfo = None
327

    
328
  def HandleErrors(self, prefix, fn, *args):
329
    """Wrapper to catch errors and abort threads.
330

    
331
    @type prefix: string
332
    @param prefix: Variable name prefix ("src" or "dest")
333
    @type fn: callable
334
    @param fn: Function
335

    
336
    """
337
    assert prefix in ("dest", "src")
338

    
339
    try:
340
      # Call inner function
341
      fn(*args)
342

    
343
      success = True
344
      errmsg = None
345
    except Abort:
346
      success = False
347
      errmsg = "Aborted"
348
    except Exception, err:
349
      logging.exception("Caught unhandled exception")
350
      success = False
351
      errmsg = str(err)
352

    
353
    self.lock.acquire()
354
    try:
355
      # Tell all threads to abort
356
      self.abort = True
357
      self.source_to_dest.notifyAll()
358
      self.dest_to_source.notifyAll()
359
    finally:
360
      self.lock.release()
361

    
362
    setattr(self, "%s_success" % prefix, success)
363
    setattr(self, "%s_error_message" % prefix, errmsg)
364

    
365
  def CheckAbort(self):
366
    """Check whether thread should be aborted.
367

    
368
    @raise Abort: When thread should be aborted
369

    
370
    """
371
    if self.abort:
372
      logging.info("Aborting")
373
      raise Abort()
374

    
375
  def Wait(self, cond, check_fn):
376
    """Waits for a condition to become true.
377

    
378
    @type cond: threading.Condition
379
    @param cond: Threading condition
380
    @type check_fn: callable
381
    @param check_fn: Function to check whether condition is true
382

    
383
    """
384
    cond.acquire()
385
    try:
386
      while check_fn(self):
387
        self.CheckAbort()
388
        cond.wait()
389
    finally:
390
      cond.release()
391

    
392
  def PollJob(self, cl, job_id, remote_import_fn=None):
393
    """Wrapper for polling a job.
394

    
395
    @type cl: L{rapi.client.GanetiRapiClient}
396
    @param cl: RAPI client
397
    @type job_id: string
398
    @param job_id: Job ID
399
    @type remote_import_fn: callable or None
400
    @param remote_import_fn: Callback for reporting received remote import
401
                             information
402

    
403
    """
404
    return rapi.client_utils.PollJob(cl, job_id,
405
                                     MoveJobPollReportCb(self.CheckAbort,
406
                                                         remote_import_fn))
407

    
408

    
409
class MoveDestExecutor(object):
410
  def __init__(self, dest_client, mrt):
411
    """Destination side of an instance move.
412

    
413
    @type dest_client: L{rapi.client.GanetiRapiClient}
414
    @param dest_client: RAPI client
415
    @type mrt: L{MoveRuntime}
416
    @param mrt: Instance move runtime information
417

    
418
    """
419
    logging.debug("Waiting for instance information to become available")
420
    mrt.Wait(mrt.source_to_dest,
421
             lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
422

    
423
    logging.info("Creating instance %s in remote-import mode",
424
                 mrt.move.dest_instance_name)
425
    job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
426
                                  mrt.move.dest_pnode, mrt.move.dest_snode,
427
                                  mrt.move.dest_iallocator,
428
                                  mrt.src_instinfo, mrt.src_expinfo)
429
    mrt.PollJob(dest_client, job_id,
430
                remote_import_fn=compat.partial(self._SetImportInfo, mrt))
431

    
432
    logging.info("Import successful")
433

    
434
  @staticmethod
435
  def _SetImportInfo(mrt, impinfo):
436
    """Sets the remote import information and notifies source thread.
437

    
438
    @type mrt: L{MoveRuntime}
439
    @param mrt: Instance move runtime information
440
    @param impinfo: Remote import information
441

    
442
    """
443
    mrt.dest_to_source.acquire()
444
    try:
445
      mrt.dest_impinfo = impinfo
446
      mrt.dest_to_source.notifyAll()
447
    finally:
448
      mrt.dest_to_source.release()
449

    
450
  @staticmethod
451
  def _CreateInstance(cl, name, snode, pnode, iallocator, instance, expinfo):
452
    """Starts the instance creation in remote import mode.
453

    
454
    @type cl: L{rapi.client.GanetiRapiClient}
455
    @param cl: RAPI client
456
    @type name: string
457
    @param name: Instance name
458
    @type pnode: string or None
459
    @param pnode: Name of primary node on destination cluster
460
    @type snode: string or None
461
    @param snode: Name of secondary node on destination cluster
462
    @type iallocator: string or None
463
    @param iallocator: Name of iallocator to use
464
    @type instance: dict
465
    @param instance: Instance details from source cluster
466
    @type expinfo: dict
467
    @param expinfo: Prepared export information from source cluster
468
    @return: Job ID
469

    
470
    """
471
    disk_template = instance["disk_template"]
472

    
473
    disks = [{
474
      "size": i["size"],
475
      "mode": i["mode"],
476
      } for i in instance["disks"]]
477

    
478
    nics = [{
479
      "ip": ip,
480
      "mac": mac,
481
      "mode": mode,
482
      "link": link,
483
      } for ip, mac, mode, link in instance["nics"]]
484

    
485
    # TODO: Should this be the actual up/down status? (run_state)
486
    start = (instance["config_state"] == "up")
487

    
488
    assert len(disks) == len(instance["disks"])
489
    assert len(nics) == len(instance["nics"])
490

    
491
    return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
492
                             name, disk_template, disks, nics,
493
                             os=instance["os"],
494
                             pnode=pnode,
495
                             snode=snode,
496
                             start=start,
497
                             ip_check=False,
498
                             iallocator=iallocator,
499
                             hypervisor=instance["hypervisor"],
500
                             source_handshake=expinfo["handshake"],
501
                             source_x509_ca=expinfo["x509_ca"],
502
                             source_instance_name=instance["name"],
503
                             beparams=instance["be_instance"],
504
                             hvparams=instance["hv_instance"])
505

    
506

    
507
class MoveSourceExecutor(object):
508
  def __init__(self, src_client, mrt):
509
    """Source side of an instance move.
510

    
511
    @type src_client: L{rapi.client.GanetiRapiClient}
512
    @param src_client: RAPI client
513
    @type mrt: L{MoveRuntime}
514
    @param mrt: Instance move runtime information
515

    
516
    """
517
    logging.info("Checking whether instance exists")
518
    self._CheckInstance(src_client, mrt.move.src_instance_name)
519

    
520
    logging.info("Retrieving instance information from source cluster")
521
    instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
522
                                     mrt.move.src_instance_name)
523

    
524
    logging.info("Preparing export on source cluster")
525
    expinfo = self._PrepareExport(src_client, mrt.PollJob,
526
                                  mrt.move.src_instance_name)
527
    assert "handshake" in expinfo
528
    assert "x509_key_name" in expinfo
529
    assert "x509_ca" in expinfo
530

    
531
    # Hand information to destination thread
532
    mrt.source_to_dest.acquire()
533
    try:
534
      mrt.src_instinfo = instinfo
535
      mrt.src_expinfo = expinfo
536
      mrt.source_to_dest.notifyAll()
537
    finally:
538
      mrt.source_to_dest.release()
539

    
540
    logging.info("Waiting for destination information to become available")
541
    mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
542

    
543
    logging.info("Starting remote export on source cluster")
544
    self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
545
                         expinfo["x509_key_name"], mrt.dest_impinfo)
546

    
547
    logging.info("Export successful")
548

    
549
  @staticmethod
550
  def _CheckInstance(cl, name):
551
    """Checks whether the instance exists on the source cluster.
552

    
553
    @type cl: L{rapi.client.GanetiRapiClient}
554
    @param cl: RAPI client
555
    @type name: string
556
    @param name: Instance name
557

    
558
    """
559
    try:
560
      cl.GetInstance(name)
561
    except rapi.client.GanetiApiError, err:
562
      if err.code == rapi.client.HTTP_NOT_FOUND:
563
        raise Error("Instance %s not found (%s)" % (name, str(err)))
564
      raise
565

    
566
  @staticmethod
567
  def _GetInstanceInfo(cl, poll_job_fn, name):
568
    """Retrieves detailed instance information from source cluster.
569

    
570
    @type cl: L{rapi.client.GanetiRapiClient}
571
    @param cl: RAPI client
572
    @type poll_job_fn: callable
573
    @param poll_job_fn: Function to poll for job result
574
    @type name: string
575
    @param name: Instance name
576

    
577
    """
578
    job_id = cl.GetInstanceInfo(name, static=True)
579
    result = poll_job_fn(cl, job_id)
580
    assert len(result[0].keys()) == 1
581
    return result[0][result[0].keys()[0]]
582

    
583
  @staticmethod
584
  def _PrepareExport(cl, poll_job_fn, name):
585
    """Prepares export on source cluster.
586

    
587
    @type cl: L{rapi.client.GanetiRapiClient}
588
    @param cl: RAPI client
589
    @type poll_job_fn: callable
590
    @param poll_job_fn: Function to poll for job result
591
    @type name: string
592
    @param name: Instance name
593

    
594
    """
595
    job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
596
    return poll_job_fn(cl, job_id)[0]
597

    
598
  @staticmethod
599
  def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
600
    """Exports instance from source cluster.
601

    
602
    @type cl: L{rapi.client.GanetiRapiClient}
603
    @param cl: RAPI client
604
    @type poll_job_fn: callable
605
    @param poll_job_fn: Function to poll for job result
606
    @type name: string
607
    @param name: Instance name
608
    @param x509_key_name: Source X509 key
609
    @param impinfo: Import information from destination cluster
610

    
611
    """
612
    job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
613
                               impinfo["disks"], shutdown=True,
614
                               remove_instance=True,
615
                               x509_key_name=x509_key_name,
616
                               destination_x509_ca=impinfo["x509_ca"])
617
    (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
618

    
619
    if not (fin_resu and compat.all(dresults)):
620
      raise Error("Export failed for disks %s" %
621
                  utils.CommaJoin(str(idx) for idx, result
622
                                  in enumerate(dresults) if not result))
623

    
624

    
625
class MoveSourceWorker(workerpool.BaseWorker):
626
  def RunTask(self, rapi_factory, move): # pylint: disable-msg=W0221
627
    """Executes an instance move.
628

    
629
    @type rapi_factory: L{RapiClientFactory}
630
    @param rapi_factory: RAPI client factory
631
    @type move: L{InstanceMove}
632
    @param move: Instance move information
633

    
634
    """
635
    try:
636
      logging.info("Preparing to move %s from cluster %s to %s as %s",
637
                   move.src_instance_name, rapi_factory.src_cluster_name,
638
                   rapi_factory.dest_cluster_name, move.dest_instance_name)
639

    
640
      mrt = MoveRuntime(move)
641

    
642
      logging.debug("Starting destination thread")
643
      dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
644
                                     target=mrt.HandleErrors,
645
                                     args=("dest", MoveDestExecutor,
646
                                           rapi_factory.GetDestClient(),
647
                                           mrt, ))
648
      dest_thread.start()
649
      try:
650
        mrt.HandleErrors("src", MoveSourceExecutor,
651
                         rapi_factory.GetSourceClient(), mrt)
652
      finally:
653
        dest_thread.join()
654

    
655
      move.success = (mrt.src_success and mrt.dest_success)
656
      if mrt.src_error_message or mrt.dest_error_message:
657
        move.error_message = ("Source error: %s, destination error: %s" %
658
                              (mrt.src_error_message, mrt.dest_error_message))
659
      else:
660
        move.error_message = None
661
    except Exception, err: # pylint: disable-msg=W0703
662
      logging.exception("Caught unhandled exception")
663
      move.success = False
664
      move.error_message = str(err)
665

    
666

    
667
def CheckRapiSetup(rapi_factory):
668
  """Checks the RAPI setup by retrieving the version.
669

    
670
  @type rapi_factory: L{RapiClientFactory}
671
  @param rapi_factory: RAPI client factory
672

    
673
  """
674
  src_client = rapi_factory.GetSourceClient()
675
  logging.info("Connecting to source RAPI server")
676
  logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
677

    
678
  dest_client = rapi_factory.GetDestClient()
679
  logging.info("Connecting to destination RAPI server")
680
  logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())
681

    
682

    
683
def SetupLogging(options):
684
  """Setting up logging infrastructure.
685

    
686
  @param options: Parsed command line options
687

    
688
  """
689
  fmt = "%(asctime)s: %(threadName)s "
690
  if options.debug or options.verbose:
691
    fmt += "%(levelname)s "
692
  fmt += "%(message)s"
693

    
694
  formatter = logging.Formatter(fmt)
695

    
696
  stderr_handler = logging.StreamHandler()
697
  stderr_handler.setFormatter(formatter)
698
  if options.debug:
699
    stderr_handler.setLevel(logging.NOTSET)
700
  elif options.verbose:
701
    stderr_handler.setLevel(logging.INFO)
702
  else:
703
    stderr_handler.setLevel(logging.ERROR)
704

    
705
  root_logger = logging.getLogger("")
706
  root_logger.setLevel(logging.NOTSET)
707
  root_logger.addHandler(stderr_handler)
708

    
709

    
710
def ParseOptions():
711
  """Parses options passed to program.
712

    
713
  """
714
  program = os.path.basename(sys.argv[0])
715

    
716
  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
717
                                        " <source-cluster> <dest-cluster>"
718
                                        " <instance...>"),
719
                                 prog=program)
720
  parser.add_option(cli.DEBUG_OPT)
721
  parser.add_option(cli.VERBOSE_OPT)
722
  parser.add_option(cli.IALLOCATOR_OPT)
723
  parser.add_option(SRC_RAPI_PORT_OPT)
724
  parser.add_option(SRC_CA_FILE_OPT)
725
  parser.add_option(SRC_USERNAME_OPT)
726
  parser.add_option(SRC_PASSWORD_FILE_OPT)
727
  parser.add_option(DEST_RAPI_PORT_OPT)
728
  parser.add_option(DEST_CA_FILE_OPT)
729
  parser.add_option(DEST_USERNAME_OPT)
730
  parser.add_option(DEST_PASSWORD_FILE_OPT)
731
  parser.add_option(DEST_INSTANCE_NAME_OPT)
732
  parser.add_option(DEST_PRIMARY_NODE_OPT)
733
  parser.add_option(DEST_SECONDARY_NODE_OPT)
734
  parser.add_option(PARALLEL_OPT)
735

    
736
  (options, args) = parser.parse_args()
737

    
738
  return (parser, options, args)
739

    
740

    
741
def CheckOptions(parser, options, args):
742
  """Checks options and arguments for validity.
743

    
744
  """
745
  if len(args) < 3:
746
    parser.error("Not enough arguments")
747

    
748
  src_cluster_name = args.pop(0)
749
  dest_cluster_name = args.pop(0)
750
  instance_names = args
751

    
752
  assert len(instance_names) > 0
753

    
754
  # TODO: Remove once using system default paths for SSL certificate
755
  # verification is implemented
756
  if not options.src_ca_file:
757
    parser.error("Missing source cluster CA file")
758

    
759
  if options.parallel < 1:
760
    parser.error("Number of simultaneous moves must be >= 1")
761

    
762
  if not (bool(options.iallocator) ^
763
          bool(options.dest_primary_node or options.dest_secondary_node)):
764
    parser.error("Destination node and iallocator options exclude each other")
765

    
766
  if len(instance_names) == 1:
767
    # Moving one instance only
768
    if not (options.iallocator or
769
            options.dest_primary_node or
770
            options.dest_secondary_node):
771
      parser.error("An iallocator or the destination node is required")
772
  else:
773
    # Moving more than one instance
774
    if (options.dest_instance_name or options.dest_primary_node or
775
        options.dest_secondary_node):
776
      parser.error("The options --dest-instance-name, --dest-primary-node and"
777
                   " --dest-secondary-node can only be used when moving exactly"
778
                   " one instance")
779

    
780
    if not options.iallocator:
781
      parser.error("An iallocator must be specified for moving more than one"
782
                   " instance")
783

    
784
  return (src_cluster_name, dest_cluster_name, instance_names)
785

    
786

    
787
def main():
788
  """Main routine.
789

    
790
  """
791
  (parser, options, args) = ParseOptions()
792

    
793
  SetupLogging(options)
794

    
795
  (src_cluster_name, dest_cluster_name, instance_names) = \
796
    CheckOptions(parser, options, args)
797

    
798
  logging.info("Source cluster: %s", src_cluster_name)
799
  logging.info("Destination cluster: %s", dest_cluster_name)
800
  logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
801

    
802
  rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
803

    
804
  CheckRapiSetup(rapi_factory)
805

    
806
  assert (len(instance_names) == 1 or
807
          not (options.dest_primary_node or options.dest_secondary_node))
808
  assert len(instance_names) == 1 or options.iallocator
809
  assert (len(instance_names) > 1 or options.iallocator or
810
          options.dest_primary_node or options.dest_secondary_node)
811

    
812
  # Prepare list of instance moves
813
  moves = []
814
  for src_instance_name in instance_names:
815
    if options.dest_instance_name:
816
      assert len(instance_names) == 1
817
      # Rename instance
818
      dest_instance_name = options.dest_instance_name
819
    else:
820
      dest_instance_name = src_instance_name
821

    
822
    moves.append(InstanceMove(src_instance_name, dest_instance_name,
823
                              options.dest_primary_node,
824
                              options.dest_secondary_node,
825
                              options.iallocator))
826

    
827
  assert len(moves) == len(instance_names)
828

    
829
  # Start workerpool
830
  wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
831
  try:
832
    # Add instance moves to workerpool
833
    for move in moves:
834
      wp.AddTask(rapi_factory, move)
835

    
836
    # Wait for all moves to finish
837
    wp.Quiesce()
838

    
839
  finally:
840
    wp.TerminateWorkers()
841

    
842
  # There should be no threads running at this point, hence not using locks
843
  # anymore
844

    
845
  logging.info("Instance move results:")
846

    
847
  for move in moves:
848
    if move.dest_instance_name == move.src_instance_name:
849
      name = move.src_instance_name
850
    else:
851
      name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
852

    
853
    if move.success and not move.error_message:
854
      msg = "Success"
855
    else:
856
      msg = "Failed (%s)" % move.error_message
857

    
858
    logging.info("%s: %s", name, msg)
859

    
860
  if compat.all(move.success for move in moves):
861
    sys.exit(constants.EXIT_SUCCESS)
862

    
863
  sys.exit(constants.EXIT_FAILURE)
864

    
865

    
866
if __name__ == "__main__":
867
  main()