root / tools / move-instance @ a396b2d6
History | View | Annotate | Download (36.6 kB)
1 |
#!/usr/bin/python |
---|---|
2 |
# |
3 |
|
4 |
# Copyright (C) 2010, 2011, 2012 Google Inc. |
5 |
# |
6 |
# This program is free software; you can redistribute it and/or modify |
7 |
# it under the terms of the GNU General Public License as published by |
8 |
# the Free Software Foundation; either version 2 of the License, or |
9 |
# (at your option) any later version. |
10 |
# |
11 |
# This program is distributed in the hope that it will be useful, but |
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of |
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
14 |
# General Public License for more details. |
15 |
# |
16 |
# You should have received a copy of the GNU General Public License |
17 |
# along with this program; if not, write to the Free Software |
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
19 |
# 02110-1301, USA. |
20 |
|
21 |
"""Tool to move instances from one cluster to another. |
22 |
|
23 |
""" |
24 |
|
25 |
# pylint: disable=C0103 |
26 |
# C0103: Invalid name move-instance |
27 |
|
28 |
import os |
29 |
import sys |
30 |
import time |
31 |
import logging |
32 |
import optparse |
33 |
import threading |
34 |
|
35 |
from ganeti import cli |
36 |
from ganeti import constants |
37 |
from ganeti import utils |
38 |
from ganeti import workerpool |
39 |
from ganeti import objects |
40 |
from ganeti import compat |
41 |
from ganeti import rapi |
42 |
from ganeti import errors |
43 |
|
44 |
import ganeti.rapi.client # pylint: disable=W0611 |
45 |
import ganeti.rapi.client_utils |
46 |
from ganeti.rapi.client import UsesRapiClient |
47 |
|
48 |
|
49 |
# Command line options describing how to reach the source cluster

# TCP port of the source cluster's RAPI daemon
SRC_RAPI_PORT_OPT = cli.cli_option(
  "--src-rapi-port", action="store", type="int", dest="src_rapi_port",
  default=constants.DEFAULT_RAPI_PORT,
  help=("Source cluster RAPI port (defaults to %s)" %
        constants.DEFAULT_RAPI_PORT))

# CA certificate used to verify the source cluster's RAPI certificate
SRC_CA_FILE_OPT = cli.cli_option(
  "--src-ca-file", action="store", type="string", dest="src_ca_file",
  help=("File containing source cluster Certificate"
        " Authority (CA) in PEM format"))

# RAPI user name on the source cluster
SRC_USERNAME_OPT = cli.cli_option(
  "--src-username", action="store", type="string", dest="src_username",
  default=None, help="Source cluster username")

# File holding the RAPI password for the source cluster
SRC_PASSWORD_FILE_OPT = cli.cli_option(
  "--src-password-file", action="store", type="string",
  dest="src_password_file", help="File containing source cluster password")
70 |
|
71 |
# No default port is set on purpose: when "--dest-rapi-port" is not given the
# option value stays None and RapiClientFactory falls back to the source
# cluster's RAPI port, which is what the help text promises. A hard-coded
# default would always be truthy and make that fallback unreachable.
DEST_RAPI_PORT_OPT = \
  cli.cli_option("--dest-rapi-port", action="store", type="int",
                 dest="dest_rapi_port", default=None,
                 help=("Destination cluster RAPI port (defaults to source"
                       " cluster RAPI port)"))
76 |
|
77 |
# Command line options describing how to reach the destination cluster

# CA certificate used to verify the destination cluster's RAPI certificate
DEST_CA_FILE_OPT = cli.cli_option(
  "--dest-ca-file", action="store", type="string", dest="dest_ca_file",
  help=("File containing destination cluster Certificate"
        " Authority (CA) in PEM format (defaults to source"
        " cluster CA)"))

# RAPI user name on the destination cluster
DEST_USERNAME_OPT = cli.cli_option(
  "--dest-username", action="store", type="string", dest="dest_username",
  default=None,
  help=("Destination cluster username (defaults to"
        " source cluster username)"))

# File holding the RAPI password for the destination cluster
DEST_PASSWORD_FILE_OPT = cli.cli_option(
  "--dest-password-file", action="store", type="string",
  dest="dest_password_file",
  help=("File containing destination cluster password"
        " (defaults to source cluster password)"))

# Options controlling where and how the instance is created on the destination

DEST_INSTANCE_NAME_OPT = cli.cli_option(
  "--dest-instance-name", action="store", type="string",
  dest="dest_instance_name",
  help=("Instance name on destination cluster (only"
        " when moving exactly one instance)"))

DEST_PRIMARY_NODE_OPT = cli.cli_option(
  "--dest-primary-node", action="store", type="string",
  dest="dest_primary_node",
  help=("Primary node on destination cluster (only"
        " when moving exactly one instance)"))

DEST_SECONDARY_NODE_OPT = cli.cli_option(
  "--dest-secondary-node", action="store", type="string",
  dest="dest_secondary_node",
  help=("Secondary node on destination cluster (only"
        " when moving exactly one instance)"))

DEST_DISK_TEMPLATE_OPT = cli.cli_option(
  "--dest-disk-template", action="store", type="string",
  dest="dest_disk_template", default=None,
  help="Disk template to use on destination cluster")

# Options controlling the transfer itself

