Revision 6bf273d5

b/Makefile.am
162 162
	doc/install-quick.rst \
163 163
	doc/install.rst \
164 164
	doc/locking.rst \
165
	doc/move-instance.rst \
165 166
	doc/news.rst \
166 167
	doc/rapi.rst \
167 168
	doc/security.rst \
......
242 243
	tools/cfgupgrade12 \
243 244
	tools/cluster-merge \
244 245
	tools/lvmstrap \
246
	tools/move-instance \
245 247
	tools/sanitize-config
246 248

  
247 249
pkglib_python_scripts = \
b/doc/index.rst
22 22
   hooks.rst
23 23
   iallocator.rst
24 24
   rapi.rst
25
   move-instance.rst
25 26
   devnotes.rst
26 27
   news.rst
27 28
   glossary.rst
b/doc/move-instance.rst
1
=================================
2
Moving instances between clusters
3
=================================
4

  
5
Starting with Ganeti 2.2, instances can be moved between separate Ganeti
6
clusters using a new tool, ``move-instance``. The tool has a number of
7
features:
8

  
9
- Moving a single or multiple instances
10
- Moving instances in parallel (``--parallel`` option)
11
- Renaming instance (only when moving a single instance)
12
- SSL certificate verification for RAPI connections
13

  
14
The design of the inter-cluster instances moves is described in detail
15
in the :doc:`Ganeti 2.2 design document <design-2.2>`. The instance move
16
tool talks to the Ganeti clusters via RAPI and can run on any machine
17
which can connect to the cluster's RAPI. Despite their similar name, the
18
instance move tool should not be confused with the ``gnt-instance move``
19
command, which is used to move without changes (instead of export/import
20
plus rename) an instance within the cluster.
21

  
22

  
23
Configuring clusters for instance moves
24
---------------------------------------
25

  
26
To prevent third parties from accessing the instance data, all data
27
exchanged between the clusters is signed using a secret key, the
28
"cluster domain secret". It is recommended to assign the same domain
29
secret to all clusters of the same security domain, so that instances
30
can be easily moved between them. By checking the signatures, the
31
destination cluster can be sure the third party (e.g. this tool) didn't
32
modify the received crypto keys and connection information.
33

  
34
.. highlight:: sh
35

  
36
To create a new, random cluster domain secret, run the following command
37
on the master node::
38

  
39
  gnt-cluster renew-crypto --new-cluster-domain-secret
40

  
41

  
42
To set the cluster domain secret, run the following command on the
43
master node::
44

  
45
  gnt-cluster renew-crypto --cluster-domain-secret=/.../ganeti.cds
46

  
47

  
48
Moving instances
49
----------------
50

  
51
As soon as the clusters share a cluster domain secret, instances can be
52
moved. The tool usage is as follows::
53

  
54
  move-instance [options] <source-cluster> <destination-cluster> <instance-name...>
