X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/acd65a16f46384639126076acdb8aa8e616ab86c..acf931b71580fb9918945414673f6bd76f3a0fee:/lib/masterd/instance.py diff --git a/lib/masterd/instance.py b/lib/masterd/instance.py index 6eb01e1..de8b462 100644 --- a/lib/masterd/instance.py +++ b/lib/masterd/instance.py @@ -32,6 +32,7 @@ from ganeti import errors from ganeti import compat from ganeti import utils from ganeti import objects +from ganeti import netutils class _ImportExportError(Exception): @@ -209,6 +210,13 @@ class _DiskImportExportBase(object): self._daemon.progress_eta) @property + def magic(self): + """Returns the magic value for this import/export. + + """ + return self._opts.magic + + @property def active(self): """Determines whether this transport is still active. @@ -786,7 +794,7 @@ class ImportExportLoop: logging.exception("%s failed", diskie.MODE_TEXT) diskie.Finalize(error=str(err)) - if not compat.any([diskie.active for diskie in self._queue]): + if not compat.any(diskie.active for diskie in self._queue): break # Wait a bit @@ -808,7 +816,7 @@ class ImportExportLoop: class _TransferInstCbBase(ImportExportCbBase): def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs, - dest_node, dest_ip, export_opts): + dest_node, dest_ip): """Initializes this class. """ @@ -822,7 +830,6 @@ class _TransferInstCbBase(ImportExportCbBase): self.src_cbs = src_cbs self.dest_node = dest_node self.dest_ip = dest_ip - self.export_opts = export_opts class _TransferInstSourceCb(_TransferInstCbBase): @@ -881,11 +888,12 @@ class _TransferInstDestCb(_TransferInstCbBase): assert self.src_cbs assert dtp.src_export is None assert dtp.dest_import + assert dtp.export_opts self.feedback_fn("%s is now listening, starting export" % dtp.data.name) # Start export on source node - de = DiskExport(self.lu, self.src_node, self.export_opts, + de = DiskExport(self.lu, self.src_node, dtp.export_opts, self.dest_ip, ie.listen_port, self.instance, dtp.data.src_io, dtp.data.src_ioargs, self.timeouts, self.src_cbs, private=dtp) @@ -945,7 +953,7 @@ class DiskTransfer(object): class _DiskTransferPrivate(object): - def __init__(self, data, success): + def __init__(self, data, success, export_opts): """Initializes this class. @type data: L{DiskTransfer} @@ -953,12 +961,12 @@ class _DiskTransferPrivate(object): """ self.data = data + self.success = success + self.export_opts = export_opts self.src_export = None self.dest_import = None - self.success = success - def RecordResult(self, success): """Updates the status. @@ -968,6 +976,25 @@ class _DiskTransferPrivate(object): self.success = self.success and success +def _GetInstDiskMagic(base, instance_name, index): + """Computes the magic value for a disk export or import. + + @type base: string + @param base: Random seed value (can be the same for all disks of a transfer) + @type instance_name: string + @param instance_name: Name of instance + @type index: number + @param index: Disk index + + """ + h = compat.sha1_hash() + h.update(str(constants.RIE_VERSION)) + h.update(base) + h.update(instance_name) + h.update(str(index)) + return h.hexdigest() + + def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip, instance, all_transfers): """Transfers an instance's data from one node to another. @@ -989,34 +1016,34 @@ def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip, each transfer """ - # Compress only if transfer is to another node - if src_node == dest_node: - compress = constants.IEC_NONE - else: - compress = constants.IEC_GZIP + # Disable compression for all moves as these are all within the same cluster + compress = constants.IEC_NONE logging.debug("Source node %s, destination node %s, compression '%s'", src_node, dest_node, compress) - opts = objects.ImportExportOptions(key_name=None, ca_pem=None, - compress=compress) - timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT) src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts, - src_node, None, dest_node, dest_ip, opts) + src_node, None, dest_node, dest_ip) dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts, - src_node, src_cbs, dest_node, dest_ip, opts) + src_node, src_cbs, dest_node, dest_ip) all_dtp = [] + base_magic = utils.GenerateSecret(6) + ieloop = ImportExportLoop(lu) try: - for transfer in all_transfers: + for idx, transfer in enumerate(all_transfers): if transfer: feedback_fn("Exporting %s from %s to %s" % (transfer.name, src_node, dest_node)) - dtp = _DiskTransferPrivate(transfer, True) + magic = _GetInstDiskMagic(base_magic, instance.name, idx) + opts = objects.ImportExportOptions(key_name=None, ca_pem=None, + compress=compress, magic=magic) + + dtp = _DiskTransferPrivate(transfer, True, opts) di = DiskImport(lu, dest_node, opts, instance, transfer.dest_io, transfer.dest_ioargs, @@ -1025,7 +1052,7 @@ def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip, dtp.dest_import = di else: - dtp = _DiskTransferPrivate(None, False) + dtp = _DiskTransferPrivate(None, False, None) all_dtp.append(dtp) @@ -1034,11 +1061,11 @@ def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip, ieloop.FinalizeAll() assert len(all_dtp) == len(all_transfers) - assert compat.all([(dtp.src_export is None or + assert compat.all((dtp.src_export is None or dtp.src_export.success is not None) and (dtp.dest_import is None or dtp.dest_import.success is not None) - for dtp in all_dtp]), \ + for dtp in all_dtp), \ "Not all imports/exports are finalized" return [bool(dtp.success) for dtp in all_dtp] @@ -1220,13 +1247,15 @@ class ExportInstanceHelper: return (fin_resu, dresults) - def RemoteExport(self, opts, disk_info, timeouts): + def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts): """Inter-cluster instance export. - @type opts: L{objects.ImportExportOptions} - @param opts: Import/export daemon options @type disk_info: list @param disk_info: Per-disk destination information + @type key_name: string + @param key_name: Name of X509 key to use + @type dest_ca_pem: string + @param dest_ca_pem: Destination X509 CA in PEM format @type timeouts: L{ImportExportTimeouts} @param timeouts: Timeouts for this import @@ -1239,8 +1268,12 @@ class ExportInstanceHelper: ieloop = ImportExportLoop(self._lu) try: - for idx, (dev, (host, port)) in enumerate(zip(instance.disks, - disk_info)): + for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks, + disk_info)): + opts = objects.ImportExportOptions(key_name=key_name, + ca_pem=dest_ca_pem, + magic=magic) + self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port)) finished_fn = compat.partial(self._TransferFinished, idx) ieloop.Add(DiskExport(self._lu, instance.primary_node, @@ -1319,9 +1352,9 @@ class _RemoteImportCb(ImportExportCbBase): host = self._external_address disks = [] - for idx, port in enumerate(self._daemon_port): + for idx, (port, magic) in enumerate(self._daemon_port): disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt, - idx, host, port)) + idx, host, port, magic)) assert len(disks) == self._disk_count @@ -1340,7 +1373,7 @@ class _RemoteImportCb(ImportExportCbBase): assert self._daemon_port[idx] is None - self._daemon_port[idx] = ie.listen_port + self._daemon_port[idx] = (ie.listen_port, ie.magic) self._CheckAllListening() @@ -1389,6 +1422,8 @@ def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts): source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, source_x509_ca) + magic_base = utils.GenerateSecret(6) + # Create crypto key result = lu.rpc.call_x509_cert_create(instance.primary_node, constants.RIE_CERT_VALIDITY) @@ -1400,10 +1435,6 @@ def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts): x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, x509_cert_pem) - # Import daemon options - opts = objects.ImportExportOptions(key_name=x509_key_name, - ca_pem=source_ca_pem) - # Sign certificate signed_x509_cert_pem = \ utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8)) @@ -1414,6 +1445,13 @@ def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts): ieloop = ImportExportLoop(lu) try: for idx, dev in enumerate(instance.disks): + magic = _GetInstDiskMagic(magic_base, instance.name, idx) + + # Import daemon options + opts = objects.ImportExportOptions(key_name=x509_key_name, + ca_pem=source_ca_pem, + magic=magic) + ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance, constants.IEIO_SCRIPT, (dev, idx), timeouts, cbs, private=(idx, ))) @@ -1476,7 +1514,7 @@ def CheckRemoteExportHandshake(cds, handshake): return None -def _GetRieDiskInfoMessage(disk_index, host, port): +def _GetRieDiskInfoMessage(disk_index, host, port, magic): """Returns the hashed text for import/export disk information. @type disk_index: number @@ -1485,9 +1523,11 @@ def _GetRieDiskInfoMessage(disk_index, host, port): @param host: Hostname @type port: number @param port: Daemon port + @type magic: string + @param magic: Magic value """ - return "%s:%s:%s" % (disk_index, host, port) + return "%s:%s:%s:%s" % (disk_index, host, port, magic) def CheckRemoteExportDiskInfo(cds, disk_index, disk_info): @@ -1502,23 +1542,24 @@ def CheckRemoteExportDiskInfo(cds, disk_index, disk_info): """ try: - (host, port, hmac_digest, hmac_salt) = disk_info + (host, port, magic, hmac_digest, hmac_salt) = disk_info except (TypeError, ValueError), err: raise errors.GenericError("Invalid data: %s" % err) - if not (host and port): - raise errors.GenericError("Missing destination host or port") + if not (host and port and magic): + raise errors.GenericError("Missing destination host, port or magic") - msg = _GetRieDiskInfoMessage(disk_index, host, port) + msg = _GetRieDiskInfoMessage(disk_index, host, port, magic) if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt): raise errors.GenericError("HMAC is wrong") - return (utils.HostInfo.NormalizeName(host), - utils.ValidateServiceName(port)) + return (netutils.HostInfo.NormalizeName(host), + utils.ValidateServiceName(port), + magic) -def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port): +def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic): """Computes the signed disk information for a remote import. @type cds: string @@ -1531,8 +1572,10 @@ def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port): @param host: Hostname @type port: number @param port: Daemon port + @type magic: string + @param magic: Magic value """ - msg = _GetRieDiskInfoMessage(disk_index, host, port) + msg = _GetRieDiskInfoMessage(disk_index, host, port, magic) hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt) - return (host, port, hmac_digest, salt) + return (host, port, magic, hmac_digest, salt)