COMPRESS_OPT = cli.cli_option(
  "--compress", action="store", type="string", dest="compress",
  default="none",
  help="Compression mode to use during the move (this mode has"
       " to be supported by both clusters)")

PARALLEL_OPT = cli.cli_option(
  "-p", "--parallel", action="store", type="int", default=1,
  dest="parallel", metavar="<number>",
  help="Number of instances to be moved simultaneously")

# Options controlling opportunistic instance creation; neither has a default
# here, so both are None unless given on the command line

OPPORTUNISTIC_TRIES_OPT = cli.cli_option(
  "--opportunistic-tries", action="store", type="int",
  dest="opportunistic_tries", metavar="<number>",
  help="Number of opportunistic instance creation attempts"
       " before a normal creation is performed. An opportunistic"
       " attempt will use the iallocator with all the nodes"
       " currently unlocked, failing if not enough nodes are"
       " available. Even though it will succeed (or fail) more"
       " quickly, it can result in suboptimal instance"
       " placement")

OPPORTUNISTIC_DELAY_OPT = cli.cli_option(
  "--opportunistic-delay", action="store", type="int",
  dest="opportunistic_delay", metavar="<number>",
  help="The delay between successive opportunistic instance"
       " creation attempts, in seconds")
146 |
|
147 |
|
148 |
class Error(Exception):
  """Base class for the errors raised by this tool.

  """
152 |
|
153 |
|
154 |
class Abort(Error):
  """Raised to make a thread give up on a running import/export.

  """
158 |
|
159 |
|
160 |
class RapiClientFactory:
  """Creates RAPI clients for the two clusters involved in a move.

  @ivar src_cluster_name: Source cluster name
  @ivar dest_cluster_name: Destination cluster name
  @ivar GetSourceClient: Callable returning new client for source cluster
  @ivar GetDestClient: Callable returning new client for destination cluster

  """
  def __init__(self, options, src_cluster_name, dest_cluster_name):
    """Works out the connection settings and prepares the client callables.

    Destination settings which evaluate to false or are missing (CA file, RAPI
    port, username, password file) are taken over from the source cluster.

    @param options: Program options
    @type src_cluster_name: string
    @param src_cluster_name: Source cluster name
    @type dest_cluster_name: string
    @param dest_cluster_name: Destination cluster name

    """
    self.src_cluster_name = src_cluster_name
    self.dest_cluster_name = dest_cluster_name

    # TODO: Implement timeouts for RAPI connections
    # TODO: Support for using system default paths for verifying SSL certificate
    src_ca_file = options.src_ca_file
    logging.debug("Using '%s' as source CA", src_ca_file)
    src_curl_config = rapi.client.GenericCurlConfig(cafile=src_ca_file)

    dest_ca_file = options.dest_ca_file
    if not dest_ca_file:
      logging.debug("Using source CA for destination")
      dest_curl_config = src_curl_config
    else:
      logging.debug("Using '%s' as destination CA", dest_ca_file)
      dest_curl_config = rapi.client.GenericCurlConfig(cafile=dest_ca_file)

    logging.debug("Source RAPI server is %s:%s",
                  src_cluster_name, options.src_rapi_port)
    logging.debug("Source username is '%s'", options.src_username)

    # The client is always handed a string as username, never None
    if options.src_username is not None:
      src_username = options.src_username
    else:
      src_username = ""

    src_password_file = options.src_password_file
    if not src_password_file:
      logging.debug("Source has no password")
      src_password = None
    else:
      logging.debug("Reading '%s' for source password", src_password_file)
      src_password = utils.ReadOneLineFile(src_password_file, strict=True)

    def _NewSourceClient():
      """Returns a new RAPI client connected to the source cluster.

      """
      return rapi.client.GanetiRapiClient(src_cluster_name,
                                          port=options.src_rapi_port,
                                          curl_config_fn=src_curl_config,
                                          username=src_username,
                                          password=src_password)

    self.GetSourceClient = _NewSourceClient

    if not options.dest_rapi_port:
      dest_rapi_port = options.src_rapi_port
    else:
      dest_rapi_port = options.dest_rapi_port

    if options.dest_username is not None:
      dest_username = options.dest_username
    else:
      dest_username = src_username

    logging.debug("Destination RAPI server is %s:%s",
                  dest_cluster_name, dest_rapi_port)
    logging.debug("Destination username is '%s'", dest_username)

    dest_password_file = options.dest_password_file
    if not dest_password_file:
      logging.debug("Using source password for destination")
      dest_password = src_password
    else:
      logging.debug("Reading '%s' for destination password",
                    dest_password_file)
      dest_password = utils.ReadOneLineFile(dest_password_file, strict=True)

    def _NewDestClient():
      """Returns a new RAPI client connected to the destination cluster.

      """
      return rapi.client.GanetiRapiClient(dest_cluster_name,
                                          port=dest_rapi_port,
                                          curl_config_fn=dest_curl_config,
                                          username=dest_username,
                                          password=dest_password)

    self.GetDestClient = _NewDestClient