55

  
56
Multiple instances can be moved with one invocation of the instance move
57
tool, though a few options are only available when moving a single
58
instance.
59

  
60
The most important options are listed below. Unless specified otherwise,
61
destination-related options default to the source value (e.g. setting
62
``--src-rapi-port=1234`` will make ``--dest-rapi-port``'s default 1234).
63

  
64
``--src-rapi-port``/``--dest-rapi-port``
65
  RAPI server TCP port, defaults to 5080.
66
``--src-ca-file``/``--dest-ca-file``
67
  Path to file containing source cluster Certificate Authority (CA) in
68
  PEM format. For self-signed certificates, this is the certificate
69
  itself. For certificates signed by a third party CA, the complete
70
  chain must be in the file (see documentation for
71
  ``SSL_CTX_load_verify_locations(3)``).
72
``--src-username``/``--dest-username``
73
  RAPI username, must have write access to cluster.
74
``--src-password-file``/``--dest-password-file``
75
  Path to file containing RAPI password (make sure to restrict access to
76
  this file).
77
``--dest-instance-name``
78
  When moving a single instance: Change name of instance on destination
79
  cluster.
80
``--dest-primary-node``
81
  When moving a single instance: Primary node on destination cluster.
82
``--dest-secondary-node``
83
  When moving a single instance: Secondary node on destination cluster.
84
``--iallocator``
85
  Iallocator for creating instance on destination cluster.
86
``--parallel``
87
  Number of instance moves to run in parallel.
88
``--verbose``/``--debug``
89
  Increase output verbosity.
90

  
91
The exit value of the tool is zero if and only if all instance moves
92
were successful.
93

  
94
.. vim: set textwidth=72 :
95
.. Local Variables:
96
.. mode: rst
97
.. fill-column: 72
98
.. End:
b/tools/move-instance
1
#!/usr/bin/python
2
#
3

  
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21
"""Tool to move instances from one cluster to another.
22

  
23
"""
24

  
25
# pylint: disable-msg=C0103
26
# C0103: Invalid name move-instance
27

  
28
import os
29
import sys
30
import time
31
import logging
32
import optparse
33
import threading
34

  
35
from ganeti import cli
36
from ganeti import constants
37
from ganeti import utils
38
from ganeti import workerpool
39
from ganeti import compat
40
from ganeti import rapi
41

  
42
import ganeti.rapi.client # pylint: disable-msg=W0611
43
import ganeti.rapi.client_utils
44

  
45

  
46
SRC_RAPI_PORT_OPT = \
47
  cli.cli_option("--src-rapi-port", action="store", type="int",
48
                 dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
49
                 help=("Source cluster RAPI port (defaults to %s)" %
50
                       constants.DEFAULT_RAPI_PORT))
51

  
52
SRC_CA_FILE_OPT = \
53
  cli.cli_option("--src-ca-file", action="store", type="string",
54
                 dest="src_ca_file",
55
                 help=("File containing source cluster Certificate"
56
                       " Authority (CA) in PEM format"))
57

  
58
SRC_USERNAME_OPT = \
59
  cli.cli_option("--src-username", action="store", type="string",
60
                 dest="src_username", default=None,
61
                 help="Source cluster username")
62

  
63
SRC_PASSWORD_FILE_OPT = \
64
  cli.cli_option("--src-password-file", action="store", type="string",
65
                 dest="src_password_file",
66
                 help="File containing source cluster password")
67

  
68
DEST_RAPI_PORT_OPT = \
69
  cli.cli_option("--dest-rapi-port", action="store", type="int",
70
                 dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT,
71
                 help=("Destination cluster RAPI port (defaults to source"
72
                       " cluster RAPI port)"))
73

  
74
DEST_CA_FILE_OPT = \
75
  cli.cli_option("--dest-ca-file", action="store", type="string",
76
                 dest="dest_ca_file",
77
                 help=("File containing destination cluster Certificate"
78
                       " Authority (CA) in PEM format (defaults to source"
79
                       " cluster CA)"))
80

  
81
DEST_USERNAME_OPT = \
82
  cli.cli_option("--dest-username", action="store", type="string",
83
                 dest="dest_username", default=None,
84
                 help=("Destination cluster username (defaults to"
85
                       " source cluster username)"))
86

  
87
DEST_PASSWORD_FILE_OPT = \
88
  cli.cli_option("--dest-password-file", action="store", type="string",
89
                 dest="dest_password_file",
90
                 help=("File containing destination cluster password"
91
                       " (defaults to source cluster password)"))
92

  
93
DEST_INSTANCE_NAME_OPT = \
94
  cli.cli_option("--dest-instance-name", action="store", type="string",
95
                 dest="dest_instance_name",
96
                 help=("Instance name on destination cluster (only"
97
                       " when moving exactly one instance)"))
98

  
99
DEST_PRIMARY_NODE_OPT = \
100
  cli.cli_option("--dest-primary-node", action="store", type="string",
101
                 dest="dest_primary_node",
102
                 help=("Primary node on destination cluster (only"
103
                       " when moving exactly one instance)"))
104

  
105
DEST_SECONDARY_NODE_OPT = \
106
  cli.cli_option("--dest-secondary-node", action="store", type="string",
107
                 dest="dest_secondary_node",
108
                 help=("Secondary node on destination cluster (only"
109
                       " when moving exactly one instance)"))
110

  
111
PARALLEL_OPT = \
112
  cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
113
                 dest="parallel", metavar="<number>",
114
                 help="Number of instances to be moved simultaneously")
115

  
116

  
117
class Error(Exception):
118
  """Generic error.
119

  
120
  """
121

  
122

  
123
class Abort(Error):
124
  """Special exception for aborting import/export.
125

  
126
  """
