Statistics
| Branch: | Tag: | Revision:

root / tools / move-instance @ a59b0dc4

History | View | Annotate | Download (27.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to move instances from one cluster to another.
22

    
23
"""
24

    
25
# pylint: disable-msg=C0103
26
# C0103: Invalid name move-instance
27

    
28
import os
29
import sys
30
import time
31
import logging
32
import optparse
33
import threading
34

    
35
from ganeti import cli
36
from ganeti import constants
37
from ganeti import utils
38
from ganeti import workerpool
39
from ganeti import compat
40
from ganeti import rapi
41

    
42
import ganeti.rapi.client # pylint: disable-msg=W0611
43
import ganeti.rapi.client_utils
44

    
45

    
46
SRC_RAPI_PORT_OPT = \
47
  cli.cli_option("--src-rapi-port", action="store", type="int",
48
                 dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
49
                 help=("Source cluster RAPI port (defaults to %s)" %
50
                       constants.DEFAULT_RAPI_PORT))
51

    
52
SRC_CA_FILE_OPT = \
53
  cli.cli_option("--src-ca-file", action="store", type="string",
54
                 dest="src_ca_file",
55
                 help=("File containing source cluster Certificate"
56
                       " Authority (CA) in PEM format"))
57

    
58
SRC_USERNAME_OPT = \
59
  cli.cli_option("--src-username", action="store", type="string",
60
                 dest="src_username", default=None,
61
                 help="Source cluster username")
62

    
63
SRC_PASSWORD_FILE_OPT = \
64
  cli.cli_option("--src-password-file", action="store", type="string",
65
                 dest="src_password_file",
66
                 help="File containing source cluster password")
67

    
68
DEST_RAPI_PORT_OPT = \
69
  cli.cli_option("--dest-rapi-port", action="store", type="int",
70
                 dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT,
71
                 help=("Destination cluster RAPI port (defaults to source"
72
                       " cluster RAPI port)"))
73

    
74
DEST_CA_FILE_OPT = \
75
  cli.cli_option("--dest-ca-file", action="store", type="string",
76
                 dest="dest_ca_file",
77
                 help=("File containing destination cluster Certificate"
78
                       " Authority (CA) in PEM format (defaults to source"
79
                       " cluster CA)"))
80

    
81
DEST_USERNAME_OPT = \
82
  cli.cli_option("--dest-username", action="store", type="string",
83
                 dest="dest_username", default=None,
84
                 help=("Destination cluster username (defaults to"
85
                       " source cluster username)"))
86

    
87
DEST_PASSWORD_FILE_OPT = \
88
  cli.cli_option("--dest-password-file", action="store", type="string",
89
                 dest="dest_password_file",
90
                 help=("File containing destination cluster password"
91
                       " (defaults to source cluster password)"))
92

    
93
DEST_INSTANCE_NAME_OPT = \
94
  cli.cli_option("--dest-instance-name", action="store", type="string",
95
                 dest="dest_instance_name",
96
                 help=("Instance name on destination cluster (only"
97
                       " when moving exactly one instance)"))
98

    
99
DEST_PRIMARY_NODE_OPT = \
100
  cli.cli_option("--dest-primary-node", action="store", type="string",
101
                 dest="dest_primary_node",
102
                 help=("Primary node on destination cluster (only"
103
                       " when moving exactly one instance)"))
104

    
105
DEST_SECONDARY_NODE_OPT = \
106
  cli.cli_option("--dest-secondary-node", action="store", type="string",
107
                 dest="dest_secondary_node",
108
                 help=("Secondary node on destination cluster (only"
109
                       " when moving exactly one instance)"))
110

    
111
PARALLEL_OPT = \
112
  cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
113
                 dest="parallel", metavar="<number>",
114
                 help="Number of instances to be moved simultaneously")
115

    
116

    
117
class Error(Exception):
118
  """Generic error.
119

    
120
  """
121

    
122

    
123
class Abort(Error):
124
  """Special exception for aborting import/export.
125

    
126
  """
127

    
128

    
129
class RapiClientFactory:
130
  """Factory class for creating RAPI clients.
131

    
132
  @ivar src_cluster_name: Source cluster name
133
  @ivar dest_cluster_name: Destination cluster name
134
  @ivar GetSourceClient: Callable returning new client for source cluster
135
  @ivar GetDestClient: Callable returning new client for destination cluster
136

    
137
  """
138
  def __init__(self, options, src_cluster_name, dest_cluster_name):
139
    """Initializes this class.
140

    
141
    @param options: Program options
142
    @type src_cluster_name: string
143
    @param src_cluster_name: Source cluster name
144
    @type dest_cluster_name: string
145
    @param dest_cluster_name: Destination cluster name
146

    
147
    """
148
    self.src_cluster_name = src_cluster_name
149
    self.dest_cluster_name = dest_cluster_name
150

    
151
    # TODO: Implement timeouts for RAPI connections
152
    # TODO: Support for using system default paths for verifying SSL certificate
153
    logging.debug("Using '%s' as source CA", options.src_ca_file)
154
    src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)
155

    
156
    if options.dest_ca_file:
157
      logging.debug("Using '%s' as destination CA", options.dest_ca_file)
158
      dest_curl_config = \
159
        rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
160
    else:
161
      logging.debug("Using source CA for destination")
162
      dest_curl_config = src_curl_config
163

    
164
    logging.debug("Source RAPI server is %s:%s",
165
                  src_cluster_name, options.src_rapi_port)
166
    logging.debug("Source username is '%s'", options.src_username)
167

    
168
    if options.src_username is None:
169
      src_username = ""
170
    else:
171
      src_username = options.src_username
172

    
173
    if options.src_password_file:
174
      logging.debug("Reading '%s' for source password",
175
                    options.src_password_file)
176
      src_password = utils.ReadOneLineFile(options.src_password_file,
177
                                           strict=True)
178
    else:
179
      logging.debug("Source has no password")
180
      src_password = None
181

    
182
    self.GetSourceClient = lambda: \
183
      rapi.client.GanetiRapiClient(src_cluster_name,
184
                                   port=options.src_rapi_port,
185
                                   curl_config_fn=src_curl_config,
186
                                   username=src_username,
187
                                   password=src_password)
188

    
189
    if options.dest_rapi_port:
190
      dest_rapi_port = options.dest_rapi_port
191
    else:
192
      dest_rapi_port = options.src_rapi_port
193

    
194
    if options.dest_username is None:
195
      dest_username = src_username
196
    else:
197
      dest_username = options.dest_username
198

    
199
    logging.debug("Destination RAPI server is %s:%s",
200
                  dest_cluster_name, dest_rapi_port)
201
    logging.debug("Destination username is '%s'", dest_username)
202

    
203
    if options.dest_password_file:
204
      logging.debug("Reading '%s' for destination password",
205
                    options.dest_password_file)
206
      dest_password = utils.ReadOneLineFile(options.dest_password_file,
207
                                            strict=True)
208
    else:
209
      logging.debug("Using source password for destination")
210
      dest_password = src_password
211

    
212
    self.GetDestClient = lambda: \
213
      rapi.client.GanetiRapiClient(dest_cluster_name,
214
                                   port=dest_rapi_port,
215
                                   curl_config_fn=dest_curl_config,
216
                                   username=dest_username,
217
                                   password=dest_password)
218

    
219

    
220
class MoveJobPollReportCb(cli.JobPollReportCbBase):
221
  def __init__(self, abort_check_fn, remote_import_fn):
222
    """Initializes this class.
223

    
224
    @type abort_check_fn: callable
225
    @param abort_check_fn: Function to check whether move is aborted
226
    @type remote_import_fn: callable or None
227
    @param remote_import_fn: Callback for reporting received remote import
228
                             information
229

    
230
    """
231
    cli.JobPollReportCbBase.__init__(self)
232
    self._abort_check_fn = abort_check_fn
233
    self._remote_import_fn = remote_import_fn
234

    
235
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
236
    """Handles a log message.
237

    
238
    """
239
    if log_type == constants.ELOG_REMOTE_IMPORT:
240
      logging.debug("Received remote import information")
241

    
242
      if not self._remote_import_fn:
243
        raise RuntimeError("Received unexpected remote import information")
244

    
245
      assert "x509_ca" in log_msg
246
      assert "disks" in log_msg
247

    
248
      self._remote_import_fn(log_msg)
249

    
250
      return
251

    
252
    logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
253
                 cli.FormatLogMessage(log_type, log_msg))
254

    
255
  def ReportNotChanged(self, job_id, status):
256
    """Called if a job hasn't changed in a while.
257

    
258
    """
259
    try:
260
      # Check whether we were told to abort by the other thread
261
      self._abort_check_fn()
262
    except Abort:
263
      logging.warning("Aborting despite job %s still running", job_id)
264
      raise
265

    
266

    
267
class InstanceMove(object):
268
  """Status class for instance moves.
269

    
270
  """
271
  def __init__(self, src_instance_name, dest_instance_name,
272
               dest_pnode, dest_snode, dest_iallocator):
273
    """Initializes this class.
274

    
275
    @type src_instance_name: string
276
    @param src_instance_name: Instance name on source cluster
277
    @type dest_instance_name: string
278
    @param dest_instance_name: Instance name on destination cluster
279
    @type dest_pnode: string or None
280
    @param dest_pnode: Name of primary node on destination cluster
281
    @type dest_snode: string or None
282
    @param dest_snode: Name of secondary node on destination cluster
283
    @type dest_iallocator: string or None
284
    @param dest_iallocator: Name of iallocator to use
285

    
286
    """
287
    self.src_instance_name = src_instance_name
288
    self.dest_instance_name = dest_instance_name
289
    self.dest_pnode = dest_pnode
290
    self.dest_snode = dest_snode
291
    self.dest_iallocator = dest_iallocator
292

    
293
    self.error_message = None
294

    
295

    
296
class MoveRuntime(object):
297
  """Class to keep track of instance move.
298

    
299
  """
300
  def __init__(self, move):
301
    """Initializes this class.
302

    
303
    @type move: L{InstanceMove}
304

    
305
    """
306
    self.move = move
307

    
308
    # Thread synchronization
309
    self.lock = threading.Lock()
310
    self.source_to_dest = threading.Condition(self.lock)
311
    self.dest_to_source = threading.Condition(self.lock)
312

    
313
    # Source information
314
    self.src_error_message = None
315
    self.src_expinfo = None
316
    self.src_instinfo = None
317

    
318
    # Destination information
319
    self.dest_error_message = None
320
    self.dest_impinfo = None
321

    
322
  def HandleErrors(self, prefix, fn, *args):
323
    """Wrapper to catch errors and abort threads.
324

    
325
    @type prefix: string
326
    @param prefix: Variable name prefix ("src" or "dest")
327
    @type fn: callable
328
    @param fn: Function
329

    
330
    """
331
    assert prefix in ("dest", "src")
332

    
333
    try:
334
      # Call inner function
335
      fn(*args)
336

    
337
      errmsg = None
338
    except Abort:
339
      errmsg = "Aborted"
340
    except Exception, err:
341
      logging.exception("Caught unhandled exception")
342
      errmsg = str(err)
343

    
344
    setattr(self, "%s_error_message" % prefix, errmsg)
345

    
346
    self.lock.acquire()
347
    try:
348
      self.source_to_dest.notifyAll()
349
      self.dest_to_source.notifyAll()
350
    finally:
351
      self.lock.release()
352

    
353
  def CheckAbort(self):
354
    """Check whether thread should be aborted.
355

    
356
    @raise Abort: When thread should be aborted
357

    
358
    """
359
    if not (self.src_error_message is None and
360
            self.dest_error_message is None):
361
      logging.info("Aborting")
362
      raise Abort()
363

    
364
  def Wait(self, cond, check_fn):
365
    """Waits for a condition to become true.
366

    
367
    @type cond: threading.Condition
368
    @param cond: Threading condition
369
    @type check_fn: callable
370
    @param check_fn: Function to check whether condition is true
371

    
372
    """
373
    cond.acquire()
374
    try:
375
      while check_fn(self):
376
        self.CheckAbort()
377
        cond.wait()
378
    finally:
379
      cond.release()
380

    
381
  def PollJob(self, cl, job_id, remote_import_fn=None):
382
    """Wrapper for polling a job.
383

    
384
    @type cl: L{rapi.client.GanetiRapiClient}
385
    @param cl: RAPI client
386
    @type job_id: string
387
    @param job_id: Job ID
388
    @type remote_import_fn: callable or None
389
    @param remote_import_fn: Callback for reporting received remote import
390
                             information
391

    
392
    """
393
    return rapi.client_utils.PollJob(cl, job_id,
394
                                     MoveJobPollReportCb(self.CheckAbort,
395
                                                         remote_import_fn))
396

    
397

    
398
class MoveDestExecutor(object):
399
  def __init__(self, dest_client, mrt):
400
    """Destination side of an instance move.
401

    
402
    @type dest_client: L{rapi.client.GanetiRapiClient}
403
    @param dest_client: RAPI client
404
    @type mrt: L{MoveRuntime}
405
    @param mrt: Instance move runtime information
406

    
407
    """
408
    logging.debug("Waiting for instance information to become available")
409
    mrt.Wait(mrt.source_to_dest,
410
             lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
411

    
412
    logging.info("Creating instance %s in remote-import mode",
413
                 mrt.move.dest_instance_name)
414
    job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
415
                                  mrt.move.dest_pnode, mrt.move.dest_snode,
416
                                  mrt.move.dest_iallocator,
417
                                  mrt.src_instinfo, mrt.src_expinfo)
418
    mrt.PollJob(dest_client, job_id,
419
                remote_import_fn=compat.partial(self._SetImportInfo, mrt))
420

    
421
    logging.info("Import successful")
422

    
423
  @staticmethod
424
  def _SetImportInfo(mrt, impinfo):
425
    """Sets the remote import information and notifies source thread.
426

    
427
    @type mrt: L{MoveRuntime}
428
    @param mrt: Instance move runtime information
429
    @param impinfo: Remote import information
430

    
431
    """
432
    mrt.dest_to_source.acquire()
433
    try:
434
      mrt.dest_impinfo = impinfo
435
      mrt.dest_to_source.notifyAll()
436
    finally:
437
      mrt.dest_to_source.release()
438

    
439
  @staticmethod
440
  def _CreateInstance(cl, name, snode, pnode, iallocator, instance, expinfo):
441
    """Starts the instance creation in remote import mode.
442

    
443
    @type cl: L{rapi.client.GanetiRapiClient}
444
    @param cl: RAPI client
445
    @type name: string
446
    @param name: Instance name
447
    @type pnode: string or None
448
    @param pnode: Name of primary node on destination cluster
449
    @type snode: string or None
450
    @param snode: Name of secondary node on destination cluster
451
    @type iallocator: string or None
452
    @param iallocator: Name of iallocator to use
453
    @type instance: dict
454
    @param instance: Instance details from source cluster
455
    @type expinfo: dict
456
    @param expinfo: Prepared export information from source cluster
457
    @return: Job ID
458

    
459
    """
460
    disk_template = instance["disk_template"]
461

    
462
    disks = [{
463
      "size": i["size"],
464
      "mode": i["mode"],
465
      } for i in instance["disks"]]
466

    
467
    nics = [{
468
      "ip": ip,
469
      "mac": mac,
470
      "mode": mode,
471
      "link": link,
472
      } for ip, mac, mode, link in instance["nics"]]
473

    
474
    # TODO: Should this be the actual up/down status? (run_state)
475
    start = (instance["config_state"] == "up")
476

    
477
    assert len(disks) == len(instance["disks"])
478
    assert len(nics) == len(instance["nics"])
479

    
480
    return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
481
                             name, disk_template, disks, nics,
482
                             os=instance["os"],
483
                             pnode=pnode,
484
                             snode=snode,
485
                             start=start,
486
                             ip_check=False,
487
                             iallocator=iallocator,
488
                             hypervisor=instance["hypervisor"],
489
                             source_handshake=expinfo["handshake"],
490
                             source_x509_ca=expinfo["x509_ca"],
491
                             source_instance_name=instance["name"],
492
                             beparams=instance["be_instance"],
493
                             hvparams=instance["hv_instance"],
494
                             osparams=instance["os_instance"])
495

    
496

    
497
class MoveSourceExecutor(object):
498
  def __init__(self, src_client, mrt):
499
    """Source side of an instance move.
500

    
501
    @type src_client: L{rapi.client.GanetiRapiClient}
502
    @param src_client: RAPI client
503
    @type mrt: L{MoveRuntime}
504
    @param mrt: Instance move runtime information
505

    
506
    """
507
    logging.info("Checking whether instance exists")
508
    self._CheckInstance(src_client, mrt.move.src_instance_name)
509

    
510
    logging.info("Retrieving instance information from source cluster")
511
    instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
512
                                     mrt.move.src_instance_name)
513

    
514
    logging.info("Preparing export on source cluster")
515
    expinfo = self._PrepareExport(src_client, mrt.PollJob,
516
                                  mrt.move.src_instance_name)
517
    assert "handshake" in expinfo
518
    assert "x509_key_name" in expinfo
519
    assert "x509_ca" in expinfo
520

    
521
    # Hand information to destination thread
522
    mrt.source_to_dest.acquire()
523
    try:
524
      mrt.src_instinfo = instinfo
525
      mrt.src_expinfo = expinfo
526
      mrt.source_to_dest.notifyAll()
527
    finally:
528
      mrt.source_to_dest.release()
529

    
530
    logging.info("Waiting for destination information to become available")
531
    mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
532

    
533
    logging.info("Starting remote export on source cluster")
534
    self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
535
                         expinfo["x509_key_name"], mrt.dest_impinfo)
536

    
537
    logging.info("Export successful")
538

    
539
  @staticmethod
540
  def _CheckInstance(cl, name):
541
    """Checks whether the instance exists on the source cluster.
542

    
543
    @type cl: L{rapi.client.GanetiRapiClient}
544
    @param cl: RAPI client
545
    @type name: string
546
    @param name: Instance name
547

    
548
    """
549
    try:
550
      cl.GetInstance(name)
551
    except rapi.client.GanetiApiError, err:
552
      if err.code == rapi.client.HTTP_NOT_FOUND:
553
        raise Error("Instance %s not found (%s)" % (name, str(err)))
554
      raise
555

    
556
  @staticmethod
557
  def _GetInstanceInfo(cl, poll_job_fn, name):
558
    """Retrieves detailed instance information from source cluster.
559

    
560
    @type cl: L{rapi.client.GanetiRapiClient}
561
    @param cl: RAPI client
562
    @type poll_job_fn: callable
563
    @param poll_job_fn: Function to poll for job result
564
    @type name: string
565
    @param name: Instance name
566

    
567
    """
568
    job_id = cl.GetInstanceInfo(name, static=True)
569
    result = poll_job_fn(cl, job_id)
570
    assert len(result[0].keys()) == 1
571
    return result[0][result[0].keys()[0]]
572

    
573
  @staticmethod
574
  def _PrepareExport(cl, poll_job_fn, name):
575
    """Prepares export on source cluster.
576

    
577
    @type cl: L{rapi.client.GanetiRapiClient}
578
    @param cl: RAPI client
579
    @type poll_job_fn: callable
580
    @param poll_job_fn: Function to poll for job result
581
    @type name: string
582
    @param name: Instance name
583

    
584
    """
585
    job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
586
    return poll_job_fn(cl, job_id)[0]
587

    
588
  @staticmethod
589
  def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
590
    """Exports instance from source cluster.
591

    
592
    @type cl: L{rapi.client.GanetiRapiClient}
593
    @param cl: RAPI client
594
    @type poll_job_fn: callable
595
    @param poll_job_fn: Function to poll for job result
596
    @type name: string
597
    @param name: Instance name
598
    @param x509_key_name: Source X509 key
599
    @param impinfo: Import information from destination cluster
600

    
601
    """
602
    job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
603
                               impinfo["disks"], shutdown=True,
604
                               remove_instance=True,
605
                               x509_key_name=x509_key_name,
606
                               destination_x509_ca=impinfo["x509_ca"])
607
    (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
608

    
609
    if not (fin_resu and compat.all(dresults)):
610
      raise Error("Export failed for disks %s" %
611
                  utils.CommaJoin(str(idx) for idx, result
612
                                  in enumerate(dresults) if not result))
613

    
614

    
615
class MoveSourceWorker(workerpool.BaseWorker):
616
  def RunTask(self, rapi_factory, move): # pylint: disable-msg=W0221
617
    """Executes an instance move.
618

    
619
    @type rapi_factory: L{RapiClientFactory}
620
    @param rapi_factory: RAPI client factory
621
    @type move: L{InstanceMove}
622
    @param move: Instance move information
623

    
624
    """
625
    try:
626
      logging.info("Preparing to move %s from cluster %s to %s as %s",
627
                   move.src_instance_name, rapi_factory.src_cluster_name,
628
                   rapi_factory.dest_cluster_name, move.dest_instance_name)
629

    
630
      mrt = MoveRuntime(move)
631

    
632
      logging.debug("Starting destination thread")
633
      dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
634
                                     target=mrt.HandleErrors,
635
                                     args=("dest", MoveDestExecutor,
636
                                           rapi_factory.GetDestClient(),
637
                                           mrt, ))
638
      dest_thread.start()
639
      try:
640
        mrt.HandleErrors("src", MoveSourceExecutor,
641
                         rapi_factory.GetSourceClient(), mrt)
642
      finally:
643
        dest_thread.join()
644

    
645
      if mrt.src_error_message or mrt.dest_error_message:
646
        move.error_message = ("Source error: %s, destination error: %s" %
647
                              (mrt.src_error_message, mrt.dest_error_message))
648
      else:
649
        move.error_message = None
650
    except Exception, err: # pylint: disable-msg=W0703
651
      logging.exception("Caught unhandled exception")
652
      move.error_message = str(err)
653

    
654

    
655
def CheckRapiSetup(rapi_factory):
656
  """Checks the RAPI setup by retrieving the version.
657

    
658
  @type rapi_factory: L{RapiClientFactory}
659
  @param rapi_factory: RAPI client factory
660

    
661
  """
662
  src_client = rapi_factory.GetSourceClient()
663
  logging.info("Connecting to source RAPI server")
664
  logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
665

    
666
  dest_client = rapi_factory.GetDestClient()
667
  logging.info("Connecting to destination RAPI server")
668
  logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())
669

    
670

    
671
def SetupLogging(options):
672
  """Setting up logging infrastructure.
673

    
674
  @param options: Parsed command line options
675

    
676
  """
677
  fmt = "%(asctime)s: %(threadName)s "
678
  if options.debug or options.verbose:
679
    fmt += "%(levelname)s "
680
  fmt += "%(message)s"
681

    
682
  formatter = logging.Formatter(fmt)
683

    
684
  stderr_handler = logging.StreamHandler()
685
  stderr_handler.setFormatter(formatter)
686
  if options.debug:
687
    stderr_handler.setLevel(logging.NOTSET)
688
  elif options.verbose:
689
    stderr_handler.setLevel(logging.INFO)
690
  else:
691
    stderr_handler.setLevel(logging.ERROR)
692

    
693
  root_logger = logging.getLogger("")
694
  root_logger.setLevel(logging.NOTSET)
695
  root_logger.addHandler(stderr_handler)
696

    
697

    
698
def ParseOptions():
699
  """Parses options passed to program.
700

    
701
  """
702
  program = os.path.basename(sys.argv[0])
703

    
704
  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
705
                                        " <source-cluster> <dest-cluster>"
706
                                        " <instance...>"),
707
                                 prog=program)
708
  parser.add_option(cli.DEBUG_OPT)
709
  parser.add_option(cli.VERBOSE_OPT)
710
  parser.add_option(cli.IALLOCATOR_OPT)
711
  parser.add_option(SRC_RAPI_PORT_OPT)
712
  parser.add_option(SRC_CA_FILE_OPT)
713
  parser.add_option(SRC_USERNAME_OPT)
714
  parser.add_option(SRC_PASSWORD_FILE_OPT)
715
  parser.add_option(DEST_RAPI_PORT_OPT)
716
  parser.add_option(DEST_CA_FILE_OPT)
717
  parser.add_option(DEST_USERNAME_OPT)
718
  parser.add_option(DEST_PASSWORD_FILE_OPT)
719
  parser.add_option(DEST_INSTANCE_NAME_OPT)
720
  parser.add_option(DEST_PRIMARY_NODE_OPT)
721
  parser.add_option(DEST_SECONDARY_NODE_OPT)
722
  parser.add_option(PARALLEL_OPT)
723

    
724
  (options, args) = parser.parse_args()
725

    
726
  return (parser, options, args)
727

    
728

    
729
def CheckOptions(parser, options, args):
730
  """Checks options and arguments for validity.
731

    
732
  """
733
  if len(args) < 3:
734
    parser.error("Not enough arguments")
735

    
736
  src_cluster_name = args.pop(0)
737
  dest_cluster_name = args.pop(0)
738
  instance_names = args
739

    
740
  assert len(instance_names) > 0
741

    
742
  # TODO: Remove once using system default paths for SSL certificate
743
  # verification is implemented
744
  if not options.src_ca_file:
745
    parser.error("Missing source cluster CA file")
746

    
747
  if options.parallel < 1:
748
    parser.error("Number of simultaneous moves must be >= 1")
749

    
750
  if not (bool(options.iallocator) ^
751
          bool(options.dest_primary_node or options.dest_secondary_node)):
752
    parser.error("Destination node and iallocator options exclude each other")
753

    
754
  if len(instance_names) == 1:
755
    # Moving one instance only
756
    if not (options.iallocator or
757
            options.dest_primary_node or
758
            options.dest_secondary_node):
759
      parser.error("An iallocator or the destination node is required")
760
  else:
761
    # Moving more than one instance
762
    if (options.dest_instance_name or options.dest_primary_node or
763
        options.dest_secondary_node):
764
      parser.error("The options --dest-instance-name, --dest-primary-node and"
765
                   " --dest-secondary-node can only be used when moving exactly"
766
                   " one instance")
767

    
768
    if not options.iallocator:
769
      parser.error("An iallocator must be specified for moving more than one"
770
                   " instance")
771

    
772
  return (src_cluster_name, dest_cluster_name, instance_names)
773

    
774

    
775
@rapi.client.UsesRapiClient
776
def main():
777
  """Main routine.
778

    
779
  """
780
  (parser, options, args) = ParseOptions()
781

    
782
  SetupLogging(options)
783

    
784
  (src_cluster_name, dest_cluster_name, instance_names) = \
785
    CheckOptions(parser, options, args)
786

    
787
  logging.info("Source cluster: %s", src_cluster_name)
788
  logging.info("Destination cluster: %s", dest_cluster_name)
789
  logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
790

    
791
  rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
792

    
793
  CheckRapiSetup(rapi_factory)
794

    
795
  assert (len(instance_names) == 1 or
796
          not (options.dest_primary_node or options.dest_secondary_node))
797
  assert len(instance_names) == 1 or options.iallocator
798
  assert (len(instance_names) > 1 or options.iallocator or
799
          options.dest_primary_node or options.dest_secondary_node)
800

    
801
  # Prepare list of instance moves
802
  moves = []
803
  for src_instance_name in instance_names:
804
    if options.dest_instance_name:
805
      assert len(instance_names) == 1
806
      # Rename instance
807
      dest_instance_name = options.dest_instance_name
808
    else:
809
      dest_instance_name = src_instance_name
810

    
811
    moves.append(InstanceMove(src_instance_name, dest_instance_name,
812
                              options.dest_primary_node,
813
                              options.dest_secondary_node,
814
                              options.iallocator))
815

    
816
  assert len(moves) == len(instance_names)
817

    
818
  # Start workerpool
819
  wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
820
  try:
821
    # Add instance moves to workerpool
822
    for move in moves:
823
      wp.AddTask((rapi_factory, move))
824

    
825
    # Wait for all moves to finish
826
    wp.Quiesce()
827

    
828
  finally:
829
    wp.TerminateWorkers()
830

    
831
  # There should be no threads running at this point, hence not using locks
832
  # anymore
833

    
834
  logging.info("Instance move results:")
835

    
836
  for move in moves:
837
    if move.dest_instance_name == move.src_instance_name:
838
      name = move.src_instance_name
839
    else:
840
      name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
841

    
842
    if move.error_message:
843
      msg = "Failed (%s)" % move.error_message
844
    else:
845
      msg = "Success"
846

    
847
    logging.info("%s: %s", name, msg)
848

    
849
  if compat.any(move.error_message for move in moves):
850
    sys.exit(constants.EXIT_FAILURE)
851

    
852
  sys.exit(constants.EXIT_SUCCESS)
853

    
854

    
855
if __name__ == "__main__":
856
  main()