|
1 |
#!/usr/bin/python
|
|
2 |
#
|
|
3 |
|
|
4 |
# Copyright (C) 2010 Google Inc.
|
|
5 |
#
|
|
6 |
# This program is free software; you can redistribute it and/or modify
|
|
7 |
# it under the terms of the GNU General Public License as published by
|
|
8 |
# the Free Software Foundation; either version 2 of the License, or
|
|
9 |
# (at your option) any later version.
|
|
10 |
#
|
|
11 |
# This program is distributed in the hope that it will be useful, but
|
|
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
14 |
# General Public License for more details.
|
|
15 |
#
|
|
16 |
# You should have received a copy of the GNU General Public License
|
|
17 |
# along with this program; if not, write to the Free Software
|
|
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
|
19 |
# 02110-1301, USA.
|
|
20 |
|
|
21 |
"""Tool to move instances from one cluster to another.
|
|
22 |
|
|
23 |
"""
|
|
24 |
|
|
25 |
# pylint: disable-msg=C0103
|
|
26 |
# C0103: Invalid name move-instance
|
|
27 |
|
|
28 |
import os
|
|
29 |
import sys
|
|
30 |
import time
|
|
31 |
import logging
|
|
32 |
import optparse
|
|
33 |
import threading
|
|
34 |
|
|
35 |
from ganeti import cli
|
|
36 |
from ganeti import constants
|
|
37 |
from ganeti import utils
|
|
38 |
from ganeti import workerpool
|
|
39 |
from ganeti import compat
|
|
40 |
from ganeti import rapi
|
|
41 |
|
|
42 |
import ganeti.rapi.client # pylint: disable-msg=W0611
|
|
43 |
import ganeti.rapi.client_utils
|
|
44 |
|
|
45 |
|
|
46 |
SRC_RAPI_PORT_OPT = \
|
|
47 |
cli.cli_option("--src-rapi-port", action="store", type="int",
|
|
48 |
dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
|
|
49 |
help=("Source cluster RAPI port (defaults to %s)" %
|
|
50 |
constants.DEFAULT_RAPI_PORT))
|
|
51 |
|
|
52 |
SRC_CA_FILE_OPT = \
|
|
53 |
cli.cli_option("--src-ca-file", action="store", type="string",
|
|
54 |
dest="src_ca_file",
|
|
55 |
help=("File containing source cluster Certificate"
|
|
56 |
" Authority (CA) in PEM format"))
|
|
57 |
|
|
58 |
SRC_USERNAME_OPT = \
|
|
59 |
cli.cli_option("--src-username", action="store", type="string",
|
|
60 |
dest="src_username", default=None,
|
|
61 |
help="Source cluster username")
|
|
62 |
|
|
63 |
SRC_PASSWORD_FILE_OPT = \
|
|
64 |
cli.cli_option("--src-password-file", action="store", type="string",
|
|
65 |
dest="src_password_file",
|
|
66 |
help="File containing source cluster password")
|
|
67 |
|
|
68 |
DEST_RAPI_PORT_OPT = \
|
|
69 |
cli.cli_option("--dest-rapi-port", action="store", type="int",
|
|
70 |
dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT,
|
|
71 |
help=("Destination cluster RAPI port (defaults to source"
|
|
72 |
" cluster RAPI port)"))
|
|
73 |
|
|
74 |
DEST_CA_FILE_OPT = \
|
|
75 |
cli.cli_option("--dest-ca-file", action="store", type="string",
|
|
76 |
dest="dest_ca_file",
|
|
77 |
help=("File containing destination cluster Certificate"
|
|
78 |
" Authority (CA) in PEM format (defaults to source"
|
|
79 |
" cluster CA)"))
|
|
80 |
|
|
81 |
DEST_USERNAME_OPT = \
|
|
82 |
cli.cli_option("--dest-username", action="store", type="string",
|
|
83 |
dest="dest_username", default=None,
|
|
84 |
help=("Destination cluster username (defaults to"
|
|
85 |
" source cluster username)"))
|
|
86 |
|
|
87 |
DEST_PASSWORD_FILE_OPT = \
|
|
88 |
cli.cli_option("--dest-password-file", action="store", type="string",
|
|
89 |
dest="dest_password_file",
|
|
90 |
help=("File containing destination cluster password"
|
|
91 |
" (defaults to source cluster password)"))
|
|
92 |
|
|
93 |
DEST_INSTANCE_NAME_OPT = \
|
|
94 |
cli.cli_option("--dest-instance-name", action="store", type="string",
|
|
95 |
dest="dest_instance_name",
|
|
96 |
help=("Instance name on destination cluster (only"
|
|
97 |
" when moving exactly one instance)"))
|
|
98 |
|
|
99 |
DEST_PRIMARY_NODE_OPT = \
|
|
100 |
cli.cli_option("--dest-primary-node", action="store", type="string",
|
|
101 |
dest="dest_primary_node",
|
|
102 |
help=("Primary node on destination cluster (only"
|
|
103 |
" when moving exactly one instance)"))
|
|
104 |
|
|
105 |
DEST_SECONDARY_NODE_OPT = \
|
|
106 |
cli.cli_option("--dest-secondary-node", action="store", type="string",
|
|
107 |
dest="dest_secondary_node",
|
|
108 |
help=("Secondary node on destination cluster (only"
|
|
109 |
" when moving exactly one instance)"))
|
|
110 |
|
|
111 |
PARALLEL_OPT = \
|
|
112 |
cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
|
|
113 |
dest="parallel", metavar="<number>",
|
|
114 |
help="Number of instances to be moved simultaneously")
|
|
115 |
|
|
116 |
|
|
117 |
class Error(Exception):
|
|
118 |
"""Generic error.
|
|
119 |
|
|
120 |
"""
|
|
121 |
|
|
122 |
|
|
123 |
class Abort(Error):
|
|
124 |
"""Special exception for aborting import/export.
|
|
125 |
|
|
126 |
"""
|
|
127 |
|
|
128 |
|
|
129 |
class RapiClientFactory:
|
|
130 |
"""Factory class for creating RAPI clients.
|
|
131 |
|
|
132 |
@ivar src_cluster_name: Source cluster name
|
|
133 |
@ivar dest_cluster_name: Destination cluster name
|
|
134 |
@ivar GetSourceClient: Callable returning new client for source cluster
|
|
135 |
@ivar GetDestClient: Callable returning new client for destination cluster
|
|
136 |
|
|
137 |
"""
|
|
138 |
def __init__(self, options, src_cluster_name, dest_cluster_name):
|
|
139 |
"""Initializes this class.
|
|
140 |
|
|
141 |
@param options: Program options
|
|
142 |
@type src_cluster_name: string
|
|
143 |
@param src_cluster_name: Source cluster name
|
|
144 |
@type dest_cluster_name: string
|
|
145 |
@param dest_cluster_name: Destination cluster name
|
|
146 |
|
|
147 |
"""
|
|
148 |
self.src_cluster_name = src_cluster_name
|
|
149 |
self.dest_cluster_name = dest_cluster_name
|
|
150 |
|
|
151 |
# TODO: Support for using system default paths for verifying SSL certificate
|
|
152 |
# (already implemented in CertAuthorityVerify)
|
|
153 |
logging.debug("Using '%s' as source CA", options.src_ca_file)
|
|
154 |
src_ssl_config = rapi.client.CertAuthorityVerify(cafile=options.src_ca_file)
|
|
155 |
|
|
156 |
if options.dest_ca_file:
|
|
157 |
logging.debug("Using '%s' as destination CA", options.dest_ca_file)
|
|
158 |
dest_ssl_config = \
|
|
159 |
rapi.client.CertAuthorityVerify(cafile=options.dest_ca_file)
|
|
160 |
else:
|
|
161 |
logging.debug("Using source CA for destination")
|
|
162 |
dest_ssl_config = src_ssl_config
|
|
163 |
|
|
164 |
logging.debug("Source RAPI server is %s:%s",
|
|
165 |
src_cluster_name, options.src_rapi_port)
|
|
166 |
logging.debug("Source username is '%s'", options.src_username)
|
|
167 |
|
|
168 |
if options.src_username is None:
|
|
169 |
src_username = ""
|
|
170 |
else:
|
|
171 |
src_username = options.src_username
|
|
172 |
|
|
173 |
if options.src_password_file:
|
|
174 |
logging.debug("Reading '%s' for source password",
|
|
175 |
options.src_password_file)
|
|
176 |
src_password = utils.ReadOneLineFile(options.src_password_file,
|
|
177 |
strict=True)
|
|
178 |
else:
|
|
179 |
logging.debug("Source has no password")
|
|
180 |
src_password = None
|
|
181 |
|
|
182 |
self.GetSourceClient = lambda: \
|
|
183 |
rapi.client.GanetiRapiClient(src_cluster_name,
|
|
184 |
port=options.src_rapi_port,
|
|
185 |
config_ssl_verification=src_ssl_config,
|
|
186 |
username=src_username,
|
|
187 |
password=src_password)
|
|
188 |
|
|
189 |
if options.dest_rapi_port:
|
|
190 |
dest_rapi_port = options.dest_rapi_port
|
|
191 |
else:
|
|
192 |
dest_rapi_port = options.src_rapi_port
|
|
193 |
|
|
194 |
if options.dest_username is None:
|
|
195 |
dest_username = src_username
|
|
196 |
else:
|
|
197 |
dest_username = options.dest_username
|
|
198 |
|
|
199 |
logging.debug("Destination RAPI server is %s:%s",
|
|
200 |
dest_cluster_name, dest_rapi_port)
|
|
201 |
logging.debug("Destination username is '%s'", dest_username)
|
|
202 |
|
|
203 |
if options.dest_password_file:
|
|
204 |
logging.debug("Reading '%s' for destination password",
|
|
205 |
options.dest_password_file)
|
|
206 |
dest_password = utils.ReadOneLineFile(options.dest_password_file,
|
|
207 |
strict=True)
|
|
208 |
else:
|
|
209 |
logging.debug("Using source password for destination")
|
|
210 |
dest_password = src_password
|
|
211 |
|
|
212 |
self.GetDestClient = lambda: \
|
|
213 |
rapi.client.GanetiRapiClient(dest_cluster_name,
|
|
214 |
port=dest_rapi_port,
|
|
215 |
config_ssl_verification=dest_ssl_config,
|
|
216 |
username=dest_username,
|
|
217 |
password=dest_password)
|
|
218 |
|
|
219 |
|
|
220 |
class MoveJobPollReportCb(cli.JobPollReportCbBase):
|
|
221 |
def __init__(self, abort_check_fn, remote_import_fn):
|
|
222 |
"""Initializes this class.
|
|
223 |
|
|
224 |
@type abort_check_fn: callable
|
|
225 |
@param abort_check_fn: Function to check whether move is aborted
|
|
226 |
@type remote_import_fn: callable or None
|
|
227 |
@param remote_import_fn: Callback for reporting received remote import
|
|
228 |
information
|
|
229 |
|
|
230 |
"""
|
|
231 |
cli.JobPollReportCbBase.__init__(self)
|
|
232 |
self._abort_check_fn = abort_check_fn
|
|
233 |
self._remote_import_fn = remote_import_fn
|
|
234 |
|
|
235 |
def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
|
|
236 |
"""Handles a log message.
|
|
237 |
|
|
238 |
"""
|
|
239 |
if log_type == constants.ELOG_REMOTE_IMPORT:
|
|
240 |
logging.debug("Received remote import information")
|
|
241 |
|
|
242 |
if not self._remote_import_fn:
|
|
243 |
raise RuntimeError("Received unexpected remote import information")
|
|
244 |
|
|
245 |
assert "x509_ca" in log_msg
|
|
246 |
assert "disks" in log_msg
|
|
247 |
|
|
248 |
self._remote_import_fn(log_msg)
|
|
249 |
|
|
250 |
return
|
|
251 |
|
|
252 |
logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
|
|
253 |
utils.SafeEncode(log_msg))
|
|
254 |
|
|
255 |
def ReportNotChanged(self, job_id, status):
|
|
256 |
"""Called if a job hasn't changed in a while.
|
|
257 |
|
|
258 |
"""
|
|
259 |
try:
|
|
260 |
# Check whether we were told to abort by the other thread
|
|
261 |
self._abort_check_fn()
|
|
262 |
except Abort:
|
|
263 |
logging.warning("Aborting despite job %s still running", job_id)
|
|
264 |
raise
|
|
265 |
|
|
266 |
|
|
267 |
class InstanceMove(object):
|
|
268 |
"""Status class for instance moves.
|
|
269 |
|
|
270 |
"""
|
|
271 |
def __init__(self, src_instance_name, dest_instance_name,
|
|
272 |
dest_pnode, dest_snode, dest_iallocator):
|
|
273 |
"""Initializes this class.
|
|
274 |
|
|
275 |
@type src_instance_name: string
|
|
276 |
@param src_instance_name: Instance name on source cluster
|
|
277 |
@type dest_instance_name: string
|
|
278 |
@param dest_instance_name: Instance name on destination cluster
|
|
279 |
@type dest_pnode: string or None
|
|
280 |
@param dest_pnode: Name of primary node on destination cluster
|
|
281 |
@type dest_snode: string or None
|
|
282 |
@param dest_snode: Name of secondary node on destination cluster
|
|
283 |
@type dest_iallocator: string or None
|
|
284 |
@param dest_iallocator: Name of iallocator to use
|
|
285 |
|
|
286 |
"""
|
|
287 |
self.src_instance_name = src_instance_name
|
|
288 |
self.dest_instance_name = dest_instance_name
|
|
289 |
self.dest_pnode = dest_pnode
|
|
290 |
self.dest_snode = dest_snode
|
|
291 |
self.dest_iallocator = dest_iallocator
|
|
292 |
|
|
293 |
self.success = None
|
|
294 |
self.error_message = None
|
|
295 |
|
|
296 |
|
|
297 |
class MoveRuntime(object):
|
|
298 |
"""Class to keep track of instance move.
|
|
299 |
|
|
300 |
"""
|
|
301 |
def __init__(self, move):
|
|
302 |
"""Initializes this class.
|
|
303 |
|
|
304 |
@type move: L{InstanceMove}
|
|
305 |
|
|
306 |
"""
|
|
307 |
self.move = move
|
|
308 |
|
|
309 |
# Thread synchronization
|
|
310 |
self.lock = threading.Lock()
|
|
311 |
self.source_to_dest = threading.Condition(self.lock)
|
|
312 |
self.dest_to_source = threading.Condition(self.lock)
|
|
313 |
|
|
314 |
# Set when threads should abort
|
|
315 |
self.abort = None
|
|
316 |
|
|
317 |
# Source information
|
|
318 |
self.src_success = None
|
|
319 |
self.src_error_message = None
|
|
320 |
self.src_expinfo = None
|
|
321 |
self.src_instinfo = None
|
|
322 |
|
|
323 |
# Destination information
|
|
324 |
self.dest_success = None
|
|
325 |
self.dest_error_message = None
|
|
326 |
self.dest_impinfo = None
|
|
327 |
|
|
328 |
def HandleErrors(self, prefix, fn, *args):
|
|
329 |
"""Wrapper to catch errors and abort threads.
|
|
330 |
|
|
331 |
@type prefix: string
|
|
332 |
@param prefix: Variable name prefix ("src" or "dest")
|
|
333 |
@type fn: callable
|
|
334 |
@param fn: Function
|
|
335 |
|
|
336 |
"""
|
|
337 |
assert prefix in ("dest", "src")
|
|
338 |
|
|
339 |
try:
|
|
340 |
# Call inner function
|
|
341 |
fn(*args)
|
|
342 |
|
|
343 |
success = True
|
|
344 |
errmsg = None
|
|
345 |
except Abort:
|
|
346 |
success = False
|
|
347 |
errmsg = "Aborted"
|
|
348 |
except Exception, err:
|
|
349 |
logging.exception("Caught unhandled exception")
|
|
350 |
success = False
|
|
351 |
errmsg = str(err)
|
|
352 |
|
|
353 |
self.lock.acquire()
|
|
354 |
try:
|
|
355 |
# Tell all threads to abort
|
|
356 |
self.abort = True
|
|
357 |
self.source_to_dest.notifyAll()
|
|
358 |
self.dest_to_source.notifyAll()
|
|
359 |
finally:
|
|
360 |
self.lock.release()
|
|
361 |
|
|
362 |
setattr(self, "%s_success" % prefix, success)
|
|
363 |
setattr(self, "%s_error_message" % prefix, errmsg)
|
|
364 |
|
|
365 |
def CheckAbort(self):
|
|
366 |
"""Check whether thread should be aborted.
|
|
367 |
|
|
368 |
@raise Abort: When thread should be aborted
|
|
369 |
|
|
370 |
"""
|
|
371 |
if self.abort:
|
|
372 |
logging.info("Aborting")
|
|
373 |
raise Abort()
|
|
374 |
|
|
375 |
def Wait(self, cond, check_fn):
|
|
376 |
"""Waits for a condition to become true.
|
|
377 |
|
|
378 |
@type cond: threading.Condition
|
|
379 |
@param cond: Threading condition
|
|
380 |
@type check_fn: callable
|
|
381 |
@param check_fn: Function to check whether condition is true
|
|
382 |
|
|
383 |
"""
|
|
384 |
cond.acquire()
|
|
385 |
try:
|
|
386 |
while check_fn(self):
|
|
387 |
self.CheckAbort()
|
|
388 |
cond.wait()
|
|
389 |
finally:
|
|
390 |
cond.release()
|
|
391 |
|
|
392 |
def PollJob(self, cl, job_id, remote_import_fn=None):
|
|
393 |
"""Wrapper for polling a job.
|
|
394 |
|
|
395 |
@type cl: L{rapi.client.GanetiRapiClient}
|
|
396 |
@param cl: RAPI client
|
|
397 |
@type job_id: string
|
|
398 |
@param job_id: Job ID
|
|
399 |
@type remote_import_fn: callable or None
|
|
400 |
@param remote_import_fn: Callback for reporting received remote import
|
|
401 |
information
|
|
402 |
|
|
403 |
"""
|
|
404 |
return rapi.client_utils.PollJob(cl, job_id,
|
|
405 |
MoveJobPollReportCb(self.CheckAbort,
|
|
406 |
remote_import_fn))
|
|
407 |
|
|
408 |
|
|
409 |
class MoveDestExecutor(object):
|
|
410 |
def __init__(self, dest_client, mrt):
|
|
411 |
"""Destination side of an instance move.
|
|
412 |
|
|
413 |
@type dest_client: L{rapi.client.GanetiRapiClient}
|
|
414 |
@param dest_client: RAPI client
|
|
415 |
@type mrt: L{MoveRuntime}
|
|
416 |
@param mrt: Instance move runtime information
|
|
417 |
|
|
418 |
"""
|
|
419 |
logging.debug("Waiting for instance information to become available")
|
|
420 |
mrt.Wait(mrt.source_to_dest,
|
|
421 |
lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
|
|
422 |
|
|
423 |
logging.info("Creating instance %s in remote-import mode",
|
|
424 |
mrt.move.dest_instance_name)
|
|
425 |
job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
|
|
426 |
mrt.move.dest_pnode, mrt.move.dest_snode,
|
|
427 |
mrt.move.dest_iallocator,
|
|
428 |
mrt.src_instinfo, mrt.src_expinfo)
|
|
429 |
mrt.PollJob(dest_client, job_id,
|
|
430 |
remote_import_fn=compat.partial(self._SetImportInfo, mrt))
|
|
431 |
|
|
432 |
logging.info("Import successful")
|
|
433 |
|
|
434 |
@staticmethod
|
|
435 |
def _SetImportInfo(mrt, impinfo):
|
|
436 |
"""Sets the remote import information and notifies source thread.
|
|
437 |
|
|
438 |
@type mrt: L{MoveRuntime}
|
|
439 |
@param mrt: Instance move runtime information
|
|
440 |
@param impinfo: Remote import information
|
|
441 |
|
|
442 |
"""
|
|
443 |
mrt.dest_to_source.acquire()
|
|
444 |
try:
|
|
445 |
mrt.dest_impinfo = impinfo
|
|
446 |
mrt.dest_to_source.notifyAll()
|
|
447 |
finally:
|
|
448 |
mrt.dest_to_source.release()
|
|
449 |
|
|
450 |
@staticmethod
|
|
451 |
def _CreateInstance(cl, name, snode, pnode, iallocator, instance, expinfo):
|
|
452 |
"""Starts the instance creation in remote import mode.
|
|
453 |
|
|
454 |
@type cl: L{rapi.client.GanetiRapiClient}
|
|
455 |
@param cl: RAPI client
|
|
456 |
@type name: string
|
|
457 |
@param name: Instance name
|
|
458 |
@type pnode: string or None
|
|
459 |
@param pnode: Name of primary node on destination cluster
|
|
460 |
@type snode: string or None
|
|
461 |
@param snode: Name of secondary node on destination cluster
|
|
462 |
@type iallocator: string or None
|
|
463 |
@param iallocator: Name of iallocator to use
|
|
464 |
@type instance: dict
|
|
465 |
@param instance: Instance details from source cluster
|
|
466 |
@type expinfo: dict
|
|
467 |
@param expinfo: Prepared export information from source cluster
|
|
468 |
@return: Job ID
|
|
469 |
|
|
470 |
"""
|
|
471 |
disk_template = instance["disk_template"]
|
|
472 |
|
|
473 |
disks = [{
|
|
474 |
"size": i["size"],
|
|
475 |
"mode": i["mode"],
|
|
476 |
} for i in instance["disks"]]
|
|
477 |
|
|
478 |
nics = [{
|
|
479 |
"ip": ip,
|
|
480 |
"mac": mac,
|
|
481 |
"mode": mode,
|
|
482 |
"link": link,
|
|
483 |
} for ip, mac, mode, link in instance["nics"]]
|
|
484 |
|
|
485 |
# TODO: Should this be the actual up/down status? (run_state)
|
|
486 |
start = (instance["config_state"] == "up")
|
|
487 |
|
|
488 |
assert len(disks) == len(instance["disks"])
|
|
489 |
assert len(nics) == len(instance["nics"])
|
|
490 |
|
|
491 |
return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
|
|
492 |
name, disk_template, disks, nics,
|
|
493 |
os=instance["os"],
|
|
494 |
pnode=pnode,
|
|
495 |
snode=snode,
|
|
496 |
start=start,
|
|
497 |
ip_check=False,
|
|
498 |
iallocator=iallocator,
|
|
499 |
hypervisor=instance["hypervisor"],
|
|
500 |
source_handshake=expinfo["handshake"],
|
|
501 |
source_x509_ca=expinfo["x509_ca"],
|
|
502 |
source_instance_name=instance["name"],
|
|
503 |
beparams=instance["be_instance"],
|
|
504 |
hvparams=instance["hv_instance"])
|
|
505 |
|
|
506 |
|
|
507 |
class MoveSourceExecutor(object):
|
|
508 |
def __init__(self, src_client, mrt):
|
|
509 |
"""Source side of an instance move.
|
|
510 |
|
|
511 |
@type src_client: L{rapi.client.GanetiRapiClient}
|
|
512 |
@param src_client: RAPI client
|
|
513 |
@type mrt: L{MoveRuntime}
|
|
514 |
@param mrt: Instance move runtime information
|
|
515 |
|
|
516 |
"""
|
|
517 |
logging.info("Checking whether instance exists")
|
|
518 |
self._CheckInstance(src_client, mrt.move.src_instance_name)
|
|
519 |
|
|
520 |
logging.info("Retrieving instance information from source cluster")
|
|
521 |
instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
|
|
522 |
mrt.move.src_instance_name)
|
|
523 |
|
|
524 |
logging.info("Preparing export on source cluster")
|
|
525 |
expinfo = self._PrepareExport(src_client, mrt.PollJob,
|
|
526 |
mrt.move.src_instance_name)
|
|
527 |
assert "handshake" in expinfo
|
|
528 |
assert "x509_key_name" in expinfo
|
|
529 |
assert "x509_ca" in expinfo
|
|
530 |
|
|
531 |
# Hand information to destination thread
|
|
532 |
mrt.source_to_dest.acquire()
|
|
533 |
try:
|
|
534 |
mrt.src_instinfo = instinfo
|
|
535 |
mrt.src_expinfo = expinfo
|
|
536 |
mrt.source_to_dest.notifyAll()
|
|
537 |
finally:
|
|
538 |
mrt.source_to_dest.release()
|
|
539 |
|
|
540 |
logging.info("Waiting for destination information to become available")
|
|
541 |
mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
|
|
542 |
|
|
543 |
logging.info("Starting remote export on source cluster")
|
|
544 |
self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
|
|
545 |
expinfo["x509_key_name"], mrt.dest_impinfo)
|
|
546 |
|
|
547 |
logging.info("Export successful")
|
|
548 |
|
|
549 |
@staticmethod
|
|
550 |
def _CheckInstance(cl, name):
|
|
551 |
"""Checks whether the instance exists on the source cluster.
|
|
552 |
|
|
553 |
@type cl: L{rapi.client.GanetiRapiClient}
|
|
554 |
@param cl: RAPI client
|
|
555 |
@type name: string
|
|
556 |
@param name: Instance name
|
|
557 |
|
|
558 |
"""
|
|
559 |
try:
|
|
560 |
cl.GetInstance(name)
|
|
561 |
except rapi.client.GanetiApiError, err:
|
|
562 |
if err.code == rapi.client.HTTP_NOT_FOUND:
|
|
563 |
raise Error("Instance %s not found (%s)" % (name, str(err)))
|
|
564 |
raise
|
|
565 |
|
|
566 |
@staticmethod
|
|
567 |
def _GetInstanceInfo(cl, poll_job_fn, name):
|
|
568 |
"""Retrieves detailed instance information from source cluster.
|
|
569 |
|
|
570 |
@type cl: L{rapi.client.GanetiRapiClient}
|
|
571 |
@param cl: RAPI client
|
|
572 |
@type poll_job_fn: callable
|
|
573 |
@param poll_job_fn: Function to poll for job result
|
|
574 |
@type name: string
|
|
575 |
@param name: Instance name
|
|
576 |
|
|
577 |
"""
|
|
578 |
job_id = cl.GetInstanceInfo(name, static=True)
|
|
579 |
result = poll_job_fn(cl, job_id)
|
|
580 |
assert len(result[0].keys()) == 1
|
|
581 |
return result[0][result[0].keys()[0]]
|
|
582 |
|
|
583 |
@staticmethod
|
|
584 |
def _PrepareExport(cl, poll_job_fn, name):
|
|
585 |
"""Prepares export on source cluster.
|
|
586 |
|
|
587 |
@type cl: L{rapi.client.GanetiRapiClient}
|
|
588 |
@param cl: RAPI client
|
|
589 |
@type poll_job_fn: callable
|
|
590 |
@param poll_job_fn: Function to poll for job result
|
|
591 |
@type name: string
|
|
592 |
@param name: Instance name
|
|
593 |
|
|
594 |
"""
|
|
595 |
job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
|
|
596 |
return poll_job_fn(cl, job_id)[0]
|
|
597 |
|
|
598 |
@staticmethod
|
|
599 |
def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
|
|
600 |
"""Exports instance from source cluster.
|
|
601 |
|
|
602 |
@type cl: L{rapi.client.GanetiRapiClient}
|
|
603 |
@param cl: RAPI client
|
|
604 |
@type poll_job_fn: callable
|
|
605 |
@param poll_job_fn: Function to poll for job result
|
|
606 |
@type name: string
|
|
607 |
@param name: Instance name
|
|
608 |
@param x509_key_name: Source X509 key
|
|
609 |
@param impinfo: Import information from destination cluster
|
|
610 |
|
|
611 |
"""
|
|
612 |
job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
|
|
613 |
impinfo["disks"], shutdown=True,
|
|
614 |
remove_instance=True,
|
|
615 |
x509_key_name=x509_key_name,
|
|
616 |
destination_x509_ca=impinfo["x509_ca"])
|
|
617 |
(fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
|
|
618 |
|
|
619 |
if not (fin_resu and compat.all(dresults)):
|
|
620 |
raise Error("Export failed for disks %s" %
|
|
621 |
utils.CommaJoin(str(idx) for idx, result
|
|
622 |
in enumerate(dresults) if not result))
|
|
623 |
|
|
624 |
|
|
625 |
class MoveSourceWorker(workerpool.BaseWorker):
|
|
626 |
def RunTask(self, rapi_factory, move): # pylint: disable-msg=W0221
|
|
627 |
"""Executes an instance move.
|
|
628 |
|
|
629 |
@type rapi_factory: L{RapiClientFactory}
|
|
630 |
@param rapi_factory: RAPI client factory
|
|
631 |
@type move: L{InstanceMove}
|
|
632 |
@param move: Instance move information
|
|
633 |
|
|
634 |
"""
|
|
635 |
try:
|
|
636 |
logging.info("Preparing to move %s from cluster %s to %s as %s",
|
|
637 |
move.src_instance_name, rapi_factory.src_cluster_name,
|
|
638 |
rapi_factory.dest_cluster_name, move.dest_instance_name)
|
|
639 |
|
|
640 |
mrt = MoveRuntime(move)
|
|
641 |
|
|
642 |
logging.debug("Starting destination thread")
|
|
643 |
dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
|
|
644 |
target=mrt.HandleErrors,
|
|
645 |
args=("dest", MoveDestExecutor,
|
|
646 |
rapi_factory.GetDestClient(),
|
|
647 |
mrt, ))
|
|
648 |
dest_thread.start()
|
|
649 |
try:
|
|
650 |
mrt.HandleErrors("src", MoveSourceExecutor,
|
|
651 |
rapi_factory.GetSourceClient(), mrt)
|
|
652 |
finally:
|
|
653 |
dest_thread.join()
|
|
654 |
|
|
655 |
move.success = (mrt.src_success and mrt.dest_success)
|
|
656 |
if mrt.src_error_message or mrt.dest_error_message:
|
|
657 |
move.error_message = ("Source error: %s, destination error: %s" %
|
|
658 |
(mrt.src_error_message, mrt.dest_error_message))
|
|
659 |
else:
|
|
660 |
move.error_message = None
|
|
661 |
except Exception, err: # pylint: disable-msg=W0703
|
|
662 |
logging.exception("Caught unhandled exception")
|
|
663 |
move.success = False
|
|
664 |
move.error_message = str(err)
|
|
665 |
|
|
666 |
|
|
667 |
def CheckRapiSetup(rapi_factory):
|
|
668 |
"""Checks the RAPI setup by retrieving the version.
|
|
669 |
|
|
670 |
@type rapi_factory: L{RapiClientFactory}
|
|
671 |
@param rapi_factory: RAPI client factory
|
|
672 |
|
|
673 |
"""
|
|
674 |
src_client = rapi_factory.GetSourceClient()
|
|
675 |
logging.info("Connecting to source RAPI server")
|
|
676 |
logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
|
|
677 |
|
|
678 |
dest_client = rapi_factory.GetDestClient()
|
|
679 |
logging.info("Connecting to destination RAPI server")
|
|
680 |
logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())
|
|
681 |
|
|
682 |
|
|
683 |
def SetupLogging(options):
|
|
684 |
"""Setting up logging infrastructure.
|
|
685 |
|
|
686 |
@param options: Parsed command line options
|
|
687 |
|
|
688 |
"""
|
|
689 |
fmt = "%(asctime)s: %(threadName)s "
|
|
690 |
if options.debug or options.verbose:
|
|
691 |
fmt += "%(levelname)s "
|
|
692 |
fmt += "%(message)s"
|
|
693 |
|
|
694 |
formatter = logging.Formatter(fmt)
|
|
695 |
|
|
696 |
stderr_handler = logging.StreamHandler()
|
|
697 |
stderr_handler.setFormatter(formatter)
|
|
698 |
if options.debug:
|
|
699 |
stderr_handler.setLevel(logging.NOTSET)
|
|
700 |
elif options.verbose:
|
|
701 |
stderr_handler.setLevel(logging.INFO)
|
|
702 |
else:
|
|
703 |
stderr_handler.setLevel(logging.ERROR)
|
|
704 |
|
|
705 |
root_logger = logging.getLogger("")
|
|
706 |
root_logger.setLevel(logging.NOTSET)
|
|
707 |
root_logger.addHandler(stderr_handler)
|
|
708 |
|
|
709 |
|
|
710 |
def ParseOptions():
|
|
711 |
"""Parses options passed to program.
|
|
712 |
|
|
713 |
"""
|
|
714 |
program = os.path.basename(sys.argv[0])
|
|
715 |
|
|
716 |
parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
|
|
717 |
" <source-cluster> <dest-cluster>"
|
|
718 |
" <instance...>"),
|
|
719 |
prog=program)
|
|
720 |
parser.add_option(cli.DEBUG_OPT)
|
|
721 |
parser.add_option(cli.VERBOSE_OPT)
|
|
722 |
parser.add_option(cli.IALLOCATOR_OPT)
|
|
723 |
parser.add_option(SRC_RAPI_PORT_OPT)
|
|
724 |
parser.add_option(SRC_CA_FILE_OPT)
|
|
725 |
parser.add_option(SRC_USERNAME_OPT)
|
|
726 |
parser.add_option(SRC_PASSWORD_FILE_OPT)
|
|
727 |
parser.add_option(DEST_RAPI_PORT_OPT)
|
|
728 |
parser.add_option(DEST_CA_FILE_OPT)
|
|
729 |
parser.add_option(DEST_USERNAME_OPT)
|
|
730 |
parser.add_option(DEST_PASSWORD_FILE_OPT)
|
|
731 |
parser.add_option(DEST_INSTANCE_NAME_OPT)
|
|
732 |
parser.add_option(DEST_PRIMARY_NODE_OPT)
|
|
733 |
parser.add_option(DEST_SECONDARY_NODE_OPT)
|
|
734 |
parser.add_option(PARALLEL_OPT)
|
|
735 |
|
|
736 |
(options, args) = parser.parse_args()
|
|
737 |
|
|
738 |
return (parser, options, args)
|
|
739 |
|
|
740 |
|
|
741 |
def CheckOptions(parser, options, args):
|
|
742 |
"""Checks options and arguments for validity.
|
|
743 |
|
|
744 |
"""
|
|
745 |
if len(args) < 3:
|
|
746 |
parser.error("Not enough arguments")
|
|
747 |
|
|
748 |
src_cluster_name = args.pop(0)
|
|
749 |
dest_cluster_name = args.pop(0)
|
|
750 |
instance_names = args
|
|
751 |
|
|
752 |
assert len(instance_names) > 0
|
|
753 |
|
|
754 |
# TODO: Remove once using system default paths for SSL certificate
|
|
755 |
# verification is implemented
|
|
756 |
if not options.src_ca_file:
|
|
757 |
parser.error("Missing source cluster CA file")
|
|
758 |
|
|
759 |
if options.parallel < 1:
|
|
760 |
parser.error("Number of simultaneous moves must be >= 1")
|
|
761 |
|
|
762 |
if not (bool(options.iallocator) ^
|
|
763 |
bool(options.dest_primary_node or options.dest_secondary_node)):
|
|
764 |
parser.error("Destination node and iallocator options exclude each other")
|
|
765 |
|
|
766 |
if len(instance_names) == 1:
|
|
767 |
# Moving one instance only
|
|
768 |
if not (options.iallocator or
|
|
769 |
options.dest_primary_node or
|
|
770 |
options.dest_secondary_node):
|
|
771 |
parser.error("An iallocator or the destination node is required")
|
|
772 |
else:
|
|
773 |
# Moving more than one instance
|
|
774 |
if (options.dest_instance_name or options.dest_primary_node or
|
|
775 |
options.dest_secondary_node):
|
|
776 |
parser.error("The options --dest-instance-name, --dest-primary-node and"
|
|
777 |
" --dest-secondary-node can only be used when moving exactly"
|
|
778 |
" one instance")
|
|
779 |
|
|
780 |
if not options.iallocator:
|
|
781 |
parser.error("An iallocator must be specified for moving more than one"
|
|
782 |
" instance")
|
|
783 |
|
|
784 |
return (src_cluster_name, dest_cluster_name, instance_names)
|
|
785 |
|
|
786 |
|
|
787 |
def main():
|
|
788 |
"""Main routine.
|
|
789 |
|
|
790 |
"""
|
|
791 |
(parser, options, args) = ParseOptions()
|
|
792 |
|
|
793 |
SetupLogging(options)
|
|
794 |
|
|
795 |
(src_cluster_name, dest_cluster_name, instance_names) = \
|
|
796 |
CheckOptions(parser, options, args)
|
|
797 |
|
|
798 |
logging.info("Source cluster: %s", src_cluster_name)
|
|
799 |
logging.info("Destination cluster: %s", dest_cluster_name)
|
|
800 |
logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
|
|
801 |
|
|
802 |
rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
|
|
803 |
|
|
804 |
CheckRapiSetup(rapi_factory)
|
|
805 |
|
|
806 |
assert (len(instance_names) == 1 or
|
|
807 |
not (options.dest_primary_node or options.dest_secondary_node))
|
|
808 |
assert len(instance_names) == 1 or options.iallocator
|
|
809 |
assert (len(instance_names) > 1 or options.iallocator or
|
|
810 |
options.dest_primary_node or options.dest_secondary_node)
|
|
811 |
|
|
812 |
# Prepare list of instance moves
|
|
813 |
moves = []
|
|
814 |
for src_instance_name in instance_names:
|
|
815 |
if options.dest_instance_name:
|
|
816 |
assert len(instance_names) == 1
|
|
817 |
# Rename instance
|
|
818 |
dest_instance_name = options.dest_instance_name
|
|
819 |
else:
|
|
820 |
dest_instance_name = src_instance_name
|
|
821 |
|
|
822 |
moves.append(InstanceMove(src_instance_name, dest_instance_name,
|
|
823 |
options.dest_primary_node,
|
|
824 |
options.dest_secondary_node,
|
|
825 |
options.iallocator))
|
|
826 |
|
|
827 |
assert len(moves) == len(instance_names)
|
|
828 |
|
|
829 |
# Start workerpool
|
|
830 |
wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
|
|
831 |
try:
|
|
832 |
# Add instance moves to workerpool
|
|
833 |
for move in moves:
|
|
834 |
wp.AddTask(rapi_factory, move)
|
|
835 |
|
|
836 |
# Wait for all moves to finish
|
|
837 |
wp.Quiesce()
|
|
838 |
|
|
839 |
finally:
|
|
840 |
wp.TerminateWorkers()
|
|
841 |
|
|
842 |
# There should be no threads running at this point, hence not using locks
|
|
843 |
# anymore
|
|
844 |
|
|
845 |
logging.info("Instance move results:")
|
|
846 |
|
|
847 |
for move in moves:
|
|
848 |
if move.dest_instance_name == move.src_instance_name:
|
|
849 |
name = move.src_instance_name
|
|
850 |
else:
|
|
851 |
name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
|
|
852 |
|
|
853 |
if move.success and not move.error_message:
|
|
854 |
msg = "Success"
|
|
855 |
else:
|
|
856 |
msg = "Failed (%s)" % move.error_message
|
|
857 |
|
|
858 |
logging.info("%s: %s", name, msg)
|
|
859 |
|
|
860 |
if compat.all(move.success for move in moves):
|
|
861 |
sys.exit(constants.EXIT_SUCCESS)
|
|
862 |
|
|
863 |
sys.exit(constants.EXIT_FAILURE)
|
|
864 |
|
|
865 |
|
|
866 |
if __name__ == "__main__":
|
|
867 |
main()
|