127

  
128

  
129
class RapiClientFactory:
130
  """Factory class for creating RAPI clients.
131

  
132
  @ivar src_cluster_name: Source cluster name
133
  @ivar dest_cluster_name: Destination cluster name
134
  @ivar GetSourceClient: Callable returning new client for source cluster
135
  @ivar GetDestClient: Callable returning new client for destination cluster
136

  
137
  """
138
  def __init__(self, options, src_cluster_name, dest_cluster_name):
139
    """Initializes this class.
140

  
141
    @param options: Program options
142
    @type src_cluster_name: string
143
    @param src_cluster_name: Source cluster name
144
    @type dest_cluster_name: string
145
    @param dest_cluster_name: Destination cluster name
146

  
147
    """
148
    self.src_cluster_name = src_cluster_name
149
    self.dest_cluster_name = dest_cluster_name
150

  
151
    # TODO: Support for using system default paths for verifying SSL certificate
152
    # (already implemented in CertAuthorityVerify)
153
    logging.debug("Using '%s' as source CA", options.src_ca_file)
154
    src_ssl_config = rapi.client.CertAuthorityVerify(cafile=options.src_ca_file)
155

  
156
    if options.dest_ca_file:
157
      logging.debug("Using '%s' as destination CA", options.dest_ca_file)
158
      dest_ssl_config = \
159
        rapi.client.CertAuthorityVerify(cafile=options.dest_ca_file)
160
    else:
161
      logging.debug("Using source CA for destination")
162
      dest_ssl_config = src_ssl_config
163

  
164
    logging.debug("Source RAPI server is %s:%s",
165
                  src_cluster_name, options.src_rapi_port)
166
    logging.debug("Source username is '%s'", options.src_username)
167

  
168
    if options.src_username is None:
169
      src_username = ""
170
    else:
171
      src_username = options.src_username
172

  
173
    if options.src_password_file:
174
      logging.debug("Reading '%s' for source password",
175
                    options.src_password_file)
176
      src_password = utils.ReadOneLineFile(options.src_password_file,
177
                                           strict=True)
178
    else:
179
      logging.debug("Source has no password")
180
      src_password = None
181

  
182
    self.GetSourceClient = lambda: \
183
      rapi.client.GanetiRapiClient(src_cluster_name,
184
                                   port=options.src_rapi_port,
185
                                   config_ssl_verification=src_ssl_config,
186
                                   username=src_username,
187
                                   password=src_password)
188

  
189
    if options.dest_rapi_port:
190
      dest_rapi_port = options.dest_rapi_port
191
    else:
192
      dest_rapi_port = options.src_rapi_port
193

  
194
    if options.dest_username is None:
195
      dest_username = src_username
196
    else:
197
      dest_username = options.dest_username
198

  
199
    logging.debug("Destination RAPI server is %s:%s",
200
                  dest_cluster_name, dest_rapi_port)
201
    logging.debug("Destination username is '%s'", dest_username)
202

  
203
    if options.dest_password_file:
204
      logging.debug("Reading '%s' for destination password",
205
                    options.dest_password_file)
206
      dest_password = utils.ReadOneLineFile(options.dest_password_file,
207
                                            strict=True)
208
    else:
209
      logging.debug("Using source password for destination")
210
      dest_password = src_password
211

  
212
    self.GetDestClient = lambda: \
213
      rapi.client.GanetiRapiClient(dest_cluster_name,
214
                                   port=dest_rapi_port,
215
                                   config_ssl_verification=dest_ssl_config,
216
                                   username=dest_username,
217
                                   password=dest_password)
218

  
219

  
220
class MoveJobPollReportCb(cli.JobPollReportCbBase):
221
  def __init__(self, abort_check_fn, remote_import_fn):
222
    """Initializes this class.
223

  
224
    @type abort_check_fn: callable
225
    @param abort_check_fn: Function to check whether move is aborted
226
    @type remote_import_fn: callable or None
227
    @param remote_import_fn: Callback for reporting received remote import
228
                             information
229

  
230
    """
231
    cli.JobPollReportCbBase.__init__(self)
232
    self._abort_check_fn = abort_check_fn
233
    self._remote_import_fn = remote_import_fn
234

  
235
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
236
    """Handles a log message.
237

  
238
    """
239
    if log_type == constants.ELOG_REMOTE_IMPORT:
240
      logging.debug("Received remote import information")
241

  
242
      if not self._remote_import_fn:
243
        raise RuntimeError("Received unexpected remote import information")
244

  
245
      assert "x509_ca" in log_msg
246
      assert "disks" in log_msg
247

  
248
      self._remote_import_fn(log_msg)
249

  
250
      return
251

  
252
    logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
253
                 utils.SafeEncode(log_msg))
254

  
255
  def ReportNotChanged(self, job_id, status):
256
    """Called if a job hasn't changed in a while.
257

  
258
    """
259
    try:
260
      # Check whether we were told to abort by the other thread
261
      self._abort_check_fn()
262
    except Abort:
263
      logging.warning("Aborting despite job %s still running", job_id)
264
      raise
265

  
266

  
267
class InstanceMove(object):
268
  """Status class for instance moves.
269

  
270
  """
271
  def __init__(self, src_instance_name, dest_instance_name,
272
               dest_pnode, dest_snode, dest_iallocator):
273
    """Initializes this class.
274

  
275
    @type src_instance_name: string
276
    @param src_instance_name: Instance name on source cluster
277
    @type dest_instance_name: string
278
    @param dest_instance_name: Instance name on destination cluster
279
    @type dest_pnode: string or None
280
    @param dest_pnode: Name of primary node on destination cluster
281
    @type dest_snode: string or None
282
    @param dest_snode: Name of secondary node on destination cluster
283
    @type dest_iallocator: string or None
284
    @param dest_iallocator: Name of iallocator to use
285

  
286
    """
287
    self.src_instance_name = src_instance_name
288
    self.dest_instance_name = dest_instance_name
289
    self.dest_pnode = dest_pnode
290
    self.dest_snode = dest_snode
291
    self.dest_iallocator = dest_iallocator
292

  
293
    self.success = None
294
    self.error_message = None
295

  
296

  
297
class MoveRuntime(object):
298
  """Class to keep track of instance move.
299

  
300
  """
301
  def __init__(self, move):
302
    """Initializes this class.
303

  
304
    @type move: L{InstanceMove}
305

  
306
    """
307
    self.move = move
308

  
309
    # Thread synchronization
310
    self.lock = threading.Lock()
311
    self.source_to_dest = threading.Condition(self.lock)
312
    self.dest_to_source = threading.Condition(self.lock)
313

  
314
    # Set when threads should abort
315
    self.abort = None
316

  
317
    # Source information
318
    self.src_success = None
319
    self.src_error_message = None
320
    self.src_expinfo = None
321
    self.src_instinfo = None
322

  
323
    # Destination information
324
    self.dest_success = None
325
    self.dest_error_message = None
326
    self.dest_impinfo = None
327

  
328
  def HandleErrors(self, prefix, fn, *args):
329
    """Wrapper to catch errors and abort threads.
330

  
331
    @type prefix: string
332
    @param prefix: Variable name prefix ("src" or "dest")
333
    @type fn: callable
334
    @param fn: Function
335

  
336
    """
337
    assert prefix in ("dest", "src")
338

  
339
    try:
340
      # Call inner function
341
      fn(*args)
342

  
343
      success = True
344
      errmsg = None
345
    except Abort:
346
      success = False
347
      errmsg = "Aborted"
348
    except Exception, err:
349
      logging.exception("Caught unhandled exception")
350
      success = False
351
      errmsg = str(err)
352

  
353
    self.lock.acquire()
354
    try:
355
      # Tell all threads to abort
356
      self.abort = True
357
      self.source_to_dest.notifyAll()
358
      self.dest_to_source.notifyAll()
359
    finally:
360
      self.lock.release()
361

  
362
    setattr(self, "%s_success" % prefix, success)
363
    setattr(self, "%s_error_message" % prefix, errmsg)
364

  
365
  def CheckAbort(self):
366
    """Check whether thread should be aborted.
367

  
368
    @raise Abort: When thread should be aborted
369

  
370
    """
371
    if self.abort:
372
      logging.info("Aborting")
373
      raise Abort()
374

  
375
  def Wait(self, cond, check_fn):
376
    """Waits for a condition to become true.
377

  
378
    @type cond: threading.Condition
379
    @param cond: Threading condition
380
    @type check_fn: callable
381
    @param check_fn: Function to check whether condition is true
382

  
383
    """
384
    cond.acquire()
385
    try:
386
      while check_fn(self):
387
        self.CheckAbort()
