from ganeti import compat
from ganeti import utils
from ganeti import objects
+from ganeti import netutils
class _ImportExportError(Exception):
#: Time after which daemon must be listening
DEFAULT_LISTEN_TIMEOUT = 10
+ #: Progress update interval
+ DEFAULT_PROGRESS_INTERVAL = 60
+
__slots__ = [
"error",
"ready",
"listen",
"connect",
+ "progress",
]
def __init__(self, connect,
listen=DEFAULT_LISTEN_TIMEOUT,
error=DEFAULT_ERROR_TIMEOUT,
- ready=DEFAULT_READY_TIMEOUT):
+ ready=DEFAULT_READY_TIMEOUT,
+ progress=DEFAULT_PROGRESS_INTERVAL):
"""Initializes this class.
@type connect: number
@param error: Length of time until errors cause hard failure
@type ready: number
@param ready: Timeout for daemon to become ready
+ @type progress: number
+ @param progress: Progress update interval
"""
self.error = error
self.ready = ready
self.listen = listen
self.connect = connect
+ self.progress = progress
class ImportExportCbBase(object):
"""
+ def ReportProgress(self, ie, private):
+ """Called when new progress information should be reported.
+
+ The implementation in this class consists of this docstring only and
+ therefore does nothing; it is invoked by the import/export object's
+ C{_CheckProgress} method with the object itself and its private data.
+
+ @type ie: Subclass of L{_DiskImportExportBase}
+ @param ie: Import/export object
+ @param private: Private data passed to import/export object
+
+ """
+
def ReportFinished(self, ie, private):
"""Called when a transfer has finished.
self._ts_connected = None
self._ts_finished = None
self._ts_cleanup = None
+ self._ts_last_progress = None
self._ts_last_error = None
# Transfer status
return None
@property
+ def progress(self):
+   """Returns the transfer progress as last reported by the daemon.
+
+   @return: C{None} if C{self._daemon} is not set, otherwise the tuple
+     C{(mbytes, throughput, percent, eta)} built from the daemon status
+
+   """
+   daemon = self._daemon
+
+   if daemon:
+     return (daemon.progress_mbytes,
+             daemon.progress_throughput,
+             daemon.progress_percent,
+             daemon.progress_eta)
+
+   return None
+
+ @property
+ def magic(self):
+ """Returns the magic value for this import/export.
+
+ The value is read from the C{magic} attribute of the options object
+ stored in C{self._opts}.
+
+ @return: Magic value of this import/export
+
+ """
+ return self._opts.magic
+
+ @property
def active(self):
"""Determines whether this transport is still active.
return False
+ def _CheckProgress(self):
+   """Reports progress through the callbacks if an update is due.
+
+   An update is due when none has been reported yet or when the progress
+   interval from the timeouts object has expired since the last report.
+   Nothing is reported unless daemon status is available and carries both
+   the transferred amount and the throughput.
+
+   """
+   last_report = self._ts_last_progress
+
+   # Still within the reporting interval
+   if not (last_report is None or
+           _TimeoutExpired(last_report, self._timeouts.progress)):
+     return
+
+   daemon = self._daemon
+   if not daemon:
+     return
+
+   # The report needs at least the amount transferred and the throughput
+   if daemon.progress_mbytes is None:
+     return
+
+   if daemon.progress_throughput is None:
+     return
+
+   self._cbs.ReportProgress(self, self._private)
+   self._ts_last_progress = time.time()
+
def CheckFinished(self):
"""Checks whether the daemon exited.
return True
if self._daemon.exit_status is None:
+ # TODO: Adjust delay for ETA expiring soon
+ self._CheckProgress()
return False
self._ts_finished = time.time()
"""Finalizes this import/export.
"""
- assert error or self.success is not None
-
if self._daemon_name:
logging.info("Finalizing %s %r on %s",
self.MODE_TEXT, self._daemon_name, self.node_name)
return self._ts_begin
+def FormatProgress(progress):
+  """Turns a progress tuple into a human-readable, comma-separated text.
+
+  @type progress: tuple
+  @param progress: C{(mbytes, throughput, percent, eta)}; C{percent} and
+    C{eta} may be C{None}, in which case they are left out of the result
+  @return: The formatted parts joined by L{utils.CommaJoin}
+
+  """
+  (mbytes, throughput, percent, eta) = progress
+
+  # The throughput is formatted by hand as FormatUnit doesn't support
+  # kilobytes
+  parts = [utils.FormatUnit(mbytes, "h"), "%0.1f MiB/s" % throughput]
+
+  if percent is not None:
+    parts.append("%d%%" % percent)
+
+  if eta is not None:
+    parts.append("ETA %s" % utils.FormatSeconds(eta))
+
+  return utils.CommaJoin(parts)
+
+
class ImportExportLoop:
MIN_DELAY = 1.0
MAX_DELAY = 20.0
logging.exception("%s failed", diskie.MODE_TEXT)
diskie.Finalize(error=str(err))
- if not compat.any([diskie.active for diskie in self._queue]):
+ if not compat.any(diskie.active for diskie in self._queue):
break
# Wait a bit
class _TransferInstCbBase(ImportExportCbBase):
def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
- dest_node, dest_ip, export_opts):
+ dest_node, dest_ip):
"""Initializes this class.
"""
self.src_cbs = src_cbs
self.dest_node = dest_node
self.dest_ip = dest_ip
- self.export_opts = export_opts
class _TransferInstSourceCb(_TransferInstCbBase):
self.feedback_fn("%s is sending data on %s" %
(dtp.data.name, ie.node_name))
+ def ReportProgress(self, ie, dtp):
+   """Passes a formatted progress message to the feedback function.
+
+   Nothing is reported while the import/export object has no progress
+   information.
+
+   @param ie: Import/export object whose C{progress} is reported
+   @param dtp: Private data; C{dtp.data.name} is used to label the message
+
+   """
+   status = ie.progress
+
+   if status:
+     self.feedback_fn("%s sent %s" %
+                      (dtp.data.name, FormatProgress(status)))
+
def ReportFinished(self, ie, dtp):
"""Called when a transfer has finished.
assert self.src_cbs
assert dtp.src_export is None
assert dtp.dest_import
+ assert dtp.export_opts
self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
# Start export on source node
- de = DiskExport(self.lu, self.src_node, self.export_opts,
+ de = DiskExport(self.lu, self.src_node, dtp.export_opts,
self.dest_ip, ie.listen_port,
self.instance, dtp.data.src_io, dtp.data.src_ioargs,
self.timeouts, self.src_cbs, private=dtp)
class _DiskTransferPrivate(object):
- def __init__(self, data, success):
+ def __init__(self, data, success, export_opts):
"""Initializes this class.
@type data: L{DiskTransfer}
"""
self.data = data
+ self.success = success
+ self.export_opts = export_opts
self.src_export = None
self.dest_import = None
- self.success = success
-
def RecordResult(self, success):
"""Updates the status.
self.success = self.success and success
+def _GetInstDiskMagic(base, instance_name, index):
+  """Computes the magic value for a disk export or import.
+
+  The value is the hexadecimal SHA1 digest over the remote import/export
+  version, the seed, the instance name and the disk index, fed to the hash
+  in that order.
+
+  @type base: string
+  @param base: Random seed value (can be the same for all disks of a transfer)
+  @type instance_name: string
+  @param instance_name: Name of instance
+  @type index: number
+  @param index: Disk index
+  @return: Hexadecimal digest of the hash
+
+  """
+  digest = compat.sha1_hash()
+
+  for part in (str(constants.RIE_VERSION), base, instance_name, str(index)):
+    digest.update(part)
+
+  return digest.hexdigest()
+
+
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
instance, all_transfers):
"""Transfers an instance's data from one node to another.
each transfer
"""
- # Compress only if transfer is to another node
- if src_node == dest_node:
- compress = constants.IEC_NONE
- else:
- compress = constants.IEC_GZIP
+ # Disable compression for all moves as these are all within the same cluster
+ compress = constants.IEC_NONE
logging.debug("Source node %s, destination node %s, compression '%s'",
src_node, dest_node, compress)
- opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
- compress=compress)
-
timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
- src_node, None, dest_node, dest_ip, opts)
+ src_node, None, dest_node, dest_ip)
dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
- src_node, src_cbs, dest_node, dest_ip, opts)
+ src_node, src_cbs, dest_node, dest_ip)
all_dtp = []
+ base_magic = utils.GenerateSecret(6)
+
ieloop = ImportExportLoop(lu)
try:
- for transfer in all_transfers:
+ for idx, transfer in enumerate(all_transfers):
if transfer:
feedback_fn("Exporting %s from %s to %s" %
(transfer.name, src_node, dest_node))
- dtp = _DiskTransferPrivate(transfer, True)
+ magic = _GetInstDiskMagic(base_magic, instance.name, idx)
+ opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
+ compress=compress, magic=magic)
+
+ dtp = _DiskTransferPrivate(transfer, True, opts)
di = DiskImport(lu, dest_node, opts, instance,
transfer.dest_io, transfer.dest_ioargs,
dtp.dest_import = di
else:
- dtp = _DiskTransferPrivate(None, False)
+ dtp = _DiskTransferPrivate(None, False, None)
all_dtp.append(dtp)
ieloop.FinalizeAll()
assert len(all_dtp) == len(all_transfers)
- assert compat.all([(dtp.src_export is None or
+ assert compat.all((dtp.src_export is None or
dtp.src_export.success is not None) and
(dtp.dest_import is None or
dtp.dest_import.success is not None)
- for dtp in all_dtp]), \
+ for dtp in all_dtp), \
"Not all imports/exports are finalized"
return [bool(dtp.success) for dtp in all_dtp]
self._feedback_fn("Disk %s is now sending data" % idx)
+ def ReportProgress(self, ie, private):
+   """Passes a formatted per-disk progress message to the feedback function.
+
+   Nothing is reported while the import/export object has no progress
+   information.
+
+   @param ie: Import/export object whose C{progress} is reported
+   @param private: Two-element sequence whose first item is the disk index
+
+   """
+   (disk_index, _) = private
+
+   status = ie.progress
+
+   if status:
+     self._feedback_fn("Disk %s sent %s" %
+                       (disk_index, FormatProgress(status)))
+
def ReportFinished(self, ie, private):
"""Called when a transfer has finished.
return (fin_resu, dresults)
- def RemoteExport(self, opts, disk_info, timeouts):
+ def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
"""Inter-cluster instance export.
- @type opts: L{objects.ImportExportOptions}
- @param opts: Import/export daemon options
@type disk_info: list
@param disk_info: Per-disk destination information
+ @type key_name: string
+ @param key_name: Name of X509 key to use
+ @type dest_ca_pem: string
+ @param dest_ca_pem: Destination X509 CA in PEM format
@type timeouts: L{ImportExportTimeouts}
@param timeouts: Timeouts for this import
ieloop = ImportExportLoop(self._lu)
try:
- for idx, (dev, (host, port, _, _)) in enumerate(zip(instance.disks,
- disk_info)):
+ for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
+ disk_info)):
+ opts = objects.ImportExportOptions(key_name=key_name,
+ ca_pem=dest_ca_pem,
+ magic=magic)
+
self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
finished_fn = compat.partial(self._TransferFinished, idx)
ieloop.Add(DiskExport(self._lu, instance.primary_node,
host = self._external_address
disks = []
- for idx, port in enumerate(self._daemon_port):
+ for idx, (port, magic) in enumerate(self._daemon_port):
disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
- idx, host, port))
+ idx, host, port, magic))
assert len(disks) == self._disk_count
assert self._daemon_port[idx] is None
- self._daemon_port[idx] = ie.listen_port
+ self._daemon_port[idx] = (ie.listen_port, ie.magic)
self._CheckAllListening()
source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
source_x509_ca)
+ magic_base = utils.GenerateSecret(6)
+
# Create crypto key
result = lu.rpc.call_x509_cert_create(instance.primary_node,
constants.RIE_CERT_VALIDITY)
x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
x509_cert_pem)
- # Import daemon options
- opts = objects.ImportExportOptions(key_name=x509_key_name,
- ca_pem=source_ca_pem)
-
# Sign certificate
signed_x509_cert_pem = \
utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
ieloop = ImportExportLoop(lu)
try:
for idx, dev in enumerate(instance.disks):
+ magic = _GetInstDiskMagic(magic_base, instance.name, idx)
+
+ # Import daemon options
+ opts = objects.ImportExportOptions(key_name=x509_key_name,
+ ca_pem=source_ca_pem,
+ magic=magic)
+
ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
constants.IEIO_SCRIPT, (dev, idx),
timeouts, cbs, private=(idx, )))
return None
-def _GetRieDiskInfoMessage(disk_index, host, port):
+def _GetRieDiskInfoMessage(disk_index, host, port, magic):
"""Returns the hashed text for import/export disk information.
@type disk_index: number
@param host: Hostname
@type port: number
@param port: Daemon port
+ @type magic: string
+ @param magic: Magic value
"""
- return "%s:%s:%s" % (disk_index, host, port)
+ return "%s:%s:%s:%s" % (disk_index, host, port, magic)
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
"""
try:
- (host, port, hmac_digest, hmac_salt) = disk_info
+ (host, port, magic, hmac_digest, hmac_salt) = disk_info
except (TypeError, ValueError), err:
raise errors.GenericError("Invalid data: %s" % err)
- if not (host and port):
- raise errors.GenericError("Missing destination host or port")
+ if not (host and port and magic):
+ raise errors.GenericError("Missing destination host, port or magic")
- msg = _GetRieDiskInfoMessage(disk_index, host, port)
+ msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
raise errors.GenericError("HMAC is wrong")
- return (host, port)
+ return (netutils.HostInfo.NormalizeName(host),
+ utils.ValidateServiceName(port),
+ magic)
-def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port):
+def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
"""Computes the signed disk information for a remote import.
@type cds: string
@param host: Hostname
@type port: number
@param port: Daemon port
+ @type magic: string
+ @param magic: Magic value
"""
- msg = _GetRieDiskInfoMessage(disk_index, host, port)
+ msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
- return (host, port, hmac_digest, salt)
+ return (host, port, magic, hmac_digest, salt)