249 |
|
250 |
|
251 |
class MoveJobPollReportCb(cli.JobPollReportCbBase):
  """Job polling callbacks used while an instance is being moved.

  """
  def __init__(self, abort_check_fn, remote_import_fn):
    """Stores the callbacks used while polling.

    @type abort_check_fn: callable
    @param abort_check_fn: Function to check whether move is aborted
    @type remote_import_fn: callable or None
    @param remote_import_fn: Callback for reporting received remote import
                             information

    """
    cli.JobPollReportCbBase.__init__(self)
    self._remote_import_fn = remote_import_fn
    self._abort_check_fn = abort_check_fn

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Processes one log message of the polled job.

    Remote import information is passed to the callback given at
    construction time, everything else is written to the log.

    """
    if log_type != constants.ELOG_REMOTE_IMPORT:
      # Ordinary message, just show it to the user
      logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
                   cli.FormatLogMessage(log_type, log_msg))
      return

    logging.debug("Received remote import information")

    if not self._remote_import_fn:
      raise RuntimeError("Received unexpected remote import information")

    assert "x509_ca" in log_msg
    assert "disks" in log_msg

    self._remote_import_fn(log_msg)

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    Used as an opportunity to find out whether the move was aborted.

    """
    try:
      # The check raises if the other thread told us to abort
      self._abort_check_fn()
    except Abort:
      logging.warning("Aborting despite job %s still running", job_id)
      raise
296 |
|
297 |
|
298 |
class InstanceMove(object):
  """Describes one instance move and records its outcome.

  """
  def __init__(self, src_instance_name, dest_instance_name,
               dest_pnode, dest_snode, compress, dest_iallocator,
               dest_disk_template, hvparams,
               beparams, osparams, nics, opportunistic_tries,
               opportunistic_delay):
    """Stores the parameters of the move.

    @type src_instance_name: string
    @param src_instance_name: Instance name on source cluster
    @type dest_instance_name: string
    @param dest_instance_name: Instance name on destination cluster
    @type dest_pnode: string or None
    @param dest_pnode: Name of primary node on destination cluster
    @type dest_snode: string or None
    @param dest_snode: Name of secondary node on destination cluster
    @type compress: string
    @param compress: Compression mode to use (has to be supported on both
                     clusters)
    @type dest_iallocator: string or None
    @param dest_iallocator: Name of iallocator to use
    @type dest_disk_template: string or None
    @param dest_disk_template: Disk template to use instead of the original one
    @type hvparams: dict or None
    @param hvparams: Hypervisor parameters to override
    @type beparams: dict or None
    @param beparams: Backend parameters to override
    @type osparams: dict or None
    @param osparams: OS parameters to override
    @type nics: dict or None
    @param nics: NICs to override
    @type opportunistic_tries: int or None
    @param opportunistic_tries: Number of opportunistic creation attempts to
                                perform; None means no such attempts
    @type opportunistic_delay: int or None
    @param opportunistic_delay: Delay between successive creation attempts, in
                                seconds; None selects the default interval

    """
    # What to move and what to call it afterwards
    self.src_instance_name = src_instance_name
    self.dest_instance_name = dest_instance_name

    # Placement on the destination cluster
    self.dest_pnode = dest_pnode
    self.dest_snode = dest_snode
    self.dest_iallocator = dest_iallocator
    self.dest_disk_template = dest_disk_template

    # Transfer settings
    self.compress = compress

    # Parameter overrides
    self.hvparams = hvparams
    self.beparams = beparams
    self.osparams = osparams
    self.nics = nics

    if opportunistic_tries is None:
      self.opportunistic_tries = 0
    else:
      self.opportunistic_tries = opportunistic_tries

    if opportunistic_delay is None:
      self.opportunistic_delay = constants.DEFAULT_OPPORTUNISTIC_RETRY_INTERVAL
    else:
      self.opportunistic_delay = opportunistic_delay

    # Starts out as None; filled in by the code executing the move
    self.error_message = None