388
        cond.wait()
389
    finally:
390
      cond.release()
391

  
392
  def PollJob(self, cl, job_id, remote_import_fn=None):
393
    """Wrapper for polling a job.
394

  
395
    @type cl: L{rapi.client.GanetiRapiClient}
396
    @param cl: RAPI client
397
    @type job_id: string
398
    @param job_id: Job ID
399
    @type remote_import_fn: callable or None
400
    @param remote_import_fn: Callback for reporting received remote import
401
                             information
402

  
403
    """
404
    return rapi.client_utils.PollJob(cl, job_id,
405
                                     MoveJobPollReportCb(self.CheckAbort,
406
                                                         remote_import_fn))
407

  
408

  
409
class MoveDestExecutor(object):
410
  def __init__(self, dest_client, mrt):
411
    """Destination side of an instance move.
412

  
413
    @type dest_client: L{rapi.client.GanetiRapiClient}
414
    @param dest_client: RAPI client
415
    @type mrt: L{MoveRuntime}
416
    @param mrt: Instance move runtime information
417

  
418
    """
419
    logging.debug("Waiting for instance information to become available")
420
    mrt.Wait(mrt.source_to_dest,
421
             lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
422

  
423
    logging.info("Creating instance %s in remote-import mode",
424
                 mrt.move.dest_instance_name)
425
    job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
426
                                  mrt.move.dest_pnode, mrt.move.dest_snode,
427
                                  mrt.move.dest_iallocator,
428
                                  mrt.src_instinfo, mrt.src_expinfo)
429
    mrt.PollJob(dest_client, job_id,
430
                remote_import_fn=compat.partial(self._SetImportInfo, mrt))
431

  
432
    logging.info("Import successful")
433

  
434
  @staticmethod
435
  def _SetImportInfo(mrt, impinfo):
436
    """Sets the remote import information and notifies source thread.
437

  
438
    @type mrt: L{MoveRuntime}
439
    @param mrt: Instance move runtime information
440
    @param impinfo: Remote import information
441

  
442
    """
443
    mrt.dest_to_source.acquire()
444
    try:
445
      mrt.dest_impinfo = impinfo
446
      mrt.dest_to_source.notifyAll()
447
    finally:
448
      mrt.dest_to_source.release()
449

  
450
  @staticmethod
451
  def _CreateInstance(cl, name, snode, pnode, iallocator, instance, expinfo):
452
    """Starts the instance creation in remote import mode.
453

  
454
    @type cl: L{rapi.client.GanetiRapiClient}
455
    @param cl: RAPI client
456
    @type name: string
457
    @param name: Instance name
458
    @type pnode: string or None
459
    @param pnode: Name of primary node on destination cluster
460
    @type snode: string or None
461
    @param snode: Name of secondary node on destination cluster
462
    @type iallocator: string or None
463
    @param iallocator: Name of iallocator to use
464
    @type instance: dict
465
    @param instance: Instance details from source cluster
466
    @type expinfo: dict
467
    @param expinfo: Prepared export information from source cluster
468
    @return: Job ID
469

  
470
    """
471
    disk_template = instance["disk_template"]
472

  
473
    disks = [{
474
      "size": i["size"],
475
      "mode": i["mode"],
476
      } for i in instance["disks"]]
477

  
478
    nics = [{
479
      "ip": ip,
480
      "mac": mac,
481
      "mode": mode,
482
      "link": link,
483
      } for ip, mac, mode, link in instance["nics"]]
484

  
485
    # TODO: Should this be the actual up/down status? (run_state)
486
    start = (instance["config_state"] == "up")
487

  
488
    assert len(disks) == len(instance["disks"])
489
    assert len(nics) == len(instance["nics"])
490

  
491
    return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
492
                             name, disk_template, disks, nics,
493
                             os=instance["os"],
494
                             pnode=pnode,
495
                             snode=snode,
496
                             start=start,
497
                             ip_check=False,
498
                             iallocator=iallocator,
499
                             hypervisor=instance["hypervisor"],
500
                             source_handshake=expinfo["handshake"],
501
                             source_x509_ca=expinfo["x509_ca"],
502
                             source_instance_name=instance["name"],
503
                             beparams=instance["be_instance"],
504
                             hvparams=instance["hv_instance"])
505

  
506

  
507
class MoveSourceExecutor(object):
508
  def __init__(self, src_client, mrt):
