Statistics
| Branch: | Tag: | Revision:

root / tools / move-instance @ 2a7c3583

History | View | Annotate | Download (27.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to move instances from one cluster to another.
22

    
23
"""
24

    
25
# pylint: disable-msg=C0103
26
# C0103: Invalid name move-instance
27

    
28
import os
29
import sys
30
import time
31
import logging
32
import optparse
33
import threading
34

    
35
from ganeti import cli
36
from ganeti import constants
37
from ganeti import utils
38
from ganeti import workerpool
39
from ganeti import compat
40
from ganeti import rapi
41

    
42
import ganeti.rapi.client # pylint: disable-msg=W0611
43
import ganeti.rapi.client_utils
44

    
45

    
46
SRC_RAPI_PORT_OPT = \
47
  cli.cli_option("--src-rapi-port", action="store", type="int",
48
                 dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
49
                 help=("Source cluster RAPI port (defaults to %s)" %
50
                       constants.DEFAULT_RAPI_PORT))
51

    
52
SRC_CA_FILE_OPT = \
53
  cli.cli_option("--src-ca-file", action="store", type="string",
54
                 dest="src_ca_file",
55
                 help=("File containing source cluster Certificate"
56
                       " Authority (CA) in PEM format"))
57

    
58
SRC_USERNAME_OPT = \
59
  cli.cli_option("--src-username", action="store", type="string",
60
                 dest="src_username", default=None,
61
                 help="Source cluster username")
62

    
63
SRC_PASSWORD_FILE_OPT = \
64
  cli.cli_option("--src-password-file", action="store", type="string",
65
                 dest="src_password_file",
66
                 help="File containing source cluster password")
67

    
68
DEST_RAPI_PORT_OPT = \
69
  cli.cli_option("--dest-rapi-port", action="store", type="int",
70
                 dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT,
71
                 help=("Destination cluster RAPI port (defaults to source"
72
                       " cluster RAPI port)"))
73

    
74
DEST_CA_FILE_OPT = \
75
  cli.cli_option("--dest-ca-file", action="store", type="string",
76
                 dest="dest_ca_file",
77
                 help=("File containing destination cluster Certificate"
78
                       " Authority (CA) in PEM format (defaults to source"
79
                       " cluster CA)"))
80

    
81
DEST_USERNAME_OPT = \
82
  cli.cli_option("--dest-username", action="store", type="string",
83
                 dest="dest_username", default=None,
84
                 help=("Destination cluster username (defaults to"
85
                       " source cluster username)"))
86

    
87
DEST_PASSWORD_FILE_OPT = \
88
  cli.cli_option("--dest-password-file", action="store", type="string",
89
                 dest="dest_password_file",
90
                 help=("File containing destination cluster password"
91
                       " (defaults to source cluster password)"))
92

    
93
DEST_INSTANCE_NAME_OPT = \
94
  cli.cli_option("--dest-instance-name", action="store", type="string",
95
                 dest="dest_instance_name",
96
                 help=("Instance name on destination cluster (only"
97
                       " when moving exactly one instance)"))
98

    
99
DEST_PRIMARY_NODE_OPT = \
100
  cli.cli_option("--dest-primary-node", action="store", type="string",
101
                 dest="dest_primary_node",
102
                 help=("Primary node on destination cluster (only"
103
                       " when moving exactly one instance)"))
104

    
105
DEST_SECONDARY_NODE_OPT = \
106
  cli.cli_option("--dest-secondary-node", action="store", type="string",
107
                 dest="dest_secondary_node",
108
                 help=("Secondary node on destination cluster (only"
109
                       " when moving exactly one instance)"))
110

    
111
PARALLEL_OPT = \
112
  cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
113
                 dest="parallel", metavar="<number>",
114
                 help="Number of instances to be moved simultaneously")
115

    
116

    
117
class Error(Exception):
118
  """Generic error.
119

    
120
  """
121

    
122

    
123
class Abort(Error):
124
  """Special exception for aborting import/export.
125

    
126
  """
127

    
128

    
129
class RapiClientFactory:
130
  """Factory class for creating RAPI clients.
131

    
132
  @ivar src_cluster_name: Source cluster name
133
  @ivar dest_cluster_name: Destination cluster name
134
  @ivar GetSourceClient: Callable returning new client for source cluster
135
  @ivar GetDestClient: Callable returning new client for destination cluster
136

    
137
  """
138
  def __init__(self, options, src_cluster_name, dest_cluster_name):
139
    """Initializes this class.
140

    
141
    @param options: Program options
142
    @type src_cluster_name: string
143
    @param src_cluster_name: Source cluster name
144
    @type dest_cluster_name: string
145
    @param dest_cluster_name: Destination cluster name
146

    
147
    """
148
    self.src_cluster_name = src_cluster_name
149
    self.dest_cluster_name = dest_cluster_name
150

    
151
    # TODO: Implement timeouts for RAPI connections
152
    # TODO: Support for using system default paths for verifying SSL certificate
153
    logging.debug("Using '%s' as source CA", options.src_ca_file)
154
    src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)
155

    
156
    if options.dest_ca_file:
157
      logging.debug("Using '%s' as destination CA", options.dest_ca_file)
158
      dest_curl_config = \
159
        rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
160
    else:
161
      logging.debug("Using source CA for destination")
162
      dest_curl_config = src_curl_config
163

    
164
    logging.debug("Source RAPI server is %s:%s",
165
                  src_cluster_name, options.src_rapi_port)
166
    logging.debug("Source username is '%s'", options.src_username)
167

    
168
    if options.src_username is None:
169
      src_username = ""
170
    else:
171
      src_username = options.src_username
172

    
173
    if options.src_password_file:
174
      logging.debug("Reading '%s' for source password",
175
                    options.src_password_file)
176
      src_password = utils.ReadOneLineFile(options.src_password_file,
177
                                           strict=True)
178
    else:
179
      logging.debug("Source has no password")
180
      src_password = None
181

    
182
    self.GetSourceClient = lambda: \
183
      rapi.client.GanetiRapiClient(src_cluster_name,
184
                                   port=options.src_rapi_port,
185
                                   curl_config_fn=src_curl_config,
186
                                   username=src_username,
187
                                   password=src_password)
188

    
189
    if options.dest_rapi_port:
190
      dest_rapi_port = options.dest_rapi_port
191
    else:
192
      dest_rapi_port = options.src_rapi_port
193

    
194
    if options.dest_username is None:
195
      dest_username = src_username
196
    else:
197
      dest_username = options.dest_username
198

    
199
    logging.debug("Destination RAPI server is %s:%s",
200
                  dest_cluster_name, dest_rapi_port)
201
    logging.debug("Destination username is '%s'", dest_username)
202

    
203
    if options.dest_password_file:
204
      logging.debug("Reading '%s' for destination password",
205
                    options.dest_password_file)
206
      dest_password = utils.ReadOneLineFile(options.dest_password_file,
207
                                            strict=True)
208
    else:
209
      logging.debug("Using source password for destination")
210
      dest_password = src_password
211

    
212
    self.GetDestClient = lambda: \
213
      rapi.client.GanetiRapiClient(dest_cluster_name,
214
                                   port=dest_rapi_port,
215
                                   curl_config_fn=dest_curl_config,
216
                                   username=dest_username,
217
                                   password=dest_password)
218

    
219

    
220
class MoveJobPollReportCb(cli.JobPollReportCbBase):
221
  def __init__(self, abort_check_fn, remote_import_fn):
222
    """Initializes this class.
223

    
224
    @type abort_check_fn: callable
225
    @param abort_check_fn: Function to check whether move is aborted
226
    @type remote_import_fn: callable or None
227
    @param remote_import_fn: Callback for reporting received remote import
228
                             information
229

    
230
    """
231
    cli.JobPollReportCbBase.__init__(self)
232
    self._abort_check_fn = abort_check_fn
233
    self._remote_import_fn = remote_import_fn
234

    
235
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
236
    """Handles a log message.
237

    
238
    """
239
    if log_type == constants.ELOG_REMOTE_IMPORT:
240
      logging.debug("Received remote import information")
241

    
242
      if not self._remote_import_fn:
243
        raise RuntimeError("Received unexpected remote import information")
244

    
245
      assert "x509_ca" in log_msg
246
      assert "disks" in log_msg
247

    
248
      self._remote_import_fn(log_msg)
249

    
250
      return
251

    
252
    logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
253
                 utils.SafeEncode(log_msg))
254

    
255
  def ReportNotChanged(self, job_id, status):
256
    """Called if a job hasn't changed in a while.
257

    
258
    """
259
    try:
260
      # Check whether we were told to abort by the other thread
261
      self._abort_check_fn()
262
    except Abort:
263
      logging.warning("Aborting despite job %s still running", job_id)
264
      raise
265

    
266

    
267
class InstanceMove(object):
268
  """Status class for instance moves.
269

    
270
  """
271
  def __init__(self, src_instance_name, dest_instance_name,
272
               dest_pnode, dest_snode, dest_iallocator):
273
    """Initializes this class.
274

    
275
    @type src_instance_name: string
276
    @param src_instance_name: Instance name on source cluster
277
    @type dest_instance_name: string
278
    @param dest_instance_name: Instance name on destination cluster
279
    @type dest_pnode: string or None
280
    @param dest_pnode: Name of primary node on destination cluster
281
    @type dest_snode: string or None
282
    @param dest_snode: Name of secondary node on destination cluster
283
    @type dest_iallocator: string or None
284
    @param dest_iallocator: Name of iallocator to use
285

    
286
    """
287
    self.src_instance_name = src_instance_name
288
    self.dest_instance_name = dest_instance_name
289
    self.dest_pnode = dest_pnode
290
    self.dest_snode = dest_snode
291
    self.dest_iallocator = dest_iallocator
292

    
293
    self.error_message = None
294

    
295

    
296
class MoveRuntime(object):
297
  """Class to keep track of instance move.
298

    
299
  """
300
  def __init__(self, move):
301
    """Initializes this class.
302

    
303
    @type move: L{InstanceMove}
304

    
305
    """
306
    self.move = move
307

    
308
    # Thread synchronization
309
    self.lock = threading.Lock()
310
    self.source_to_dest = threading.Condition(self.lock)
311
    self.dest_to_source = threading.Condition(self.lock)
312

    
313
    # Source information
314
    self.src_error_message = None
315
    self.src_expinfo = None
316
    self.src_instinfo = None
317

    
318
    # Destination information
319
    self.dest_error_message = None
320
    self.dest_impinfo = None
321

    
322
  def HandleErrors(self, prefix, fn, *args):
323
    """Wrapper to catch errors and abort threads.
324

    
325
    @type prefix: string
326
    @param prefix: Variable name prefix ("src" or "dest")
327
    @type fn: callable
328
    @param fn: Function
329

    
330
    """
331
    assert prefix in ("dest", "src")
332

    
333
    try:
334
      # Call inner function
335
      fn(*args)
336

    
337
      errmsg = None
338
    except Abort:
339
      errmsg = "Aborted"
340
    except Exception, err:
341
      logging.exception("Caught unhandled exception")
342
      errmsg = str(err)
343

    
344
    setattr(self, "%s_error_message" % prefix, errmsg)
345

    
346
    self.lock.acquire()
347
    try:
348
      self.source_to_dest.notifyAll()
349
      self.dest_to_source.notifyAll()
350
    finally:
351
      self.lock.release()
352

    
353
  def CheckAbort(self):
354
    """Check whether thread should be aborted.
355

    
356
    @raise Abort: When thread should be aborted
357

    
358
    """
359
    if not (self.src_error_message is None and
360
            self.dest_error_message is None):
361
      logging.info("Aborting")
362
      raise Abort()
363

    
364
  def Wait(self, cond, check_fn):
365
    """Waits for a condition to become true.
366

    
367
    @type cond: threading.Condition
368
    @param cond: Threading condition
369
    @type check_fn: callable
370
    @param check_fn: Function to check whether condition is true
371

    
372
    """
373
    cond.acquire()
374
    try:
375
      while check_fn(self):
376
        self.CheckAbort()
377
        cond.wait()
378
    finally:
379
      cond.release()
380

    
381
  def PollJob(self, cl, job_id, remote_import_fn=None):
382
    """Wrapper for polling a job.
383

    
384
    @type cl: L{rapi.client.GanetiRapiClient}
385
    @param cl: RAPI client
386
    @type job_id: string
387
    @param job_id: Job ID
388
    @type remote_import_fn: callable or None
389
    @param remote_import_fn: Callback for reporting received remote import
390
                             information
391

    
392
    """
393
    return rapi.client_utils.PollJob(cl, job_id,
394
                                     MoveJobPollReportCb(self.CheckAbort,
395
                                                         remote_import_fn))
396

    
397

    
398
class MoveDestExecutor(object):
399
  def __init__(self, dest_client, mrt):
400
    """Destination side of an instance move.
401

    
402
    @type dest_client: L{rapi.client.GanetiRapiClient}
403
    @param dest_client: RAPI client
404
    @type mrt: L{MoveRuntime}
405
    @param mrt: Instance move runtime information
406

    
407
    """
408
    logging.debug("Waiting for instance information to become available")
409
    mrt.Wait(mrt.source_to_dest,
410
             lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
411

    
412
    logging.info("Creating instance %s in remote-import mode",
413
                 mrt.move.dest_instance_name)
414
    job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
415
                                  mrt.move.dest_pnode, mrt.move.dest_snode,
416
                                  mrt.move.dest_iallocator,
417
                                  mrt.src_instinfo, mrt.src_expinfo)
418
    mrt.PollJob(dest_client, job_id,
419
                remote_import_fn=compat.partial(self._SetImportInfo, mrt))
420

    
421
    logging.info("Import successful")
422

    
423
  @staticmethod
424
  def _SetImportInfo(mrt, impinfo):
425
    """Sets the remote import information and notifies source thread.
426

    
427
    @type mrt: L{MoveRuntime}
428
    @param mrt: Instance move runtime information
429
    @param impinfo: Remote import information
430

    
431
    """
432
    mrt.dest_to_source.acquire()
433
    try:
434
      mrt.dest_impinfo = impinfo
435
      mrt.dest_to_source.notifyAll()
436
    finally:
437
      mrt.dest_to_source.release()
438

    
439
  @staticmethod
440
  def _CreateInstance(cl, name, snode, pnode, iallocator, instance, expinfo):
441
    """Starts the instance creation in remote import mode.
442

    
443
    @type cl: L{rapi.client.GanetiRapiClient}
444
    @param cl: RAPI client
445
    @type name: string
446
    @param name: Instance name
447
    @type pnode: string or None
448
    @param pnode: Name of primary node on destination cluster
449
    @type snode: string or None
450
    @param snode: Name of secondary node on destination cluster
451
    @type iallocator: string or None
452
    @param iallocator: Name of iallocator to use
453
    @type instance: dict
454
    @param instance: Instance details from source cluster
455
    @type expinfo: dict
456
    @param expinfo: Prepared export information from source cluster
457
    @return: Job ID
458

    
459
    """
460
    disk_template = instance["disk_template"]
461

    
462
    disks = [{
463
      "size": i["size"],
464
      "mode": i["mode"],
465
      } for i in instance["disks"]]
466

    
467
    nics = [{
468
      "ip": ip,
469
      "mac": mac,
470
      "mode": mode,
471
      "link": link,
472
      } for ip, mac, mode, link in instance["nics"]]
473

    
474
    # TODO: Should this be the actual up/down status? (run_state)
475
    start = (instance["config_state"] == "up")
476

    
477
    assert len(disks) == len(instance["disks"])
478
    assert len(nics) == len(instance["nics"])
479

    
480
    return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
481
                             name, disk_template, disks, nics,
482
                             os=instance["os"],
483
                             pnode=pnode,
484
                             snode=snode,
485
                             start=start,
486
                             ip_check=False,
487
                             iallocator=iallocator,
488
                             hypervisor=instance["hypervisor"],
489
                             source_handshake=expinfo["handshake"],
490
                             source_x509_ca=expinfo["x509_ca"],
491
                             source_instance_name=instance["name"],
492
                             beparams=instance["be_instance"],
493
                             hvparams=instance["hv_instance"])
494

    
495

    
496
class MoveSourceExecutor(object):
497
  def __init__(self, src_client, mrt):
498
    """Source side of an instance move.
499

    
500
    @type src_client: L{rapi.client.GanetiRapiClient}
501
    @param src_client: RAPI client
502
    @type mrt: L{MoveRuntime}
503
    @param mrt: Instance move runtime information
504

    
505
    """
506
    logging.info("Checking whether instance exists")
507
    self._CheckInstance(src_client, mrt.move.src_instance_name)
508

    
509
    logging.info("Retrieving instance information from source cluster")
510
    instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
511
                                     mrt.move.src_instance_name)
512

    
513
    logging.info("Preparing export on source cluster")
514
    expinfo = self._PrepareExport(src_client, mrt.PollJob,
515
                                  mrt.move.src_instance_name)
516
    assert "handshake" in expinfo
517
    assert "x509_key_name" in expinfo
518
    assert "x509_ca" in expinfo
519

    
520
    # Hand information to destination thread
521
    mrt.source_to_dest.acquire()
522
    try:
523
      mrt.src_instinfo = instinfo
524
      mrt.src_expinfo = expinfo
525
      mrt.source_to_dest.notifyAll()
526
    finally:
527
      mrt.source_to_dest.release()
528

    
529
    logging.info("Waiting for destination information to become available")
530
    mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
531

    
532
    logging.info("Starting remote export on source cluster")
533
    self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
534
                         expinfo["x509_key_name"], mrt.dest_impinfo)
535

    
536
    logging.info("Export successful")
537

    
538
  @staticmethod
539
  def _CheckInstance(cl, name):
540
    """Checks whether the instance exists on the source cluster.
541

    
542
    @type cl: L{rapi.client.GanetiRapiClient}
543
    @param cl: RAPI client
544
    @type name: string
545
    @param name: Instance name
546

    
547
    """
548
    try:
549
      cl.GetInstance(name)
550
    except rapi.client.GanetiApiError, err:
551
      if err.code == rapi.client.HTTP_NOT_FOUND:
552
        raise Error("Instance %s not found (%s)" % (name, str(err)))
553
      raise
554

    
555
  @staticmethod
556
  def _GetInstanceInfo(cl, poll_job_fn, name):
557
    """Retrieves detailed instance information from source cluster.
558

    
559
    @type cl: L{rapi.client.GanetiRapiClient}
560
    @param cl: RAPI client
561
    @type poll_job_fn: callable
562
    @param poll_job_fn: Function to poll for job result
563
    @type name: string
564
    @param name: Instance name
565

    
566
    """
567
    job_id = cl.GetInstanceInfo(name, static=True)
568
    result = poll_job_fn(cl, job_id)
569
    assert len(result[0].keys()) == 1
570
    return result[0][result[0].keys()[0]]
571

    
572
  @staticmethod
573
  def _PrepareExport(cl, poll_job_fn, name):
574
    """Prepares export on source cluster.
575

    
576
    @type cl: L{rapi.client.GanetiRapiClient}
577
    @param cl: RAPI client
578
    @type poll_job_fn: callable
579
    @param poll_job_fn: Function to poll for job result
580
    @type name: string
581
    @param name: Instance name
582

    
583
    """
584
    job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
585
    return poll_job_fn(cl, job_id)[0]
586

    
587
  @staticmethod
588
  def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
589
    """Exports instance from source cluster.
590

    
591
    @type cl: L{rapi.client.GanetiRapiClient}
592
    @param cl: RAPI client
593
    @type poll_job_fn: callable
594
    @param poll_job_fn: Function to poll for job result
595
    @type name: string
596
    @param name: Instance name
597
    @param x509_key_name: Source X509 key
598
    @param impinfo: Import information from destination cluster
599

    
600
    """
601
    job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
602
                               impinfo["disks"], shutdown=True,
603
                               remove_instance=True,
604
                               x509_key_name=x509_key_name,
605
                               destination_x509_ca=impinfo["x509_ca"])
606
    (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
607

    
608
    if not (fin_resu and compat.all(dresults)):
609
      raise Error("Export failed for disks %s" %
610
                  utils.CommaJoin(str(idx) for idx, result
611
                                  in enumerate(dresults) if not result))
612

    
613

    
614
class MoveSourceWorker(workerpool.BaseWorker):
615
  def RunTask(self, rapi_factory, move): # pylint: disable-msg=W0221
616
    """Executes an instance move.
617

    
618
    @type rapi_factory: L{RapiClientFactory}
619
    @param rapi_factory: RAPI client factory
620
    @type move: L{InstanceMove}
621
    @param move: Instance move information
622

    
623
    """
624
    try:
625
      logging.info("Preparing to move %s from cluster %s to %s as %s",
626
                   move.src_instance_name, rapi_factory.src_cluster_name,
627
                   rapi_factory.dest_cluster_name, move.dest_instance_name)
628

    
629
      mrt = MoveRuntime(move)
630

    
631
      logging.debug("Starting destination thread")
632
      dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
633
                                     target=mrt.HandleErrors,
634
                                     args=("dest", MoveDestExecutor,
635
                                           rapi_factory.GetDestClient(),
636
                                           mrt, ))
637
      dest_thread.start()
638
      try:
639
        mrt.HandleErrors("src", MoveSourceExecutor,
640
                         rapi_factory.GetSourceClient(), mrt)
641
      finally:
642
        dest_thread.join()
643

    
644
      if mrt.src_error_message or mrt.dest_error_message:
645
        move.error_message = ("Source error: %s, destination error: %s" %
646
                              (mrt.src_error_message, mrt.dest_error_message))
647
      else:
648
        move.error_message = None
649
    except Exception, err: # pylint: disable-msg=W0703
650
      logging.exception("Caught unhandled exception")
651
      move.error_message = str(err)
652

    
653

    
654
def CheckRapiSetup(rapi_factory):
655
  """Checks the RAPI setup by retrieving the version.
656

    
657
  @type rapi_factory: L{RapiClientFactory}
658
  @param rapi_factory: RAPI client factory
659

    
660
  """
661
  src_client = rapi_factory.GetSourceClient()
662
  logging.info("Connecting to source RAPI server")
663
  logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
664

    
665
  dest_client = rapi_factory.GetDestClient()
666
  logging.info("Connecting to destination RAPI server")
667
  logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())
668

    
669

    
670
def SetupLogging(options):
671
  """Setting up logging infrastructure.
672

    
673
  @param options: Parsed command line options
674

    
675
  """
676
  fmt = "%(asctime)s: %(threadName)s "
677
  if options.debug or options.verbose:
678
    fmt += "%(levelname)s "
679
  fmt += "%(message)s"
680

    
681
  formatter = logging.Formatter(fmt)
682

    
683
  stderr_handler = logging.StreamHandler()
684
  stderr_handler.setFormatter(formatter)
685
  if options.debug:
686
    stderr_handler.setLevel(logging.NOTSET)
687
  elif options.verbose:
688
    stderr_handler.setLevel(logging.INFO)
689
  else:
690
    stderr_handler.setLevel(logging.ERROR)
691

    
692
  root_logger = logging.getLogger("")
693
  root_logger.setLevel(logging.NOTSET)
694
  root_logger.addHandler(stderr_handler)
695

    
696

    
697
def ParseOptions():
698
  """Parses options passed to program.
699

    
700
  """
701
  program = os.path.basename(sys.argv[0])
702

    
703
  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
704
                                        " <source-cluster> <dest-cluster>"
705
                                        " <instance...>"),
706
                                 prog=program)
707
  parser.add_option(cli.DEBUG_OPT)
708
  parser.add_option(cli.VERBOSE_OPT)
709
  parser.add_option(cli.IALLOCATOR_OPT)
710
  parser.add_option(SRC_RAPI_PORT_OPT)
711
  parser.add_option(SRC_CA_FILE_OPT)
712
  parser.add_option(SRC_USERNAME_OPT)
713
  parser.add_option(SRC_PASSWORD_FILE_OPT)
714
  parser.add_option(DEST_RAPI_PORT_OPT)
715
  parser.add_option(DEST_CA_FILE_OPT)
716
  parser.add_option(DEST_USERNAME_OPT)
717
  parser.add_option(DEST_PASSWORD_FILE_OPT)
718
  parser.add_option(DEST_INSTANCE_NAME_OPT)
719
  parser.add_option(DEST_PRIMARY_NODE_OPT)
720
  parser.add_option(DEST_SECONDARY_NODE_OPT)
721
  parser.add_option(PARALLEL_OPT)
722

    
723
  (options, args) = parser.parse_args()
724

    
725
  return (parser, options, args)
726

    
727

    
728
def CheckOptions(parser, options, args):
729
  """Checks options and arguments for validity.
730

    
731
  """
732
  if len(args) < 3:
733
    parser.error("Not enough arguments")
734

    
735
  src_cluster_name = args.pop(0)
736
  dest_cluster_name = args.pop(0)
737
  instance_names = args
738

    
739
  assert len(instance_names) > 0
740

    
741
  # TODO: Remove once using system default paths for SSL certificate
742
  # verification is implemented
743
  if not options.src_ca_file:
744
    parser.error("Missing source cluster CA file")
745

    
746
  if options.parallel < 1:
747
    parser.error("Number of simultaneous moves must be >= 1")
748

    
749
  if not (bool(options.iallocator) ^
750
          bool(options.dest_primary_node or options.dest_secondary_node)):
751
    parser.error("Destination node and iallocator options exclude each other")
752

    
753
  if len(instance_names) == 1:
754
    # Moving one instance only
755
    if not (options.iallocator or
756
            options.dest_primary_node or
757
            options.dest_secondary_node):
758
      parser.error("An iallocator or the destination node is required")
759
  else:
760
    # Moving more than one instance
761
    if (options.dest_instance_name or options.dest_primary_node or
762
        options.dest_secondary_node):
763
      parser.error("The options --dest-instance-name, --dest-primary-node and"
764
                   " --dest-secondary-node can only be used when moving exactly"
765
                   " one instance")
766

    
767
    if not options.iallocator:
768
      parser.error("An iallocator must be specified for moving more than one"
769
                   " instance")
770

    
771
  return (src_cluster_name, dest_cluster_name, instance_names)
772

    
773

    
774
@rapi.client.UsesRapiClient
775
def main():
776
  """Main routine.
777

    
778
  """
779
  (parser, options, args) = ParseOptions()
780

    
781
  SetupLogging(options)
782

    
783
  (src_cluster_name, dest_cluster_name, instance_names) = \
784
    CheckOptions(parser, options, args)
785

    
786
  logging.info("Source cluster: %s", src_cluster_name)
787
  logging.info("Destination cluster: %s", dest_cluster_name)
788
  logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
789

    
790
  rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
791

    
792
  CheckRapiSetup(rapi_factory)
793

    
794
  assert (len(instance_names) == 1 or
795
          not (options.dest_primary_node or options.dest_secondary_node))
796
  assert len(instance_names) == 1 or options.iallocator
797
  assert (len(instance_names) > 1 or options.iallocator or
798
          options.dest_primary_node or options.dest_secondary_node)
799

    
800
  # Prepare list of instance moves
801
  moves = []
802
  for src_instance_name in instance_names:
803
    if options.dest_instance_name:
804
      assert len(instance_names) == 1
805
      # Rename instance
806
      dest_instance_name = options.dest_instance_name
807
    else:
808
      dest_instance_name = src_instance_name
809

    
810
    moves.append(InstanceMove(src_instance_name, dest_instance_name,
811
                              options.dest_primary_node,
812
                              options.dest_secondary_node,
813
                              options.iallocator))
814

    
815
  assert len(moves) == len(instance_names)
816

    
817
  # Start workerpool
818
  wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
819
  try:
820
    # Add instance moves to workerpool
821
    for move in moves:
822
      wp.AddTask(rapi_factory, move)
823

    
824
    # Wait for all moves to finish
825
    wp.Quiesce()
826

    
827
  finally:
828
    wp.TerminateWorkers()
829

    
830
  # There should be no threads running at this point, hence not using locks
831
  # anymore
832

    
833
  logging.info("Instance move results:")
834

    
835
  for move in moves:
836
    if move.dest_instance_name == move.src_instance_name:
837
      name = move.src_instance_name
838
    else:
839
      name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
840

    
841
    if move.error_message:
842
      msg = "Failed (%s)" % move.error_message
843
    else:
844
      msg = "Success"
845

    
846
    logging.info("%s: %s", name, msg)
847

    
848
  if compat.any(move.error_message for move in moves):
849
    sys.exit(constants.EXIT_FAILURE)
850

    
851
  sys.exit(constants.EXIT_SUCCESS)
852

    
853

    
854
if __name__ == "__main__":
855
  main()