363 |
|
364 |
|
365 |
class MoveRuntime(object): |
366 |
"""Class to keep track of instance move. |
367 |
|
368 |
""" |
369 |
def __init__(self, move): |
370 |
"""Initializes this class. |
371 |
|
372 |
@type move: L{InstanceMove} |
373 |
|
374 |
""" |
375 |
self.move = move |
376 |
|
377 |
# Thread synchronization |
378 |
self.lock = threading.Lock() |
379 |
self.source_to_dest = threading.Condition(self.lock) |
380 |
self.dest_to_source = threading.Condition(self.lock) |
381 |
|
382 |
# Source information |
383 |
self.src_error_message = None |
384 |
self.src_expinfo = None |
385 |
self.src_instinfo = None |
386 |
|
387 |
# Destination information |
388 |
self.dest_error_message = None |
389 |
self.dest_impinfo = None |
390 |
|
391 |
def HandleErrors(self, prefix, fn, *args): |
392 |
"""Wrapper to catch errors and abort threads. |
393 |
|
394 |
@type prefix: string |
395 |
@param prefix: Variable name prefix ("src" or "dest") |
396 |
@type fn: callable |
397 |
@param fn: Function |
398 |
|
399 |
""" |
400 |
assert prefix in ("dest", "src") |
401 |
|
402 |
try: |
403 |
# Call inner function |
404 |
fn(*args) |
405 |
|
406 |
errmsg = None |
407 |
except Abort: |
408 |
errmsg = "Aborted" |
409 |
except Exception, err: |
410 |
logging.exception("Caught unhandled exception") |
411 |
errmsg = str(err) |
412 |
|
413 |
setattr(self, "%s_error_message" % prefix, errmsg) |
414 |
|
415 |
self.lock.acquire() |
416 |
try: |
417 |
self.source_to_dest.notifyAll() |
418 |
self.dest_to_source.notifyAll() |
419 |
finally: |
420 |
self.lock.release() |
421 |
|
422 |
def CheckAbort(self): |
423 |
"""Check whether thread should be aborted. |
424 |
|
425 |
@raise Abort: When thread should be aborted |
426 |
|
427 |
""" |
428 |
if not (self.src_error_message is None and |
429 |
self.dest_error_message is None): |
430 |
logging.info("Aborting") |
431 |
raise Abort() |
432 |
|
433 |
def Wait(self, cond, check_fn): |
434 |
"""Waits for a condition to become true. |
435 |
|
436 |
@type cond: threading.Condition |
437 |
@param cond: Threading condition |
438 |
@type check_fn: callable |
439 |
@param check_fn: Function to check whether condition is true |
440 |
|
441 |
""" |
442 |
cond.acquire() |
443 |
try: |
444 |
while check_fn(self): |
445 |
self.CheckAbort() |
446 |
cond.wait() |
447 |
finally: |
448 |
cond.release() |
449 |
|
450 |
def PollJob(self, cl, job_id, remote_import_fn=None): |
451 |
"""Wrapper for polling a job. |
452 |
|
453 |
@type cl: L{rapi.client.GanetiRapiClient} |
454 |
@param cl: RAPI client |
455 |
@type job_id: string |
456 |
@param job_id: Job ID |
457 |
@type remote_import_fn: callable or None |
458 |
@param remote_import_fn: Callback for reporting received remote import |
459 |
information |
460 |
|
461 |
""" |
462 |
return rapi.client_utils.PollJob(cl, job_id, |
463 |
MoveJobPollReportCb(self.CheckAbort, |
464 |
remote_import_fn)) |
465 |
|
466 |
|
467 |
class MoveDestExecutor(object): |
468 |
def __init__(self, dest_client, mrt): |
469 |
"""Destination side of an instance move. |
470 |
|
471 |
@type dest_client: L{rapi.client.GanetiRapiClient} |
472 |
@param dest_client: RAPI client |
473 |
@type mrt: L{MoveRuntime} |
474 |
@param mrt: Instance move runtime information |
475 |
|
476 |
""" |
477 |
logging.debug("Waiting for instance information to become available") |
478 |
mrt.Wait(mrt.source_to_dest, |
479 |
lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None) |
480 |
|
481 |
logging.info("Creating instance %s in remote-import mode", |
482 |
mrt.move.dest_instance_name) |
483 |
|
484 |
# Depending on whether opportunistic tries are enabled, we may have to |
485 |
# make multiple creation attempts |
486 |
creation_attempts = [True] * mrt.move.opportunistic_tries |
487 |
|
488 |
# But the last one is never opportunistic, and will block until completion |
489 |
# or failure |
490 |
creation_attempts.append(False) |
491 |
|
492 |
for is_attempt_opportunistic in creation_attempts: |
493 |
job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name, |
494 |
mrt.move.dest_pnode, mrt.move.dest_snode, |
495 |
mrt.move.compress, |
496 |
mrt.move.dest_iallocator, |
497 |
mrt.move.dest_disk_template, |
498 |
mrt.src_instinfo, mrt.src_expinfo, |
499 |
mrt.move.hvparams, mrt.move.beparams, |
500 |
mrt.move.beparams, mrt.move.nics, |
501 |
is_attempt_opportunistic |
502 |
) |
503 |
|
504 |
try: |
505 |
# The completion of this block signifies that the import has been |
506 |
# completed successfullly |
507 |
mrt.PollJob(dest_client, job_id, |
508 |
remote_import_fn=compat.partial(self._SetImportInfo, mrt)) |
509 |
logging.info("Import successful") |
510 |
return |
511 |
except errors.OpPrereqError, err: |
512 |
# Any exception in the non-opportunistic creation is to be passed on, |
513 |
# as well as exceptions apart from resources temporarily unavailable |
514 |
if not is_attempt_opportunistic or \ |
515 |
err.args[1] != rapi.client.ECODE_TEMP_NORES: |
516 |
raise |
517 |
|
518 |
logging.info("Opportunistic attempt unsuccessful, waiting %d seconds" |
519 |
" before another creation attempt is made", |
520 |
mrt.move.opportunistic_delay) |
521 |
time.sleep(mrt.move.opportunistic_delay) |
522 |
|
523 |
@staticmethod |
524 |
def _SetImportInfo(mrt, impinfo): |
525 |
"""Sets the remote import information and notifies source thread. |
526 |
|
527 |
@type mrt: L{MoveRuntime} |
528 |
@param mrt: Instance move runtime information |
529 |
@param impinfo: Remote import information |
530 |
|
531 |
""" |
532 |
mrt.dest_to_source.acquire() |
533 |
try: |
534 |
mrt.dest_impinfo = impinfo |
535 |
mrt.dest_to_source.notifyAll() |
536 |
finally: |
537 |
mrt.dest_to_source.release() |
538 |
|
539 |
@staticmethod |
540 |
def _CreateInstance(cl, name, pnode, snode, compress, iallocator, |
541 |
dest_disk_template, instance, expinfo, override_hvparams, |
542 |
override_beparams, override_osparams, override_nics, |
543 |
is_attempt_opportunistic): |
544 |
"""Starts the instance creation in remote import mode. |
545 |
|
546 |
@type cl: L{rapi.client.GanetiRapiClient} |
547 |
@param cl: RAPI client |
548 |
@type name: string |
549 |
@param name: Instance name |
550 |
@type pnode: string or None |
551 |
@param pnode: Name of primary node on destination cluster |
552 |
@type snode: string or None |
553 |
@param snode: Name of secondary node on destination cluster |
554 |
@type compress: string |
555 |
@param compress: Compression mode to use |
556 |
@type iallocator: string or None |
557 |
@param iallocator: Name of iallocator to use |
558 |
@type dest_disk_template: string or None |
559 |
@param dest_disk_template: Disk template to use instead of the original one |
560 |
@type instance: dict |
561 |
@param instance: Instance details from source cluster |
562 |
@type expinfo: dict |
563 |
@param expinfo: Prepared export information from source cluster |
564 |
@type override_hvparams: dict or None |
565 |
@param override_hvparams: Hypervisor parameters to override |
566 |
@type override_beparams: dict or None |
567 |
@param override_beparams: Backend parameters to override |
568 |
@type override_osparams: dict or None |
569 |
@param override_osparams: OS parameters to override |
570 |
@type override_nics: dict or None |
571 |
@param override_nics: NICs to override |
572 |
@type is_attempt_opportunistic: bool |
573 |
@param is_attempt_opportunistic: Whether to use opportunistic locking or not |
574 |
@return: Job ID |
575 |
|
576 |
""" |
577 |
if dest_disk_template: |
578 |
disk_template = dest_disk_template |
579 |
else: |
580 |
disk_template = instance["disk_template"] |
581 |
|
582 |
disks = [] |
583 |
for idisk in instance["disks"]: |
584 |
odisk = { |
585 |
constants.IDISK_SIZE: idisk["size"], |
586 |
constants.IDISK_MODE: idisk["mode"], |
587 |
constants.IDISK_NAME: str(idisk.get("name")), |
588 |
} |
589 |
spindles = idisk.get("spindles") |
590 |
if spindles is not None: |
591 |
odisk[constants.IDISK_SPINDLES] = spindles |
592 |
disks.append(odisk) |
593 |
|
594 |
try: |
595 |
nics = [{ |
596 |
constants.INIC_IP: ip, |
597 |
constants.INIC_MAC: mac, |
598 |
constants.INIC_MODE: mode, |
599 |
constants.INIC_LINK: link, |
600 |
constants.INIC_VLAN: vlan, |
601 |
constants.INIC_NETWORK: network, |
602 |
constants.INIC_NAME: nic_name |
603 |
} for nic_name, _, ip, mac, mode, link, vlan, network, _ |
604 |
in instance["nics"]] |
605 |
except ValueError: |
606 |
raise Error("Received NIC information does not match expected format; " |
607 |
"Do the versions of this tool and the source cluster match?") |
608 |
|
609 |
if len(override_nics) > len(nics): |
610 |
raise Error("Can not create new NICs") |
611 |
|
612 |
if override_nics: |
613 |
assert len(override_nics) <= len(nics) |
614 |
for idx, (nic, override) in enumerate(zip(nics, override_nics)): |
615 |
nics[idx] = objects.FillDict(nic, override) |
616 |
|
617 |
# TODO: Should this be the actual up/down status? (run_state) |
618 |
start = (instance["config_state"] == "up") |
619 |
|
620 |
assert len(disks) == len(instance["disks"]) |
621 |
assert len(nics) == len(instance["nics"]) |
622 |
|
623 |
inst_beparams = instance["be_instance"] |
624 |
if not inst_beparams: |
625 |
inst_beparams = {} |
626 |
|
627 |
inst_hvparams = instance["hv_instance"] |
628 |
if not inst_hvparams: |
629 |
inst_hvparams = {} |
630 |
|
631 |
inst_osparams = instance["os_instance"] |
632 |
if not inst_osparams: |
633 |
inst_osparams = {} |
634 |
|
635 |
return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT, |
636 |
name, disk_template, disks, nics, |
637 |
os=instance["os"], |
638 |
pnode=pnode, |
639 |
snode=snode, |
640 |
start=start, |
641 |
ip_check=False, |
642 |
iallocator=iallocator, |
643 |
hypervisor=instance["hypervisor"], |
644 |
source_handshake=expinfo["handshake"], |
645 |
source_x509_ca=expinfo["x509_ca"], |
646 |
compress=compress, |
647 |
source_instance_name=instance["name"], |
648 |
beparams=objects.FillDict(inst_beparams, |
649 |
override_beparams), |
650 |
hvparams=objects.FillDict(inst_hvparams, |
651 |
override_hvparams), |
652 |
osparams=objects.FillDict(inst_osparams, |
653 |
override_osparams), |
654 |
opportunistic_locking=is_attempt_opportunistic |
655 |
) |
656 |
|
657 |
|
658 |
class MoveSourceExecutor(object): |
659 |
def __init__(self, src_client, mrt): |
660 |
"""Source side of an instance move. |
661 |
|
662 |
@type src_client: L{rapi.client.GanetiRapiClient} |
663 |
@param src_client: RAPI client |
664 |
@type mrt: L{MoveRuntime} |
665 |
@param mrt: Instance move runtime information |
666 |
|
667 |
""" |
668 |
logging.info("Checking whether instance exists") |
669 |
self._CheckInstance(src_client, mrt.move.src_instance_name) |
670 |
|
671 |
logging.info("Retrieving instance information from source cluster") |
672 |
instinfo = self._GetInstanceInfo(src_client, mrt.PollJob, |
673 |
mrt.move.src_instance_name) |
674 |
if (instinfo["disk_template"] in constants.DTS_FILEBASED): |
675 |
raise Error("Inter-cluster move of file-based instances is not" |
676 |
" supported.") |
677 |
|
678 |
logging.info("Preparing export on source cluster") |
679 |
expinfo = self._PrepareExport(src_client, mrt.PollJob, |
680 |
mrt.move.src_instance_name) |
681 |
assert "handshake" in expinfo |
682 |
assert "x509_key_name" in expinfo |
683 |
assert "x509_ca" in expinfo |
684 |
|
685 |
# Hand information to destination thread |
686 |
mrt.source_to_dest.acquire() |
687 |
try: |
688 |
mrt.src_instinfo = instinfo |
689 |
mrt.src_expinfo = expinfo |
690 |
mrt.source_to_dest.notifyAll() |
691 |
finally: |
692 |
mrt.source_to_dest.release() |
693 |
|
694 |
logging.info("Waiting for destination information to become available") |
695 |
mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None) |
696 |
|
697 |
logging.info("Starting remote export on source cluster") |
698 |
self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name, |
699 |
expinfo["x509_key_name"], mrt.move.compress, |
700 |
mrt.dest_impinfo) |
701 |
|
702 |
logging.info("Export successful") |
703 |
|
704 |
@staticmethod |
705 |
def _CheckInstance(cl, name): |
706 |
"""Checks whether the instance exists on the source cluster. |
707 |
|
708 |
@type cl: L{rapi.client.GanetiRapiClient} |
709 |
@param cl: RAPI client |
710 |
@type name: string |
711 |
@param name: Instance name |
712 |
|
713 |
""" |
714 |
try: |
715 |
cl.GetInstance(name) |
716 |
except rapi.client.GanetiApiError, err: |
717 |
if err.code == rapi.client.HTTP_NOT_FOUND: |
718 |
raise Error("Instance %s not found (%s)" % (name, str(err))) |
719 |
raise |
720 |
|
721 |
@staticmethod |
722 |
def _GetInstanceInfo(cl, poll_job_fn, name): |
723 |
"""Retrieves detailed instance information from source cluster. |
724 |
|
725 |
@type cl: L{rapi.client.GanetiRapiClient} |
726 |
@param cl: RAPI client |
727 |
@type poll_job_fn: callable |
728 |
@param poll_job_fn: Function to poll for job result |
729 |
@type name: string |
730 |
@param name: Instance name |
731 |
|
732 |
""" |
733 |
job_id = cl.GetInstanceInfo(name, static=True) |
734 |
result = poll_job_fn(cl, job_id) |
735 |
assert len(result[0].keys()) == 1 |
736 |
return result[0][result[0].keys()[0]] |
737 |
|
738 |
@staticmethod |
739 |
def _PrepareExport(cl, poll_job_fn, name): |
740 |
"""Prepares export on source cluster. |
741 |
|
742 |
@type cl: L{rapi.client.GanetiRapiClient} |
743 |
@param cl: RAPI client |
744 |
@type poll_job_fn: callable |
745 |
@param poll_job_fn: Function to poll for job result |
746 |
@type name: string |
747 |
@param name: Instance name |
748 |
|
749 |
""" |
750 |
job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE) |
751 |
return poll_job_fn(cl, job_id)[0] |
752 |
|
753 |
@staticmethod |
754 |
def _ExportInstance(cl, poll_job_fn, name, x509_key_name, compress, impinfo): |
755 |
"""Exports instance from source cluster. |
756 |
|
757 |
@type cl: L{rapi.client.GanetiRapiClient} |
758 |
@param cl: RAPI client |
759 |
@type poll_job_fn: callable |
760 |
@param poll_job_fn: Function to poll for job result |
761 |
@type name: string |
762 |
@param name: Instance name |
763 |
@param x509_key_name: Source X509 key |
764 |
@type compress: string |
765 |
@param compress: Compression mode to use |
766 |
@param impinfo: Import information from destination cluster |
767 |
|
768 |
""" |
769 |
job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE, |
770 |
impinfo["disks"], shutdown=True, |
771 |
remove_instance=True, |
772 |
x509_key_name=x509_key_name, |
773 |
destination_x509_ca=impinfo["x509_ca"], |
774 |
compress=compress) |
775 |
(fin_resu, dresults) = poll_job_fn(cl, job_id)[0] |
776 |
|
777 |
if not (fin_resu and compat.all(dresults)): |
778 |
raise Error("Export failed for disks %s" % |
779 |
utils.CommaJoin(str(idx) for idx, result |
780 |
in enumerate(dresults) if not result)) |
781 |
|
782 |
|
783 |
class MoveSourceWorker(workerpool.BaseWorker): |
784 |
def RunTask(self, rapi_factory, move): # pylint: disable=W0221 |
785 |
"""Executes an instance move. |
786 |
|
787 |
@type rapi_factory: L{RapiClientFactory} |
788 |
@param rapi_factory: RAPI client factory |
789 |
@type move: L{InstanceMove} |
790 |
@param move: Instance move information |
791 |
|
792 |
""" |
793 |
try: |
794 |
logging.info("Preparing to move %s from cluster %s to %s as %s", |
795 |
move.src_instance_name, rapi_factory.src_cluster_name, |
796 |
rapi_factory.dest_cluster_name, move.dest_instance_name) |
797 |
|
798 |
mrt = MoveRuntime(move) |
799 |
|
800 |
logging.debug("Starting destination thread") |
801 |
dest_thread = threading.Thread(name="DestFor%s" % self.getName(), |
802 |
target=mrt.HandleErrors, |
803 |
args=("dest", MoveDestExecutor, |
804 |
rapi_factory.GetDestClient(), |
805 |
mrt, )) |
806 |
dest_thread.start() |
807 |
try: |
808 |
mrt.HandleErrors("src", MoveSourceExecutor, |
809 |
rapi_factory.GetSourceClient(), mrt) |
810 |
finally: |
811 |
dest_thread.join() |
812 |
|
813 |
if mrt.src_error_message or mrt.dest_error_message: |
814 |
move.error_message = ("Source error: %s, destination error: %s" % |
815 |
(mrt.src_error_message, mrt.dest_error_message)) |
816 |
else: |
817 |
move.error_message = None |
818 |
except Exception, err: # pylint: disable=W0703 |
819 |
logging.exception("Caught unhandled exception") |
820 |
move.error_message = str(err) |
821 |
|
822 |
|
823 |
def CheckRapiSetup(rapi_factory):
  """Verifies the RAPI setup by asking both clusters for their version.

  The source cluster is queried first, then the destination cluster; both
  versions are logged.

  @type rapi_factory: L{RapiClientFactory}
  @param rapi_factory: RAPI client factory

  """
  src_client = rapi_factory.GetSourceClient()
  logging.info("Connecting to source RAPI server")
  src_version = src_client.GetVersion()
  logging.info("Source cluster RAPI version: %s", src_version)

  dest_client = rapi_factory.GetDestClient()
  logging.info("Connecting to destination RAPI server")
  dest_version = dest_client.GetVersion()
  logging.info("Destination cluster RAPI version: %s", dest_version)