509
    """Source side of an instance move.
510

  
511
    @type src_client: L{rapi.client.GanetiRapiClient}
512
    @param src_client: RAPI client
513
    @type mrt: L{MoveRuntime}
514
    @param mrt: Instance move runtime information
515

  
516
    """
517
    logging.info("Checking whether instance exists")
518
    self._CheckInstance(src_client, mrt.move.src_instance_name)
519

  
520
    logging.info("Retrieving instance information from source cluster")
521
    instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
522
                                     mrt.move.src_instance_name)
523

  
524
    logging.info("Preparing export on source cluster")
525
    expinfo = self._PrepareExport(src_client, mrt.PollJob,
526
                                  mrt.move.src_instance_name)
527
    assert "handshake" in expinfo
528
    assert "x509_key_name" in expinfo
529
    assert "x509_ca" in expinfo
530

  
531
    # Hand information to destination thread
532
    mrt.source_to_dest.acquire()
533
    try:
534
      mrt.src_instinfo = instinfo
535
      mrt.src_expinfo = expinfo
536
      mrt.source_to_dest.notifyAll()
537
    finally:
538
      mrt.source_to_dest.release()
539

  
540
    logging.info("Waiting for destination information to become available")
541
    mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
542

  
543
    logging.info("Starting remote export on source cluster")
544
    self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
545
                         expinfo["x509_key_name"], mrt.dest_impinfo)
546

  
547
    logging.info("Export successful")
548

  
549
  @staticmethod
550
  def _CheckInstance(cl, name):
551
    """Checks whether the instance exists on the source cluster.
552

  
553
    @type cl: L{rapi.client.GanetiRapiClient}
554
    @param cl: RAPI client
555
    @type name: string
556
    @param name: Instance name
557

  
558
    """
559
    try:
560
      cl.GetInstance(name)
561
    except rapi.client.GanetiApiError, err:
562
      if err.code == rapi.client.HTTP_NOT_FOUND:
563
        raise Error("Instance %s not found (%s)" % (name, str(err)))
564
      raise
565

  
566
  @staticmethod
567
  def _GetInstanceInfo(cl, poll_job_fn, name):
568
    """Retrieves detailed instance information from source cluster.
569

  
570
    @type cl: L{rapi.client.GanetiRapiClient}
571
    @param cl: RAPI client
572
    @type poll_job_fn: callable
573
    @param poll_job_fn: Function to poll for job result
574
    @type name: string
575
    @param name: Instance name
576

  
577
    """
578
    job_id = cl.GetInstanceInfo(name, static=True)
579
    result = poll_job_fn(cl, job_id)
580
    assert len(result[0].keys()) == 1
581
    return result[0][result[0].keys()[0]]
582

  
583
  @staticmethod
584
  def _PrepareExport(cl, poll_job_fn, name):
585
    """Prepares export on source cluster.
586

  
587
    @type cl: L{rapi.client.GanetiRapiClient}
588
    @param cl: RAPI client
589
    @type poll_job_fn: callable
590
    @param poll_job_fn: Function to poll for job result
591
    @type name: string
592
    @param name: Instance name
593

  
594
    """
595
    job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
596
    return poll_job_fn(cl, job_id)[0]
597

  
598
  @staticmethod
599
  def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
600
    """Exports instance from source cluster.
601

  
602
    @type cl: L{rapi.client.GanetiRapiClient}
603
    @param cl: RAPI client
604
    @type poll_job_fn: callable
605
    @param poll_job_fn: Function to poll for job result
606
    @type name: string
607
    @param name: Instance name
608
    @param x509_key_name: Source X509 key
609
    @param impinfo: Import information from destination cluster
610

  
611
    """
612
    job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
613
                               impinfo["disks"], shutdown=True,
614
                               remove_instance=True,
615
                               x509_key_name=x509_key_name,
616
                               destination_x509_ca=impinfo["x509_ca"])
