#
#
-# Copyright (C) 2010 Google Inc.
+# Copyright (C) 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
from ganeti import compat
from ganeti import utils
from ganeti import objects
+from ganeti import netutils
+from ganeti import pathutils
class _ImportExportError(Exception):
"""Callbacks for disk import/export.
"""
- def ReportListening(self, ie, private):
+ def ReportListening(self, ie, private, component):
"""Called when daemon started listening.
@type ie: Subclass of L{_DiskImportExportBase}
@param ie: Import/export object
@param private: Private data passed to import/export object
+ @param component: transfer component name
"""
"""
-def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
- """Checks whether a timeout has expired.
-
- """
- return _time_fn() > (epoch + timeout)
-
-
class _DiskImportExportBase(object):
MODE_TEXT = None
def __init__(self, lu, node_name, opts,
- instance, timeouts, cbs, private=None):
+ instance, component, timeouts, cbs, private=None):
"""Initializes this class.
@param lu: Logical unit instance
@param opts: Import/export daemon options
@type instance: L{objects.Instance}
@param instance: Instance object
+ @type component: string
+ @param component: which part of the instance is being imported or exported
@type timeouts: L{ImportExportTimeouts}
@param timeouts: Timeouts for this import
@type cbs: L{ImportExportCbBase}
self._lu = lu
self.node_name = node_name
- self._opts = opts
+ self._opts = opts.Copy()
self._instance = instance
+ self._component = component
self._timeouts = timeouts
self._cbs = cbs
self._private = private
+ # Set master daemon's timeout in options for import/export daemon
+ assert self._opts.connect_timeout is None
+ self._opts.connect_timeout = timeouts.connect
+
# Parent loop
self._loop = None
"""
if self._daemon:
- return self._daemon.recent_output
+ return "\n".join(self._daemon.recent_output)
return None
daemon_name = result.payload
- logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
+ logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
self.node_name)
self._ts_begin = time.time()
"""
if self._daemon_name:
- self._lu.LogWarning("Aborting %s %r on %s",
+ self._lu.LogWarning("Aborting %s '%s' on %s",
self.MODE_TEXT, self._daemon_name, self.node_name)
result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
if result.fail_msg:
- self._lu.LogWarning("Failed to abort %s %r on %s: %s",
+ self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
self.MODE_TEXT, self._daemon_name,
self.node_name, result.fail_msg)
return False
assert self._ts_begin is not None
if not data:
- if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
+ if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
raise _ImportExportError("Didn't become ready after %s seconds" %
self._timeouts.ready)
if self._ts_last_error is None:
self._ts_last_error = time.time()
- elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
+ elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
raise _ImportExportError("Too many errors while updating data")
return False
self._ts_connected = time.time()
# TODO: Log remote peer
- logging.debug("%s %r on %s is now connected",
+ logging.debug("%s '%s' on %s is now connected",
self.MODE_TEXT, self._daemon_name, self.node_name)
self._cbs.ReportConnected(self, self._private)
return True
- if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
+ if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
+ self._timeouts.connect):
raise _ImportExportError("Not connected after %s seconds" %
self._timeouts.connect)
"""
if ((self._ts_last_progress is None or
- _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
+ utils.TimeoutExpired(self._ts_last_progress,
+ self._timeouts.progress)) and
self._daemon and
self._daemon.progress_mbytes is not None and
self._daemon.progress_throughput is not None):
self.final_message = message
if success:
- logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
- self.node_name)
+ logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
+ self._daemon_name, self.node_name)
elif self._daemon_name:
- self._lu.LogWarning("%s %r on %s failed: %s",
+ self._lu.LogWarning("%s '%s' on %s failed: %s",
self.MODE_TEXT, self._daemon_name, self.node_name,
message)
else:
"""
if self._daemon_name:
- logging.info("Finalizing %s %r on %s",
+ logging.info("Finalizing %s '%s' on %s",
self.MODE_TEXT, self._daemon_name, self.node_name)
result = self._Finalize()
if result.fail_msg:
- self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
+ self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
self.MODE_TEXT, self._daemon_name,
self.node_name, result.fail_msg)
return False
class DiskImport(_DiskImportExportBase):
MODE_TEXT = "import"
- def __init__(self, lu, node_name, opts, instance,
+ def __init__(self, lu, node_name, opts, instance, component,
dest, dest_args, timeouts, cbs, private=None):
"""Initializes this class.
@param opts: Import/export daemon options
@type instance: L{objects.Instance}
@param instance: Instance object
+ @type component: string
+ @param component: which part of the instance is being imported
@param dest: I/O destination
@param dest_args: I/O arguments
@type timeouts: L{ImportExportTimeouts}
@param private: Private data for callback functions
"""
- _DiskImportExportBase.__init__(self, lu, node_name, opts,
- instance, timeouts, cbs, private)
+ _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
+ component, timeouts, cbs, private)
self._dest = dest
self._dest_args = dest_args
"""
return self._lu.rpc.call_import_start(self.node_name, self._opts,
- self._instance,
- self._dest, self._dest_args)
+ self._instance, self._component,
+ (self._dest, self._dest_args))
def CheckListening(self):
"""Checks whether the daemon is listening.
if port is not None:
self._ts_listening = time.time()
- logging.debug("Import %r on %s is now listening on port %s",
+ logging.debug("Import '%s' on %s is now listening on port %s",
self._daemon_name, self.node_name, port)
- self._cbs.ReportListening(self, self._private)
+ self._cbs.ReportListening(self, self._private, self._component)
return True
- if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
+ if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
raise _ImportExportError("Not listening after %s seconds" %
self._timeouts.listen)
class DiskExport(_DiskImportExportBase):
MODE_TEXT = "export"
- def __init__(self, lu, node_name, opts,
- dest_host, dest_port, instance, source, source_args,
+ def __init__(self, lu, node_name, opts, dest_host, dest_port,
+ instance, component, source, source_args,
timeouts, cbs, private=None):
"""Initializes this class.
@param dest_port: Destination port number
@type instance: L{objects.Instance}
@param instance: Instance object
+ @type component: string
+ @param component: which part of the instance is being exported
@param source: I/O source
@param source_args: I/O source
@type timeouts: L{ImportExportTimeouts}
@param private: Private data for callback functions
"""
- _DiskImportExportBase.__init__(self, lu, node_name, opts,
- instance, timeouts, cbs, private)
+ _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
+ component, timeouts, cbs, private)
self._dest_host = dest_host
self._dest_port = dest_port
self._source = source
"""
return self._lu.rpc.call_export_start(self.node_name, self._opts,
self._dest_host, self._dest_port,
- self._instance, self._source,
- self._source_args)
+ self._instance, self._component,
+ (self._source, self._source_args))
def CheckListening(self):
"""Checks whether the daemon is listening.
logging.exception("%s failed", diskie.MODE_TEXT)
diskie.Finalize(error=str(err))
- if not compat.any([diskie.active for diskie in self._queue]):
+ if not compat.any(diskie.active for diskie in self._queue):
break
# Wait a bit
class _TransferInstCbBase(ImportExportCbBase):
def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
- dest_node, dest_ip, export_opts):
+ dest_node, dest_ip):
"""Initializes this class.
"""
self.src_cbs = src_cbs
self.dest_node = dest_node
self.dest_ip = dest_ip
- self.export_opts = export_opts
class _TransferInstSourceCb(_TransferInstCbBase):
if ie.success:
self.feedback_fn("%s finished sending data" % dtp.data.name)
else:
- self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
+ self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
(dtp.data.name, ie.final_message, ie.recent_output))
dtp.RecordResult(ie.success)
class _TransferInstDestCb(_TransferInstCbBase):
- def ReportListening(self, ie, dtp):
+ def ReportListening(self, ie, dtp, component):
"""Called when daemon started listening.
"""
assert self.src_cbs
assert dtp.src_export is None
assert dtp.dest_import
+ assert dtp.export_opts
self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
# Start export on source node
- de = DiskExport(self.lu, self.src_node, self.export_opts,
- self.dest_ip, ie.listen_port,
- self.instance, dtp.data.src_io, dtp.data.src_ioargs,
+ de = DiskExport(self.lu, self.src_node, dtp.export_opts,
+ self.dest_ip, ie.listen_port, self.instance,
+ component, dtp.data.src_io, dtp.data.src_ioargs,
self.timeouts, self.src_cbs, private=dtp)
ie.loop.Add(de)
if ie.success:
self.feedback_fn("%s finished receiving data" % dtp.data.name)
else:
- self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
+ self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
(dtp.data.name, ie.final_message, ie.recent_output))
dtp.RecordResult(ie.success)
class _DiskTransferPrivate(object):
- def __init__(self, data, success):
+ def __init__(self, data, success, export_opts):
"""Initializes this class.
@type data: L{DiskTransfer}
"""
self.data = data
+ self.success = success
+ self.export_opts = export_opts
self.src_export = None
self.dest_import = None
- self.success = success
-
def RecordResult(self, success):
"""Updates the status.
self.success = self.success and success
+def _GetInstDiskMagic(base, instance_name, index):
+ """Computes the magic value for a disk export or import.
+
+ @type base: string
+ @param base: Random seed value (can be the same for all disks of a transfer)
+ @type instance_name: string
+ @param instance_name: Name of instance
+ @type index: number
+ @param index: Disk index
+
+ """
+ h = compat.sha1_hash()
+ h.update(str(constants.RIE_VERSION))
+ h.update(base)
+ h.update(instance_name)
+ h.update(str(index))
+ return h.hexdigest()
+
+
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
instance, all_transfers):
"""Transfers an instance's data from one node to another.
logging.debug("Source node %s, destination node %s, compression '%s'",
src_node, dest_node, compress)
- opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
- compress=compress)
-
timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
- src_node, None, dest_node, dest_ip, opts)
+ src_node, None, dest_node, dest_ip)
dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
- src_node, src_cbs, dest_node, dest_ip, opts)
+ src_node, src_cbs, dest_node, dest_ip)
all_dtp = []
+ base_magic = utils.GenerateSecret(6)
+
ieloop = ImportExportLoop(lu)
try:
- for transfer in all_transfers:
+ for idx, transfer in enumerate(all_transfers):
if transfer:
feedback_fn("Exporting %s from %s to %s" %
(transfer.name, src_node, dest_node))
- dtp = _DiskTransferPrivate(transfer, True)
+ magic = _GetInstDiskMagic(base_magic, instance.name, idx)
+ opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
+ compress=compress, magic=magic)
+
+ dtp = _DiskTransferPrivate(transfer, True, opts)
- di = DiskImport(lu, dest_node, opts, instance,
+ di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx,
transfer.dest_io, transfer.dest_ioargs,
timeouts, dest_cbs, private=dtp)
ieloop.Add(di)
dtp.dest_import = di
else:
- dtp = _DiskTransferPrivate(None, False)
+ dtp = _DiskTransferPrivate(None, False, None)
all_dtp.append(dtp)
ieloop.FinalizeAll()
assert len(all_dtp) == len(all_transfers)
- assert compat.all([(dtp.src_export is None or
+ assert compat.all((dtp.src_export is None or
dtp.src_export.success is not None) and
(dtp.dest_import is None or
dtp.dest_import.success is not None)
- for dtp in all_dtp]), \
+ for dtp in all_dtp), \
"Not all imports/exports are finalized"
return [bool(dtp.success) for dtp in all_dtp]
if ie.success:
self._feedback_fn("Disk %s finished sending data" % idx)
else:
- self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
+ self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
(idx, ie.final_message, ie.recent_output))
self._dresults[idx] = bool(ie.success)
instance = self._instance
src_node = instance.primary_node
- vgname = self._lu.cfg.GetVGName()
-
for idx, disk in enumerate(instance.disks):
self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
(idx, src_node))
# result.payload will be a snapshot of an lvm leaf of the one we
# passed
- result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
+ result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))
+ new_dev = False
msg = result.fail_msg
if msg:
self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
idx, src_node, msg)
- new_dev = False
+ elif (not isinstance(result.payload, (tuple, list)) or
+ len(result.payload) != 2):
+ self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
+ " result '%s'", idx, src_node, result.payload)
else:
- disk_id = (vgname, result.payload)
+ disk_id = tuple(result.payload)
+ disk_params = constants.DISK_LD_DEFAULTS[constants.LD_LV].copy()
new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
logical_id=disk_id, physical_id=disk_id,
- iv_name=disk.iv_name)
+ iv_name=disk.iv_name,
+ params=disk_params)
self._snap_disks.append(new_dev)
transfers.append(None)
continue
- path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
+ path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name,
dev.physical_id[1])
finished_fn = compat.partial(self._TransferFinished, idx)
return (fin_resu, dresults)
- def RemoteExport(self, opts, disk_info, timeouts):
+ def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
"""Inter-cluster instance export.
- @type opts: L{objects.ImportExportOptions}
- @param opts: Import/export daemon options
@type disk_info: list
@param disk_info: Per-disk destination information
+ @type key_name: string
+ @param key_name: Name of X509 key to use
+ @type dest_ca_pem: string
+ @param dest_ca_pem: Destination X509 CA in PEM format
@type timeouts: L{ImportExportTimeouts}
@param timeouts: Timeouts for this import
ieloop = ImportExportLoop(self._lu)
try:
- for idx, (dev, (host, port)) in enumerate(zip(instance.disks,
- disk_info)):
+ for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
+ disk_info)):
+ # Decide whether to use IPv6
+ ipv6 = netutils.IP6Address.IsValid(host)
+
+ opts = objects.ImportExportOptions(key_name=key_name,
+ ca_pem=dest_ca_pem,
+ magic=magic, ipv6=ipv6)
+
self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
finished_fn = compat.partial(self._TransferFinished, idx)
ieloop.Add(DiskExport(self._lu, instance.primary_node,
- opts, host, port, instance,
+ opts, host, port, instance, "disk%d" % idx,
constants.IEIO_SCRIPT, (dev, idx),
timeouts, cbs, private=(idx, finished_fn)))
host = self._external_address
disks = []
- for idx, port in enumerate(self._daemon_port):
+ for idx, (port, magic) in enumerate(self._daemon_port):
disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
- idx, host, port))
+ idx, host, port, magic))
assert len(disks) == self._disk_count
"x509_ca": self._x509_cert_pem,
})
- def ReportListening(self, ie, private):
+ def ReportListening(self, ie, private, _):
"""Called when daemon started listening.
"""
assert self._daemon_port[idx] is None
- self._daemon_port[idx] = ie.listen_port
+ self._daemon_port[idx] = (ie.listen_port, ie.magic)
self._CheckAllListening()
self._feedback_fn("Disk %s finished receiving data" % idx)
else:
self._feedback_fn(("Disk %s failed to receive data: %s"
- " (recent output: %r)") %
+ " (recent output: %s)") %
(idx, ie.final_message, ie.recent_output))
self._dresults[idx] = bool(ie.success)
-def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
+def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
+ cds, timeouts):
"""Imports an instance from another cluster.
@param lu: Logical unit instance
@param feedback_fn: Feedback function
@type instance: L{objects.Instance}
@param instance: Instance object
+ @type pnode: L{objects.Node}
+ @param pnode: Primary node of instance as an object
@type source_x509_ca: OpenSSL.crypto.X509
@param source_x509_ca: Import source's X509 CA
@type cds: string
source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
source_x509_ca)
+ magic_base = utils.GenerateSecret(6)
+
+ # Decide whether to use IPv6
+ ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
+
# Create crypto key
result = lu.rpc.call_x509_cert_create(instance.primary_node,
constants.RIE_CERT_VALIDITY)
x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
x509_cert_pem)
- # Import daemon options
- opts = objects.ImportExportOptions(key_name=x509_key_name,
- ca_pem=source_ca_pem)
-
# Sign certificate
signed_x509_cert_pem = \
utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
- len(instance.disks), instance.primary_node)
+ len(instance.disks), pnode.primary_ip)
ieloop = ImportExportLoop(lu)
try:
for idx, dev in enumerate(instance.disks):
+ magic = _GetInstDiskMagic(magic_base, instance.name, idx)
+
+ # Import daemon options
+ opts = objects.ImportExportOptions(key_name=x509_key_name,
+ ca_pem=source_ca_pem,
+ magic=magic, ipv6=ipv6)
+
ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
+ "disk%d" % idx,
constants.IEIO_SCRIPT, (dev, idx),
timeouts, cbs, private=(idx, )))
return None
-def _GetRieDiskInfoMessage(disk_index, host, port):
+def _GetRieDiskInfoMessage(disk_index, host, port, magic):
"""Returns the hashed text for import/export disk information.
@type disk_index: number
@param host: Hostname
@type port: number
@param port: Daemon port
+ @type magic: string
+ @param magic: Magic value
"""
- return "%s:%s:%s" % (disk_index, host, port)
+ return "%s:%s:%s:%s" % (disk_index, host, port, magic)
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
"""
try:
- (host, port, hmac_digest, hmac_salt) = disk_info
+ (host, port, magic, hmac_digest, hmac_salt) = disk_info
except (TypeError, ValueError), err:
raise errors.GenericError("Invalid data: %s" % err)
- if not (host and port):
- raise errors.GenericError("Missing destination host or port")
+ if not (host and port and magic):
+ raise errors.GenericError("Missing destination host, port or magic")
- msg = _GetRieDiskInfoMessage(disk_index, host, port)
+ msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
raise errors.GenericError("HMAC is wrong")
- return (utils.HostInfo.NormalizeName(host),
- utils.ValidateServiceName(port))
+ if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
+ destination = host
+ else:
+ destination = netutils.Hostname.GetNormalizedName(host)
+ return (destination,
+ utils.ValidateServiceName(port),
+ magic)
-def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port):
+
+def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
"""Computes the signed disk information for a remote import.
@type cds: string
@param host: Hostname
@type port: number
@param port: Daemon port
+ @type magic: string
+ @param magic: Magic value
"""
- msg = _GetRieDiskInfoMessage(disk_index, host, port)
+ msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
- return (host, port, hmac_digest, salt)
+ return (host, port, magic, hmac_digest, salt)
+
+
+def CalculateGroupIPolicy(cluster, group):
+ """Calculate instance policy for group.
+
+ """
+ return cluster.SimpleFillIPolicy(group.ipolicy)
+
+
+def ComputeDiskSize(disk_template, disks):
+ """Compute disk size requirements according to disk template
+
+ """
+ # Required free disk space as a function of disk and swap space
+ req_size_dict = {
+ constants.DT_DISKLESS: None,
+ constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
+ # 128 MB are added for drbd metadata for each disk
+ constants.DT_DRBD8:
+ sum(d[constants.IDISK_SIZE] + constants.DRBD_META_SIZE for d in disks),
+ constants.DT_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
+ constants.DT_SHARED_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
+ constants.DT_BLOCK: 0,
+ constants.DT_RBD: sum(d[constants.IDISK_SIZE] for d in disks),
+ constants.DT_EXT: sum(d[constants.IDISK_SIZE] for d in disks),
+ }
+
+ if disk_template not in req_size_dict:
+ raise errors.ProgrammerError("Disk template '%s' size requirement"
+ " is unknown" % disk_template)
+
+ return req_size_dict[disk_template]