837 |
|
838 |
|
839 |
def ParseOptions():
  """Parses the command line of the program.

  @return: Tuple of option parser, parsed options and remaining arguments

  """
  usage = ("%prog [--debug|--verbose]"
           " <source-cluster> <dest-cluster>"
           " <instance...>")

  parser = optparse.OptionParser(usage=usage,
                                 prog=os.path.basename(sys.argv[0]))

  # Options are registered in this order
  all_options = (
    cli.DEBUG_OPT,
    cli.VERBOSE_OPT,
    cli.IALLOCATOR_OPT,
    cli.BACKEND_OPT,
    cli.HVOPTS_OPT,
    cli.OSPARAMS_OPT,
    cli.NET_OPT,
    SRC_RAPI_PORT_OPT,
    SRC_CA_FILE_OPT,
    SRC_USERNAME_OPT,
    SRC_PASSWORD_FILE_OPT,
    DEST_RAPI_PORT_OPT,
    DEST_CA_FILE_OPT,
    DEST_USERNAME_OPT,
    DEST_PASSWORD_FILE_OPT,
    DEST_INSTANCE_NAME_OPT,
    DEST_PRIMARY_NODE_OPT,
    DEST_SECONDARY_NODE_OPT,
    DEST_DISK_TEMPLATE_OPT,
    COMPRESS_OPT,
    PARALLEL_OPT,
    OPPORTUNISTIC_TRIES_OPT,
    OPPORTUNISTIC_DELAY_OPT,
    )

  for opt in all_options:
    parser.add_option(opt)

  (options, args) = parser.parse_args()

  return (parser, options, args)