617
    (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
618

  
619
    if not (fin_resu and compat.all(dresults)):
620
      raise Error("Export failed for disks %s" %
621
                  utils.CommaJoin(str(idx) for idx, result
622
                                  in enumerate(dresults) if not result))
623

  
624

  
625
class MoveSourceWorker(workerpool.BaseWorker):
626
  def RunTask(self, rapi_factory, move): # pylint: disable-msg=W0221
627
    """Executes an instance move.
628

  
629
    @type rapi_factory: L{RapiClientFactory}
630
    @param rapi_factory: RAPI client factory
631
    @type move: L{InstanceMove}
632
    @param move: Instance move information
633

  
634
    """
635
    try:
636
      logging.info("Preparing to move %s from cluster %s to %s as %s",
637
                   move.src_instance_name, rapi_factory.src_cluster_name,
638
                   rapi_factory.dest_cluster_name, move.dest_instance_name)
639

  
640
      mrt = MoveRuntime(move)
641

  
642
      logging.debug("Starting destination thread")
643
      dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
644
                                     target=mrt.HandleErrors,
645
                                     args=("dest", MoveDestExecutor,
646
                                           rapi_factory.GetDestClient(),
647
                                           mrt, ))
648
      dest_thread.start()
649
      try:
650
        mrt.HandleErrors("src", MoveSourceExecutor,
651
                         rapi_factory.GetSourceClient(), mrt)
652
      finally:
653
        dest_thread.join()
654

  
655
      move.success = (mrt.src_success and mrt.dest_success)
656
      if mrt.src_error_message or mrt.dest_error_message:
657
        move.error_message = ("Source error: %s, destination error: %s" %
658
                              (mrt.src_error_message, mrt.dest_error_message))
659
      else:
660
        move.error_message = None
661
    except Exception, err: # pylint: disable-msg=W0703
662
      logging.exception("Caught unhandled exception")
663
      move.success = False
664
      move.error_message = str(err)
665

  
666

  
667
def CheckRapiSetup(rapi_factory):
668
  """Checks the RAPI setup by retrieving the version.
669

  
670
  @type rapi_factory: L{RapiClientFactory}
671
  @param rapi_factory: RAPI client factory
672

  
673
  """
674
  src_client = rapi_factory.GetSourceClient()
675
  logging.info("Connecting to source RAPI server")
676
  logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
677

  
678
  dest_client = rapi_factory.GetDestClient()
679
  logging.info("Connecting to destination RAPI server")
680
  logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())
681

  
682

  
683
def SetupLogging(options):
684
  """Setting up logging infrastructure.
685

  
686
  @param options: Parsed command line options
687

  
688
  """
689
  fmt = "%(asctime)s: %(threadName)s "
690
  if options.debug or options.verbose:
691
    fmt += "%(levelname)s "
692
  fmt += "%(message)s"
693

  
694
  formatter = logging.Formatter(fmt)
695

  
696
  stderr_handler = logging.StreamHandler()
697
  stderr_handler.setFormatter(formatter)
698
  if options.debug:
699
    stderr_handler.setLevel(logging.NOTSET)
700
  elif options.verbose:
701
    stderr_handler.setLevel(logging.INFO)
702
  else:
703
    stderr_handler.setLevel(logging.ERROR)
704

  
705
  root_logger = logging.getLogger("")
706
  root_logger.setLevel(logging.NOTSET)
707
  root_logger.addHandler(stderr_handler)
708

  
709

  
710
def ParseOptions():
711
  """Parses options passed to program.
712

  
713
  """
714
  program = os.path.basename(sys.argv[0])
715

  
716
  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
717
                                        " <source-cluster> <dest-cluster>"
718
                                        " <instance...>"),
719
                                 prog=program)
720
  parser.add_option(cli.DEBUG_OPT)
721
  parser.add_option(cli.VERBOSE_OPT)
722
  parser.add_option(cli.IALLOCATOR_OPT)
723
  parser.add_option(SRC_RAPI_PORT_OPT)
724
  parser.add_option(SRC_CA_FILE_OPT)
725
  parser.add_option(SRC_USERNAME_OPT)
726
  parser.add_option(SRC_PASSWORD_FILE_OPT)
727
  parser.add_option(DEST_RAPI_PORT_OPT)
728
  parser.add_option(DEST_CA_FILE_OPT)
729
  parser.add_option(DEST_USERNAME_OPT)
730
  parser.add_option(DEST_PASSWORD_FILE_OPT)
731
  parser.add_option(DEST_INSTANCE_NAME_OPT)
732
  parser.add_option(DEST_PRIMARY_NODE_OPT)
733
  parser.add_option(DEST_SECONDARY_NODE_OPT)
734
  parser.add_option(PARALLEL_OPT)
