from ganeti import compat
from ganeti import utils
from ganeti import objects
+from ganeti import netutils
class _ImportExportError(Exception):
self._daemon.progress_eta)
@property
+ def magic(self):
+ """Returns the magic value for this import/export.
+
+ """
+ return self._opts.magic
+
+ @property
def active(self):
"""Determines whether this transport is still active.
logging.exception("%s failed", diskie.MODE_TEXT)
diskie.Finalize(error=str(err))
- if not compat.any([diskie.active for diskie in self._queue]):
+ if not compat.any(diskie.active for diskie in self._queue):
break
# Wait a bit
class _TransferInstCbBase(ImportExportCbBase):
def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
- dest_node, dest_ip, export_opts):
+ dest_node, dest_ip):
"""Initializes this class.
"""
self.src_cbs = src_cbs
self.dest_node = dest_node
self.dest_ip = dest_ip
- self.export_opts = export_opts
class _TransferInstSourceCb(_TransferInstCbBase):
assert self.src_cbs
assert dtp.src_export is None
assert dtp.dest_import
+ assert dtp.export_opts
self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
# Start export on source node
- de = DiskExport(self.lu, self.src_node, self.export_opts,
+ de = DiskExport(self.lu, self.src_node, dtp.export_opts,
self.dest_ip, ie.listen_port,
self.instance, dtp.data.src_io, dtp.data.src_ioargs,
self.timeouts, self.src_cbs, private=dtp)
class _DiskTransferPrivate(object):
- def __init__(self, data, success):
+ def __init__(self, data, success, export_opts):
"""Initializes this class.
@type data: L{DiskTransfer}
"""
self.data = data
+ self.success = success
+ self.export_opts = export_opts
self.src_export = None
self.dest_import = None
- self.success = success
-
def RecordResult(self, success):
"""Updates the status.
self.success = self.success and success
+def _GetInstDiskMagic(base, instance_name, index):
+ """Computes the magic value for a disk export or import.
+
+ @type base: string
+ @param base: Random seed value (can be the same for all disks of a transfer)
+ @type instance_name: string
+ @param instance_name: Name of instance
+ @type index: number
+ @param index: Disk index
+
+ """
+ h = compat.sha1_hash()
+ h.update(str(constants.RIE_VERSION))
+ h.update(base)
+ h.update(instance_name)
+ h.update(str(index))
+ return h.hexdigest()
+
+
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
instance, all_transfers):
"""Transfers an instance's data from one node to another.
each transfer
"""
- # Compress only if transfer is to another node
- if src_node == dest_node:
- compress = constants.IEC_NONE
- else:
- compress = constants.IEC_GZIP
+ # Disable compression for all moves as these are all within the same cluster
+ compress = constants.IEC_NONE
logging.debug("Source node %s, destination node %s, compression '%s'",
src_node, dest_node, compress)
- opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
- compress=compress)
-
timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
- src_node, None, dest_node, dest_ip, opts)
+ src_node, None, dest_node, dest_ip)
dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
- src_node, src_cbs, dest_node, dest_ip, opts)
+ src_node, src_cbs, dest_node, dest_ip)
all_dtp = []
+ base_magic = utils.GenerateSecret(6)
+
ieloop = ImportExportLoop(lu)
try:
- for transfer in all_transfers:
+ for idx, transfer in enumerate(all_transfers):
if transfer:
feedback_fn("Exporting %s from %s to %s" %
(transfer.name, src_node, dest_node))
- dtp = _DiskTransferPrivate(transfer, True)
+ magic = _GetInstDiskMagic(base_magic, instance.name, idx)
+ opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
+ compress=compress, magic=magic)
+
+ dtp = _DiskTransferPrivate(transfer, True, opts)
di = DiskImport(lu, dest_node, opts, instance,
transfer.dest_io, transfer.dest_ioargs,
dtp.dest_import = di
else:
- dtp = _DiskTransferPrivate(None, False)
+ dtp = _DiskTransferPrivate(None, False, None)
all_dtp.append(dtp)
ieloop.FinalizeAll()
assert len(all_dtp) == len(all_transfers)
- assert compat.all([(dtp.src_export is None or
+ assert compat.all((dtp.src_export is None or
dtp.src_export.success is not None) and
(dtp.dest_import is None or
dtp.dest_import.success is not None)
- for dtp in all_dtp]), \
+ for dtp in all_dtp), \
"Not all imports/exports are finalized"
return [bool(dtp.success) for dtp in all_dtp]
return (fin_resu, dresults)
- def RemoteExport(self, opts, disk_info, timeouts):
+ def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
"""Inter-cluster instance export.
- @type opts: L{objects.ImportExportOptions}
- @param opts: Import/export daemon options
@type disk_info: list
@param disk_info: Per-disk destination information
+ @type key_name: string
+ @param key_name: Name of X509 key to use
+ @type dest_ca_pem: string
+ @param dest_ca_pem: Destination X509 CA in PEM format
@type timeouts: L{ImportExportTimeouts}
@param timeouts: Timeouts for this import
ieloop = ImportExportLoop(self._lu)
try:
- for idx, (dev, (host, port)) in enumerate(zip(instance.disks,
- disk_info)):
+ for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
+ disk_info)):
+ opts = objects.ImportExportOptions(key_name=key_name,
+ ca_pem=dest_ca_pem,
+ magic=magic)
+
self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
finished_fn = compat.partial(self._TransferFinished, idx)
ieloop.Add(DiskExport(self._lu, instance.primary_node,
host = self._external_address
disks = []
- for idx, port in enumerate(self._daemon_port):
+ for idx, (port, magic) in enumerate(self._daemon_port):
disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
- idx, host, port))
+ idx, host, port, magic))
assert len(disks) == self._disk_count
assert self._daemon_port[idx] is None
- self._daemon_port[idx] = ie.listen_port
+ self._daemon_port[idx] = (ie.listen_port, ie.magic)
self._CheckAllListening()
source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
source_x509_ca)
+ magic_base = utils.GenerateSecret(6)
+
# Create crypto key
result = lu.rpc.call_x509_cert_create(instance.primary_node,
constants.RIE_CERT_VALIDITY)
x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
x509_cert_pem)
- # Import daemon options
- opts = objects.ImportExportOptions(key_name=x509_key_name,
- ca_pem=source_ca_pem)
-
# Sign certificate
signed_x509_cert_pem = \
utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
ieloop = ImportExportLoop(lu)
try:
for idx, dev in enumerate(instance.disks):
+ magic = _GetInstDiskMagic(magic_base, instance.name, idx)
+
+ # Import daemon options
+ opts = objects.ImportExportOptions(key_name=x509_key_name,
+ ca_pem=source_ca_pem,
+ magic=magic)
+
ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
constants.IEIO_SCRIPT, (dev, idx),
timeouts, cbs, private=(idx, )))
return None
-def _GetRieDiskInfoMessage(disk_index, host, port):
+def _GetRieDiskInfoMessage(disk_index, host, port, magic):
"""Returns the hashed text for import/export disk information.
@type disk_index: number
@param host: Hostname
@type port: number
@param port: Daemon port
+ @type magic: string
+ @param magic: Magic value
"""
- return "%s:%s:%s" % (disk_index, host, port)
+ return "%s:%s:%s:%s" % (disk_index, host, port, magic)
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
"""
try:
- (host, port, hmac_digest, hmac_salt) = disk_info
+ (host, port, magic, hmac_digest, hmac_salt) = disk_info
except (TypeError, ValueError), err:
raise errors.GenericError("Invalid data: %s" % err)
- if not (host and port):
- raise errors.GenericError("Missing destination host or port")
+ if not (host and port and magic):
+ raise errors.GenericError("Missing destination host, port or magic")
- msg = _GetRieDiskInfoMessage(disk_index, host, port)
+ msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
raise errors.GenericError("HMAC is wrong")
- return (utils.HostInfo.NormalizeName(host),
- utils.ValidateServiceName(port))
+ return (netutils.HostInfo.NormalizeName(host),
+ utils.ValidateServiceName(port),
+ magic)
-def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port):
+def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
"""Computes the signed disk information for a remote import.
@type cds: string
@param host: Hostname
@type port: number
@param port: Daemon port
+ @type magic: string
+ @param magic: Magic value
"""
- msg = _GetRieDiskInfoMessage(disk_index, host, port)
+ msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
- return (host, port, hmac_digest, salt)
+ return (host, port, magic, hmac_digest, salt)