876 |
|
877 |
|
878 |
def CheckOptions(parser, options, args):
  """Checks options and arguments for validity.

  Problems are reported through C{parser.error}. For a single instance the
  hypervisor and backend parameters are type-checked and the NIC option is
  parsed in place; for several instances all per-instance overrides are
  rejected.

  @param parser: Option parser, only used for reporting errors
  @param options: Parsed command line options; C{options.nics} may be
    replaced by its parsed form
  @type args: list
  @param args: Positional arguments; the two cluster names are removed from
    this list, which is then returned as the list of instance names
  @rtype: tuple
  @return: (source cluster name, destination cluster name, instance names)

  """
  if len(args) < 3:
    parser.error("Not enough arguments")

  src_cluster_name = args.pop(0)
  dest_cluster_name = args.pop(0)
  instance_names = args

  assert len(instance_names) > 0

  # TODO: Remove once using system default paths for SSL certificate
  # verification is implemented
  if not options.src_ca_file:
    parser.error("Missing source cluster CA file")

  if options.parallel < 1:
    parser.error("Number of simultaneous moves must be >= 1")

  if (bool(options.iallocator) and
      bool(options.dest_primary_node or options.dest_secondary_node)):
    parser.error("Destination node and iallocator options exclude each other")

  tries_specified = options.opportunistic_tries is not None
  delay_specified = options.opportunistic_delay is not None

  # Only compare the number of tries once it is known to be set; ordering
  # None against an integer is not portable across Python versions
  if (not options.iallocator and tries_specified and
      options.opportunistic_tries > 0):
    parser.error("Opportunistic instance creation can only be used with an"
                 " iallocator")

  if tries_specified:
    if options.opportunistic_tries < 0:
      parser.error("Number of opportunistic creation attempts must be >= 0")
    if delay_specified:
      if options.opportunistic_delay <= 0:
        parser.error("The delay between two successive creation attempts must"
                     " be greater than zero")
  elif delay_specified:
    parser.error("Opportunistic delay can only be specified when opportunistic"
                 " tries are used")
  else:
    # The default values will be provided later
    pass

  if len(instance_names) == 1:
    # Moving one instance only
    if options.hvparams:
      utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)

    if options.beparams:
      utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)

    if options.nics:
      options.nics = cli.ParseNicOption(options.nics)
  else:
    # Moving more than one instance
    if (options.dest_instance_name or options.dest_primary_node or
        options.dest_secondary_node or options.hvparams or
        options.beparams or options.osparams or options.nics):
      parser.error("The options --dest-instance-name, --dest-primary-node,"
                   " --dest-secondary-node, --hypervisor-parameters,"
                   " --backend-parameters, --os-parameters and --net can"
                   " only be used when moving exactly one instance")

  return (src_cluster_name, dest_cluster_name, instance_names)
