"""
-# pylint: disable-msg=C0103
+# pylint: disable=C0103
# C0103: Invalid name move-instance
import os
from ganeti import constants
from ganeti import utils
from ganeti import workerpool
+from ganeti import objects
from ganeti import compat
from ganeti import rapi
-import ganeti.rapi.client # pylint: disable-msg=W0611
+import ganeti.rapi.client # pylint: disable=W0611
import ganeti.rapi.client_utils
self.src_cluster_name = src_cluster_name
self.dest_cluster_name = dest_cluster_name
+ # TODO: Implement timeouts for RAPI connections
# TODO: Support for using system default paths for verifying SSL certificate
- # (already implemented in CertAuthorityVerify)
logging.debug("Using '%s' as source CA", options.src_ca_file)
- src_ssl_config = rapi.client.CertAuthorityVerify(cafile=options.src_ca_file)
+ src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)
if options.dest_ca_file:
logging.debug("Using '%s' as destination CA", options.dest_ca_file)
- dest_ssl_config = \
- rapi.client.CertAuthorityVerify(cafile=options.dest_ca_file)
+ dest_curl_config = \
+ rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
else:
logging.debug("Using source CA for destination")
- dest_ssl_config = src_ssl_config
+ dest_curl_config = src_curl_config
logging.debug("Source RAPI server is %s:%s",
src_cluster_name, options.src_rapi_port)
self.GetSourceClient = lambda: \
rapi.client.GanetiRapiClient(src_cluster_name,
port=options.src_rapi_port,
- config_ssl_verification=src_ssl_config,
+ curl_config_fn=src_curl_config,
username=src_username,
password=src_password)
self.GetDestClient = lambda: \
rapi.client.GanetiRapiClient(dest_cluster_name,
port=dest_rapi_port,
- config_ssl_verification=dest_ssl_config,
+ curl_config_fn=dest_curl_config,
username=dest_username,
password=dest_password)
return
logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
- utils.SafeEncode(log_msg))
+ cli.FormatLogMessage(log_type, log_msg))
def ReportNotChanged(self, job_id, status):
"""Called if a job hasn't changed in a while.
"""
def __init__(self, src_instance_name, dest_instance_name,
- dest_pnode, dest_snode, dest_iallocator):
+ dest_pnode, dest_snode, dest_iallocator,
+ hvparams, beparams, osparams, nics):
"""Initializes this class.
@type src_instance_name: string
@param dest_snode: Name of secondary node on destination cluster
@type dest_iallocator: string or None
@param dest_iallocator: Name of iallocator to use
+ @type hvparams: dict or None
+ @param hvparams: Hypervisor parameters to override
+ @type beparams: dict or None
+ @param beparams: Backend parameters to override
+ @type osparams: dict or None
+ @param osparams: OS parameters to override
+ @type nics: list of dicts or None
+ @param nics: Per-NIC parameter overrides, matched by position to the
+ NICs of the source instance
"""
self.src_instance_name = src_instance_name
self.dest_pnode = dest_pnode
self.dest_snode = dest_snode
self.dest_iallocator = dest_iallocator
+ self.hvparams = hvparams
+ self.beparams = beparams
+ self.osparams = osparams
+ self.nics = nics
- self.success = None
self.error_message = None
self.source_to_dest = threading.Condition(self.lock)
self.dest_to_source = threading.Condition(self.lock)
- # Set when threads should abort
- self.abort = None
-
# Source information
- self.src_success = None
self.src_error_message = None
self.src_expinfo = None
self.src_instinfo = None
# Destination information
- self.dest_success = None
self.dest_error_message = None
self.dest_impinfo = None
# Call inner function
fn(*args)
- success = True
errmsg = None
except Abort:
- success = False
errmsg = "Aborted"
except Exception, err:
logging.exception("Caught unhandled exception")
- success = False
errmsg = str(err)
+ setattr(self, "%s_error_message" % prefix, errmsg)
+
self.lock.acquire()
try:
- # Tell all threads to abort
- self.abort = True
self.source_to_dest.notifyAll()
self.dest_to_source.notifyAll()
finally:
self.lock.release()
- setattr(self, "%s_success" % prefix, success)
- setattr(self, "%s_error_message" % prefix, errmsg)
-
def CheckAbort(self):
"""Check whether thread should be aborted.
+ A thread has to stop as soon as either the source or the destination side
+ has recorded an error message.
@raise Abort: When thread should be aborted
"""
- if self.abort:
+ # Any non-None error message (source or destination) requests an abort
+ if not (self.src_error_message is None and
+ self.dest_error_message is None):
logging.info("Aborting")
raise Abort()
job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
mrt.move.dest_pnode, mrt.move.dest_snode,
mrt.move.dest_iallocator,
- mrt.src_instinfo, mrt.src_expinfo)
+ mrt.src_instinfo, mrt.src_expinfo,
+ mrt.move.hvparams, mrt.move.beparams,
+ mrt.move.beparams, mrt.move.nics)
mrt.PollJob(dest_client, job_id,
remote_import_fn=compat.partial(self._SetImportInfo, mrt))
mrt.dest_to_source.release()
@staticmethod
- def _CreateInstance(cl, name, snode, pnode, iallocator, instance, expinfo):
+ def _CreateInstance(cl, name, pnode, snode, iallocator, instance, expinfo,
+ override_hvparams, override_beparams, override_osparams,
+ override_nics):
"""Starts the instance creation in remote import mode.
@type cl: L{rapi.client.GanetiRapiClient}
@param instance: Instance details from source cluster
@type expinfo: dict
@param expinfo: Prepared export information from source cluster
+ @type override_hvparams: dict or None
+ @param override_hvparams: Hypervisor parameters to override
+ @type override_beparams: dict or None
+ @param override_beparams: Backend parameters to override
+ @type override_osparams: dict or None
+ @param override_osparams: OS parameters to override
+ @type override_nics: dict or None
+ @param override_nics: NICs to override
@return: Job ID
"""
disk_template = instance["disk_template"]
disks = [{
- "size": i["size"],
- "mode": i["mode"],
+ constants.IDISK_SIZE: i["size"],
+ constants.IDISK_MODE: i["mode"],
} for i in instance["disks"]]
nics = [{
- "ip": ip,
- "mac": mac,
- "mode": mode,
- "link": link,
+ constants.INIC_IP: ip,
+ constants.INIC_MAC: mac,
+ constants.INIC_MODE: mode,
+ constants.INIC_LINK: link,
} for ip, mac, mode, link in instance["nics"]]
+ if len(override_nics) > len(nics):
+ raise Error("Can not create new NICs")
+
+ if override_nics:
+ assert len(override_nics) <= len(nics)
+ for idx, (nic, override) in enumerate(zip(nics, override_nics)):
+ nics[idx] = objects.FillDict(nic, override)
+
# TODO: Should this be the actual up/down status? (run_state)
start = (instance["config_state"] == "up")
assert len(disks) == len(instance["disks"])
assert len(nics) == len(instance["nics"])
+ inst_beparams = instance["be_instance"]
+ if not inst_beparams:
+ inst_beparams = {}
+
+ inst_hvparams = instance["hv_instance"]
+ if not inst_hvparams:
+ inst_hvparams = {}
+
+ inst_osparams = instance["os_instance"]
+ if not inst_osparams:
+ inst_osparams = {}
+
return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
name, disk_template, disks, nics,
os=instance["os"],
source_handshake=expinfo["handshake"],
source_x509_ca=expinfo["x509_ca"],
source_instance_name=instance["name"],
- beparams=instance["be_instance"],
- hvparams=instance["hv_instance"])
+ beparams=objects.FillDict(inst_beparams,
+ override_beparams),
+ hvparams=objects.FillDict(inst_hvparams,
+ override_hvparams),
+ osparams=objects.FillDict(inst_osparams,
+ override_osparams))
class MoveSourceExecutor(object):
class MoveSourceWorker(workerpool.BaseWorker):
- def RunTask(self, rapi_factory, move): # pylint: disable-msg=W0221
+ def RunTask(self, rapi_factory, move): # pylint: disable=W0221
"""Executes an instance move.
@type rapi_factory: L{RapiClientFactory}
finally:
dest_thread.join()
- move.success = (mrt.src_success and mrt.dest_success)
if mrt.src_error_message or mrt.dest_error_message:
move.error_message = ("Source error: %s, destination error: %s" %
(mrt.src_error_message, mrt.dest_error_message))
else:
move.error_message = None
- except Exception, err: # pylint: disable-msg=W0703
+ except Exception, err: # pylint: disable=W0703
logging.exception("Caught unhandled exception")
- move.success = False
move.error_message = str(err)
parser.add_option(cli.DEBUG_OPT)
parser.add_option(cli.VERBOSE_OPT)
parser.add_option(cli.IALLOCATOR_OPT)
+ parser.add_option(cli.BACKEND_OPT)
+ parser.add_option(cli.HVOPTS_OPT)
+ parser.add_option(cli.OSPARAMS_OPT)
+ parser.add_option(cli.NET_OPT)
parser.add_option(SRC_RAPI_PORT_OPT)
parser.add_option(SRC_CA_FILE_OPT)
parser.add_option(SRC_USERNAME_OPT)
options.dest_primary_node or
options.dest_secondary_node):
parser.error("An iallocator or the destination node is required")
+
+ if options.hvparams:
+ utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)
+
+ if options.beparams:
+ utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)
+
+ if options.nics:
+ options.nics = cli.ParseNicOption(options.nics)
else:
# Moving more than one instance
if (options.dest_instance_name or options.dest_primary_node or
- options.dest_secondary_node):
- parser.error("The options --dest-instance-name, --dest-primary-node and"
- " --dest-secondary-node can only be used when moving exactly"
- " one instance")
+ options.dest_secondary_node or options.hvparams or
+ options.beparams or options.osparams or options.nics):
+ parser.error("The options --dest-instance-name, --dest-primary-node,"
+ " --dest-secondary-node, --hypervisor-parameters,"
+ " --backend-parameters, --os-parameters and --net can"
+ " only be used when moving exactly one instance")
if not options.iallocator:
parser.error("An iallocator must be specified for moving more than one"
return (src_cluster_name, dest_cluster_name, instance_names)
+@rapi.client.UsesRapiClient
def main():
"""Main routine.
assert len(instance_names) == 1 or options.iallocator
assert (len(instance_names) > 1 or options.iallocator or
options.dest_primary_node or options.dest_secondary_node)
+ assert (len(instance_names) == 1 or
+ not (options.hvparams or options.beparams or options.osparams or
+ options.nics))
# Prepare list of instance moves
moves = []
moves.append(InstanceMove(src_instance_name, dest_instance_name,
options.dest_primary_node,
options.dest_secondary_node,
- options.iallocator))
+ options.iallocator, options.hvparams,
+ options.beparams, options.osparams,
+ options.nics))
assert len(moves) == len(instance_names)
try:
# Add instance moves to workerpool
for move in moves:
- wp.AddTask(rapi_factory, move)
+ wp.AddTask((rapi_factory, move))
# Wait for all moves to finish
wp.Quiesce()
else:
name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
- if move.success and not move.error_message:
- msg = "Success"
- else:
+ if move.error_message:
msg = "Failed (%s)" % move.error_message
+ else:
+ msg = "Success"
logging.info("%s: %s", name, msg)
- if compat.all(move.success for move in moves):
- sys.exit(constants.EXIT_SUCCESS)
+ if compat.any(move.error_message for move in moves):
+ sys.exit(constants.EXIT_FAILURE)
- sys.exit(constants.EXIT_FAILURE)
+ sys.exit(constants.EXIT_SUCCESS)
if __name__ == "__main__":