735

  
736
  (options, args) = parser.parse_args()
737

  
738
  return (parser, options, args)
739

  
740

  
741
def CheckOptions(parser, options, args):
742
  """Checks options and arguments for validity.
743

  
744
  """
745
  if len(args) < 3:
746
    parser.error("Not enough arguments")
747

  
748
  src_cluster_name = args.pop(0)
749
  dest_cluster_name = args.pop(0)
750
  instance_names = args
751

  
752
  assert len(instance_names) > 0
753

  
754
  # TODO: Remove once using system default paths for SSL certificate
755
  # verification is implemented
756
  if not options.src_ca_file:
757
    parser.error("Missing source cluster CA file")
758

  
759
  if options.parallel < 1:
760
    parser.error("Number of simultaneous moves must be >= 1")
761

  
762
  if not (bool(options.iallocator) ^
763
          bool(options.dest_primary_node or options.dest_secondary_node)):
764
    parser.error("Destination node and iallocator options exclude each other")
765

  
766
  if len(instance_names) == 1:
767
    # Moving one instance only
768
    if not (options.iallocator or
769
            options.dest_primary_node or
770
            options.dest_secondary_node):
771
      parser.error("An iallocator or the destination node is required")
772
  else:
773
    # Moving more than one instance
774
    if (options.dest_instance_name or options.dest_primary_node or
775
        options.dest_secondary_node):
776
      parser.error("The options --dest-instance-name, --dest-primary-node and"
777
                   " --dest-secondary-node can only be used when moving exactly"
778
                   " one instance")
779

  
780
    if not options.iallocator:
781
      parser.error("An iallocator must be specified for moving more than one"
782
                   " instance")
783

  
784
  return (src_cluster_name, dest_cluster_name, instance_names)
785

  
786

  
787
def main():
788
  """Main routine.
789

  
790
  """
791
  (parser, options, args) = ParseOptions()
792

  
793
  SetupLogging(options)
794

  
795
  (src_cluster_name, dest_cluster_name, instance_names) = \
796
    CheckOptions(parser, options, args)
797

  
798
  logging.info("Source cluster: %s", src_cluster_name)
799
  logging.info("Destination cluster: %s", dest_cluster_name)
800
  logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
801

  
802
  rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
803

  
804
  CheckRapiSetup(rapi_factory)
805

  
806
  assert (len(instance_names) == 1 or
807
          not (options.dest_primary_node or options.dest_secondary_node))
808
  assert len(instance_names) == 1 or options.iallocator
809
  assert (len(instance_names) > 1 or options.iallocator or
810
          options.dest_primary_node or options.dest_secondary_node)
811

  
812
  # Prepare list of instance moves
813
  moves = []
814
  for src_instance_name in instance_names:
815
    if options.dest_instance_name:
816
      assert len(instance_names) == 1
817
      # Rename instance
818
      dest_instance_name = options.dest_instance_name
819
    else:
820
      dest_instance_name = src_instance_name
821

  
822
    moves.append(InstanceMove(src_instance_name, dest_instance_name,
823
                              options.dest_primary_node,
824
                              options.dest_secondary_node,
825
                              options.iallocator))
826

  
827
  assert len(moves) == len(instance_names)
828

  
829
  # Start workerpool
830
  wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
831
  try:
832
    # Add instance moves to workerpool
833
    for move in moves:
834
      wp.AddTask(rapi_factory, move)
835

  
836
    # Wait for all moves to finish
837
    wp.Quiesce()
838

  
839
  finally:
840
    wp.TerminateWorkers()
841

  
842
  # There should be no threads running at this point, hence not using locks
843
  # anymore
844

  
845
  logging.info("Instance move results:")
846

  
847
  for move in moves:
848
    if move.dest_instance_name == move.src_instance_name:
849
      name = move.src_instance_name
850
    else:
851
      name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
852

  
853
    if move.success and not move.error_message:
854
      msg = "Success"
855
    else:
856
      msg = "Failed (%s)" % move.error_message
857

  
858
    logging.info("%s: %s", name, msg)
859

  
860
  if compat.all(move.success for move in moves):
861
    sys.exit(constants.EXIT_SUCCESS)
862

  
863
  sys.exit(constants.EXIT_FAILURE)
864

  
865

  
866
if __name__ == "__main__":
867
  main()

Also available in: Unified diff