944 |
|
945 |
|
946 |
def DestClusterHasDefaultIAllocator(rapi_factory):
  """Tells whether the destination cluster reports a default iallocator.

  @param rapi_factory: Object providing the destination client through
    C{GetDestClient}
  @return: C{False} if the cluster information has no "default_iallocator"
    entry, otherwise the value of that entry (to be used as a truth value)

  """
  cluster_info = rapi_factory.GetDestClient().GetInfo()
  key = "default_iallocator"

  if key not in cluster_info:
    return False

  return cluster_info[key]
953 |
|
954 |
|
955 |
def ExitWithError(message):
  """Writes an error message to standard error and terminates the program.

  @type message: string
  @param message: Text shown after the "move-instance: error: " prefix

  """
  text = "move-instance: error: " + message + "\n"
  sys.stderr.write(text)
  sys.exit(constants.EXIT_FAILURE)
961 |
|
962 |
|
963 |
@UsesRapiClient
def main():
  """Main routine.

  Parses and validates the command line, checks the RAPI setup of both
  clusters, makes sure the destination nodes can be determined, runs all
  instance moves through a worker pool and logs the result of every move.
  The program exits with C{constants.EXIT_FAILURE} if at least one move
  recorded an error message and with C{constants.EXIT_SUCCESS} otherwise.

  """
  (parser, options, args) = ParseOptions()

  utils.SetupToolLogging(options.debug, options.verbose, threadname=True)

  (src_cluster_name, dest_cluster_name, instance_names) = \
    CheckOptions(parser, options, args)

  logging.info("Source cluster: %s", src_cluster_name)
  logging.info("Destination cluster: %s", dest_cluster_name)
  logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))

  rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)

  CheckRapiSetup(rapi_factory)

  # An explicitly given iallocator wins; the destination cluster is only
  # queried when none was passed on the command line
  has_iallocator = options.iallocator or \
                   DestClusterHasDefaultIAllocator(rapi_factory)

  if len(instance_names) > 1 and not has_iallocator:
    ExitWithError("When moving multiple instances, an iallocator must be"
                  " used. None was provided and the target cluster does not"
                  " have a default iallocator.")
  if (len(instance_names) == 1 and not (has_iallocator or
      options.dest_primary_node or options.dest_secondary_node)):
    ExitWithError("Target cluster does not have a default iallocator, "
                  "please specify either destination nodes or an iallocator.")

  # Prepare list of instance moves
  moves = []
  for src_instance_name in instance_names:
    if options.dest_instance_name:
      assert len(instance_names) == 1
      # Rename instance
      dest_instance_name = options.dest_instance_name
    else:
      dest_instance_name = src_instance_name

    moves.append(InstanceMove(src_instance_name, dest_instance_name,
                              options.dest_primary_node,
                              options.dest_secondary_node,
                              options.compress,
                              options.iallocator,
                              options.dest_disk_template,
                              options.hvparams,
                              options.beparams,
                              options.osparams,
                              options.nics,
                              options.opportunistic_tries,
                              options.opportunistic_delay))

  assert len(moves) == len(instance_names)

  # Start workerpool
  wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
  try:
    # Add instance moves to workerpool
    for move in moves:
      wp.AddTask((rapi_factory, move))

    # Wait for all moves to finish
    wp.Quiesce()

  finally:
    # Workers are terminated even if queueing or waiting failed
    wp.TerminateWorkers()

  # There should be no threads running at this point, hence not using locks
  # anymore

  logging.info("Instance move results:")

  for move in moves:
    if move.dest_instance_name == move.src_instance_name:
      name = move.src_instance_name
    else:
      name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)

    if move.error_message:
      msg = "Failed (%s)" % move.error_message
    else:
      msg = "Success"

    logging.info("%s: %s", name, msg)

  if compat.any(move.error_message for move in moves):
    sys.exit(constants.EXIT_FAILURE)

  sys.exit(constants.EXIT_SUCCESS)
1055 |
|
1056 |
|
1057 |
if __name__ == "__main__":
  # Only run the tool when executed as a script, not when imported
  main()