X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/5bb9557265607638a87c78ef03665d32a406db98..6f1e192196d30ca1afcbff5c534619480d56225a:/lib/masterd/instance.py diff --git a/lib/masterd/instance.py b/lib/masterd/instance.py index df0b187..d99f4d8 100644 --- a/lib/masterd/instance.py +++ b/lib/masterd/instance.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2010 Google Inc. +# Copyright (C) 2010, 2011 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -32,6 +32,8 @@ from ganeti import errors from ganeti import compat from ganeti import utils from ganeti import objects +from ganeti import netutils +from ganeti import pathutils class _ImportExportError(Exception): @@ -91,12 +93,13 @@ class ImportExportCbBase(object): """Callbacks for disk import/export. """ - def ReportListening(self, ie, private): + def ReportListening(self, ie, private, component): """Called when daemon started listening. @type ie: Subclass of L{_DiskImportExportBase} @param ie: Import/export object @param private: Private data passed to import/export object + @param component: transfer component name """ @@ -128,18 +131,11 @@ class ImportExportCbBase(object): """ -def _TimeoutExpired(epoch, timeout, _time_fn=time.time): - """Checks whether a timeout has expired. - - """ - return _time_fn() > (epoch + timeout) - - class _DiskImportExportBase(object): MODE_TEXT = None def __init__(self, lu, node_name, opts, - instance, timeouts, cbs, private=None): + instance, component, timeouts, cbs, private=None): """Initializes this class. @param lu: Logical unit instance @@ -149,6 +145,8 @@ class _DiskImportExportBase(object): @param opts: Import/export daemon options @type instance: L{objects.Instance} @param instance: Instance object + @type component: string + @param component: which part of the instance is being imported @type timeouts: L{ImportExportTimeouts} @param timeouts: Timeouts for this import @type cbs: L{ImportExportCbBase} @@ -160,12 +158,17 @@ class _DiskImportExportBase(object): self._lu = lu self.node_name = node_name - self._opts = opts + self._opts = opts.Copy() self._instance = instance + self._component = component self._timeouts = timeouts self._cbs = cbs self._private = private + # Set master daemon's timeout in options for import/export daemon + assert self._opts.connect_timeout is None + self._opts.connect_timeout = timeouts.connect + # Parent loop self._loop = None @@ -191,7 +194,7 @@ class _DiskImportExportBase(object): """ if self._daemon: - return self._daemon.recent_output + return "\n".join(self._daemon.recent_output) return None @@ -268,7 +271,7 @@ class _DiskImportExportBase(object): daemon_name = result.payload - logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name, + logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name, self.node_name) self._ts_begin = time.time() @@ -289,11 +292,11 @@ class _DiskImportExportBase(object): """ if self._daemon_name: - self._lu.LogWarning("Aborting %s %r on %s", + self._lu.LogWarning("Aborting %s '%s' on %s", self.MODE_TEXT, self._daemon_name, self.node_name) result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name) if result.fail_msg: - self._lu.LogWarning("Failed to abort %s %r on %s: %s", + self._lu.LogWarning("Failed to abort %s '%s' on %s: %s", self.MODE_TEXT, self._daemon_name, self.node_name, result.fail_msg) return False @@ -310,7 +313,7 @@ class _DiskImportExportBase(object): assert self._ts_begin is not None if not data: - if _TimeoutExpired(self._ts_begin, self._timeouts.ready): + if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready): raise _ImportExportError("Didn't become ready after %s seconds" % self._timeouts.ready) @@ -333,7 +336,7 @@ class _DiskImportExportBase(object): if self._ts_last_error is None: self._ts_last_error = time.time() - elif _TimeoutExpired(self._ts_last_error, self._timeouts.error): + elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error): raise _ImportExportError("Too many errors while updating data") return False @@ -370,14 +373,15 @@ class _DiskImportExportBase(object): self._ts_connected = time.time() # TODO: Log remote peer - logging.debug("%s %r on %s is now connected", + logging.debug("%s '%s' on %s is now connected", self.MODE_TEXT, self._daemon_name, self.node_name) self._cbs.ReportConnected(self, self._private) return True - if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect): + if utils.TimeoutExpired(self._GetConnectedCheckEpoch(), + self._timeouts.connect): raise _ImportExportError("Not connected after %s seconds" % self._timeouts.connect) @@ -388,7 +392,8 @@ class _DiskImportExportBase(object): """ if ((self._ts_last_progress is None or - _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and + utils.TimeoutExpired(self._ts_last_progress, + self._timeouts.progress)) and self._daemon and self._daemon.progress_mbytes is not None and self._daemon.progress_throughput is not None): @@ -434,10 +439,10 @@ class _DiskImportExportBase(object): self.final_message = message if success: - logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name, - self.node_name) + logging.info("%s '%s' on %s succeeded", self.MODE_TEXT, + self._daemon_name, self.node_name) elif self._daemon_name: - self._lu.LogWarning("%s %r on %s failed: %s", + self._lu.LogWarning("%s '%s' on %s failed: %s", self.MODE_TEXT, self._daemon_name, self.node_name, message) else: @@ -457,12 +462,12 @@ class _DiskImportExportBase(object): """ if self._daemon_name: - logging.info("Finalizing %s %r on %s", + logging.info("Finalizing %s '%s' on %s", self.MODE_TEXT, self._daemon_name, self.node_name) result = self._Finalize() if result.fail_msg: - self._lu.LogWarning("Failed to finalize %s %r on %s: %s", + self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s", self.MODE_TEXT, self._daemon_name, self.node_name, result.fail_msg) return False @@ -480,7 +485,7 @@ class _DiskImportExportBase(object): class DiskImport(_DiskImportExportBase): MODE_TEXT = "import" - def __init__(self, lu, node_name, opts, instance, + def __init__(self, lu, node_name, opts, instance, component, dest, dest_args, timeouts, cbs, private=None): """Initializes this class. @@ -491,6 +496,8 @@ class DiskImport(_DiskImportExportBase): @param opts: Import/export daemon options @type instance: L{objects.Instance} @param instance: Instance object + @type component: string + @param component: which part of the instance is being imported @param dest: I/O destination @param dest_args: I/O arguments @type timeouts: L{ImportExportTimeouts} @@ -500,8 +507,8 @@ class DiskImport(_DiskImportExportBase): @param private: Private data for callback functions """ - _DiskImportExportBase.__init__(self, lu, node_name, opts, - instance, timeouts, cbs, private) + _DiskImportExportBase.__init__(self, lu, node_name, opts, instance, + component, timeouts, cbs, private) self._dest = dest self._dest_args = dest_args @@ -523,8 +530,8 @@ class DiskImport(_DiskImportExportBase): """ return self._lu.rpc.call_import_start(self.node_name, self._opts, - self._instance, - self._dest, self._dest_args) + self._instance, self._component, + (self._dest, self._dest_args)) def CheckListening(self): """Checks whether the daemon is listening. @@ -542,14 +549,14 @@ class DiskImport(_DiskImportExportBase): if port is not None: self._ts_listening = time.time() - logging.debug("Import %r on %s is now listening on port %s", + logging.debug("Import '%s' on %s is now listening on port %s", self._daemon_name, self.node_name, port) - self._cbs.ReportListening(self, self._private) + self._cbs.ReportListening(self, self._private, self._component) return True - if _TimeoutExpired(self._ts_begin, self._timeouts.listen): + if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen): raise _ImportExportError("Not listening after %s seconds" % self._timeouts.listen) @@ -569,8 +576,8 @@ class DiskImport(_DiskImportExportBase): class DiskExport(_DiskImportExportBase): MODE_TEXT = "export" - def __init__(self, lu, node_name, opts, - dest_host, dest_port, instance, source, source_args, + def __init__(self, lu, node_name, opts, dest_host, dest_port, + instance, component, source, source_args, timeouts, cbs, private=None): """Initializes this class. @@ -585,6 +592,8 @@ class DiskExport(_DiskImportExportBase): @param dest_port: Destination port number @type instance: L{objects.Instance} @param instance: Instance object + @type component: string + @param component: which part of the instance is being imported @param source: I/O source @param source_args: I/O source @type timeouts: L{ImportExportTimeouts} @@ -594,8 +603,8 @@ class DiskExport(_DiskImportExportBase): @param private: Private data for callback functions """ - _DiskImportExportBase.__init__(self, lu, node_name, opts, - instance, timeouts, cbs, private) + _DiskImportExportBase.__init__(self, lu, node_name, opts, instance, + component, timeouts, cbs, private) self._dest_host = dest_host self._dest_port = dest_port self._source = source @@ -607,8 +616,8 @@ class DiskExport(_DiskImportExportBase): """ return self._lu.rpc.call_export_start(self.node_name, self._opts, self._dest_host, self._dest_port, - self._instance, self._source, - self._source_args) + self._instance, self._component, + (self._source, self._source_args)) def CheckListening(self): """Checks whether the daemon is listening. @@ -793,7 +802,7 @@ class ImportExportLoop: logging.exception("%s failed", diskie.MODE_TEXT) diskie.Finalize(error=str(err)) - if not compat.any([diskie.active for diskie in self._queue]): + if not compat.any(diskie.active for diskie in self._queue): break # Wait a bit @@ -815,7 +824,7 @@ class ImportExportLoop: class _TransferInstCbBase(ImportExportCbBase): def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs, - dest_node, dest_ip, export_opts): + dest_node, dest_ip): """Initializes this class. """ @@ -829,7 +838,6 @@ class _TransferInstCbBase(ImportExportCbBase): self.src_cbs = src_cbs self.dest_node = dest_node self.dest_ip = dest_ip - self.export_opts = export_opts class _TransferInstSourceCb(_TransferInstCbBase): @@ -865,7 +873,7 @@ class _TransferInstSourceCb(_TransferInstCbBase): if ie.success: self.feedback_fn("%s finished sending data" % dtp.data.name) else: - self.feedback_fn("%s failed to send data: %s (recent output: %r)" % + self.feedback_fn("%s failed to send data: %s (recent output: %s)" % (dtp.data.name, ie.final_message, ie.recent_output)) dtp.RecordResult(ie.success) @@ -881,20 +889,21 @@ class _TransferInstSourceCb(_TransferInstCbBase): class _TransferInstDestCb(_TransferInstCbBase): - def ReportListening(self, ie, dtp): + def ReportListening(self, ie, dtp, component): """Called when daemon started listening. """ assert self.src_cbs assert dtp.src_export is None assert dtp.dest_import + assert dtp.export_opts self.feedback_fn("%s is now listening, starting export" % dtp.data.name) # Start export on source node - de = DiskExport(self.lu, self.src_node, self.export_opts, - self.dest_ip, ie.listen_port, - self.instance, dtp.data.src_io, dtp.data.src_ioargs, + de = DiskExport(self.lu, self.src_node, dtp.export_opts, + self.dest_ip, ie.listen_port, self.instance, + component, dtp.data.src_io, dtp.data.src_ioargs, self.timeouts, self.src_cbs, private=dtp) ie.loop.Add(de) @@ -914,7 +923,7 @@ class _TransferInstDestCb(_TransferInstCbBase): if ie.success: self.feedback_fn("%s finished receiving data" % dtp.data.name) else: - self.feedback_fn("%s failed to receive data: %s (recent output: %r)" % + self.feedback_fn("%s failed to receive data: %s (recent output: %s)" % (dtp.data.name, ie.final_message, ie.recent_output)) dtp.RecordResult(ie.success) @@ -952,7 +961,7 @@ class DiskTransfer(object): class _DiskTransferPrivate(object): - def __init__(self, data, success): + def __init__(self, data, success, export_opts): """Initializes this class. @type data: L{DiskTransfer} @@ -960,12 +969,12 @@ class _DiskTransferPrivate(object): """ self.data = data + self.success = success + self.export_opts = export_opts self.src_export = None self.dest_import = None - self.success = success - def RecordResult(self, success): """Updates the status. @@ -975,6 +984,25 @@ class _DiskTransferPrivate(object): self.success = self.success and success +def _GetInstDiskMagic(base, instance_name, index): + """Computes the magic value for a disk export or import. + + @type base: string + @param base: Random seed value (can be the same for all disks of a transfer) + @type instance_name: string + @param instance_name: Name of instance + @type index: number + @param index: Disk index + + """ + h = compat.sha1_hash() + h.update(str(constants.RIE_VERSION)) + h.update(base) + h.update(instance_name) + h.update(str(index)) + return h.hexdigest() + + def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip, instance, all_transfers): """Transfers an instance's data from one node to another. @@ -1002,34 +1030,37 @@ def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip, logging.debug("Source node %s, destination node %s, compression '%s'", src_node, dest_node, compress) - opts = objects.ImportExportOptions(key_name=None, ca_pem=None, - compress=compress) - timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT) src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts, - src_node, None, dest_node, dest_ip, opts) + src_node, None, dest_node, dest_ip) dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts, - src_node, src_cbs, dest_node, dest_ip, opts) + src_node, src_cbs, dest_node, dest_ip) all_dtp = [] + base_magic = utils.GenerateSecret(6) + ieloop = ImportExportLoop(lu) try: - for transfer in all_transfers: + for idx, transfer in enumerate(all_transfers): if transfer: feedback_fn("Exporting %s from %s to %s" % (transfer.name, src_node, dest_node)) - dtp = _DiskTransferPrivate(transfer, True) + magic = _GetInstDiskMagic(base_magic, instance.name, idx) + opts = objects.ImportExportOptions(key_name=None, ca_pem=None, + compress=compress, magic=magic) + + dtp = _DiskTransferPrivate(transfer, True, opts) - di = DiskImport(lu, dest_node, opts, instance, + di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx, transfer.dest_io, transfer.dest_ioargs, timeouts, dest_cbs, private=dtp) ieloop.Add(di) dtp.dest_import = di else: - dtp = _DiskTransferPrivate(None, False) + dtp = _DiskTransferPrivate(None, False, None) all_dtp.append(dtp) @@ -1038,11 +1069,11 @@ def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip, ieloop.FinalizeAll() assert len(all_dtp) == len(all_transfers) - assert compat.all([(dtp.src_export is None or + assert compat.all((dtp.src_export is None or dtp.src_export.success is not None) and (dtp.dest_import is None or dtp.dest_import.success is not None) - for dtp in all_dtp]), \ + for dtp in all_dtp), \ "Not all imports/exports are finalized" return [bool(dtp.success) for dtp in all_dtp] @@ -1093,7 +1124,7 @@ class _RemoteExportCb(ImportExportCbBase): if ie.success: self._feedback_fn("Disk %s finished sending data" % idx) else: - self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" % + self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" % (idx, ie.final_message, ie.recent_output)) self._dresults[idx] = bool(ie.success) @@ -1128,25 +1159,29 @@ class ExportInstanceHelper: instance = self._instance src_node = instance.primary_node - vgname = self._lu.cfg.GetVGName() - for idx, disk in enumerate(instance.disks): self._feedback_fn("Creating a snapshot of disk/%s on node %s" % (idx, src_node)) # result.payload will be a snapshot of an lvm leaf of the one we # passed - result = self._lu.rpc.call_blockdev_snapshot(src_node, disk) + result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance)) + new_dev = False msg = result.fail_msg if msg: self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s", idx, src_node, msg) - new_dev = False + elif (not isinstance(result.payload, (tuple, list)) or + len(result.payload) != 2): + self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid" + " result '%s'", idx, src_node, result.payload) else: - disk_id = (vgname, result.payload) + disk_id = tuple(result.payload) + disk_params = constants.DISK_LD_DEFAULTS[constants.LD_LV].copy() new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size, logical_id=disk_id, physical_id=disk_id, - iv_name=disk.iv_name) + iv_name=disk.iv_name, + params=disk_params) self._snap_disks.append(new_dev) @@ -1193,7 +1228,7 @@ class ExportInstanceHelper: transfers.append(None) continue - path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name, + path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name, dev.physical_id[1]) finished_fn = compat.partial(self._TransferFinished, idx) @@ -1224,13 +1259,15 @@ class ExportInstanceHelper: return (fin_resu, dresults) - def RemoteExport(self, opts, disk_info, timeouts): + def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts): """Inter-cluster instance export. - @type opts: L{objects.ImportExportOptions} - @param opts: Import/export daemon options @type disk_info: list @param disk_info: Per-disk destination information + @type key_name: string + @param key_name: Name of X509 key to use + @type dest_ca_pem: string + @param dest_ca_pem: Destination X509 CA in PEM format @type timeouts: L{ImportExportTimeouts} @param timeouts: Timeouts for this import @@ -1243,12 +1280,19 @@ class ExportInstanceHelper: ieloop = ImportExportLoop(self._lu) try: - for idx, (dev, (host, port)) in enumerate(zip(instance.disks, - disk_info)): + for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks, + disk_info)): + # Decide whether to use IPv6 + ipv6 = netutils.IP6Address.IsValid(host) + + opts = objects.ImportExportOptions(key_name=key_name, + ca_pem=dest_ca_pem, + magic=magic, ipv6=ipv6) + self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port)) finished_fn = compat.partial(self._TransferFinished, idx) ieloop.Add(DiskExport(self._lu, instance.primary_node, - opts, host, port, instance, + opts, host, port, instance, "disk%d" % idx, constants.IEIO_SCRIPT, (dev, idx), timeouts, cbs, private=(idx, finished_fn))) @@ -1323,9 +1367,9 @@ class _RemoteImportCb(ImportExportCbBase): host = self._external_address disks = [] - for idx, port in enumerate(self._daemon_port): + for idx, (port, magic) in enumerate(self._daemon_port): disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt, - idx, host, port)) + idx, host, port, magic)) assert len(disks) == self._disk_count @@ -1334,7 +1378,7 @@ class _RemoteImportCb(ImportExportCbBase): "x509_ca": self._x509_cert_pem, }) - def ReportListening(self, ie, private): + def ReportListening(self, ie, private, _): """Called when daemon started listening. """ @@ -1344,7 +1388,7 @@ class _RemoteImportCb(ImportExportCbBase): assert self._daemon_port[idx] is None - self._daemon_port[idx] = ie.listen_port + self._daemon_port[idx] = (ie.listen_port, ie.magic) self._CheckAllListening() @@ -1369,19 +1413,22 @@ class _RemoteImportCb(ImportExportCbBase): self._feedback_fn("Disk %s finished receiving data" % idx) else: self._feedback_fn(("Disk %s failed to receive data: %s" - " (recent output: %r)") % + " (recent output: %s)") % (idx, ie.final_message, ie.recent_output)) self._dresults[idx] = bool(ie.success) -def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts): +def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca, + cds, timeouts): """Imports an instance from another cluster. @param lu: Logical unit instance @param feedback_fn: Feedback function @type instance: L{objects.Instance} @param instance: Instance object + @type pnode: L{objects.Node} + @param pnode: Primary node of instance as an object @type source_x509_ca: OpenSSL.crypto.X509 @param source_x509_ca: Import source's X509 CA @type cds: string @@ -1393,6 +1440,11 @@ def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts): source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, source_x509_ca) + magic_base = utils.GenerateSecret(6) + + # Decide whether to use IPv6 + ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip) + # Create crypto key result = lu.rpc.call_x509_cert_create(instance.primary_node, constants.RIE_CERT_VALIDITY) @@ -1404,21 +1456,25 @@ def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts): x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, x509_cert_pem) - # Import daemon options - opts = objects.ImportExportOptions(key_name=x509_key_name, - ca_pem=source_ca_pem) - # Sign certificate signed_x509_cert_pem = \ utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8)) cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem, - len(instance.disks), instance.primary_node) + len(instance.disks), pnode.primary_ip) ieloop = ImportExportLoop(lu) try: for idx, dev in enumerate(instance.disks): + magic = _GetInstDiskMagic(magic_base, instance.name, idx) + + # Import daemon options + opts = objects.ImportExportOptions(key_name=x509_key_name, + ca_pem=source_ca_pem, + magic=magic, ipv6=ipv6) + ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance, + "disk%d" % idx, constants.IEIO_SCRIPT, (dev, idx), timeouts, cbs, private=(idx, ))) @@ -1480,7 +1536,7 @@ def CheckRemoteExportHandshake(cds, handshake): return None -def _GetRieDiskInfoMessage(disk_index, host, port): +def _GetRieDiskInfoMessage(disk_index, host, port, magic): """Returns the hashed text for import/export disk information. @type disk_index: number @@ -1489,9 +1545,11 @@ def _GetRieDiskInfoMessage(disk_index, host, port): @param host: Hostname @type port: number @param port: Daemon port + @type magic: string + @param magic: Magic value """ - return "%s:%s:%s" % (disk_index, host, port) + return "%s:%s:%s:%s" % (disk_index, host, port, magic) def CheckRemoteExportDiskInfo(cds, disk_index, disk_info): @@ -1506,23 +1564,29 @@ def CheckRemoteExportDiskInfo(cds, disk_index, disk_info): """ try: - (host, port, hmac_digest, hmac_salt) = disk_info + (host, port, magic, hmac_digest, hmac_salt) = disk_info except (TypeError, ValueError), err: raise errors.GenericError("Invalid data: %s" % err) - if not (host and port): - raise errors.GenericError("Missing destination host or port") + if not (host and port and magic): + raise errors.GenericError("Missing destination host, port or magic") - msg = _GetRieDiskInfoMessage(disk_index, host, port) + msg = _GetRieDiskInfoMessage(disk_index, host, port, magic) if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt): raise errors.GenericError("HMAC is wrong") - return (utils.HostInfo.NormalizeName(host), - utils.ValidateServiceName(port)) + if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host): + destination = host + else: + destination = netutils.Hostname.GetNormalizedName(host) + return (destination, + utils.ValidateServiceName(port), + magic) -def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port): + +def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic): """Computes the signed disk information for a remote import. @type cds: string @@ -1535,8 +1599,42 @@ def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port): @param host: Hostname @type port: number @param port: Daemon port + @type magic: string + @param magic: Magic value """ - msg = _GetRieDiskInfoMessage(disk_index, host, port) + msg = _GetRieDiskInfoMessage(disk_index, host, port, magic) hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt) - return (host, port, hmac_digest, salt) + return (host, port, magic, hmac_digest, salt) + + +def CalculateGroupIPolicy(cluster, group): + """Calculate instance policy for group. + + """ + return cluster.SimpleFillIPolicy(group.ipolicy) + + +def ComputeDiskSize(disk_template, disks): + """Compute disk size requirements according to disk template + + """ + # Required free disk space as a function of disk and swap space + req_size_dict = { + constants.DT_DISKLESS: None, + constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks), + # 128 MB are added for drbd metadata for each disk + constants.DT_DRBD8: + sum(d[constants.IDISK_SIZE] + constants.DRBD_META_SIZE for d in disks), + constants.DT_FILE: sum(d[constants.IDISK_SIZE] for d in disks), + constants.DT_SHARED_FILE: sum(d[constants.IDISK_SIZE] for d in disks), + constants.DT_BLOCK: 0, + constants.DT_RBD: sum(d[constants.IDISK_SIZE] for d in disks), + constants.DT_EXT: sum(d[constants.IDISK_SIZE] for d in disks), + } + + if disk_template not in req_size_dict: + raise errors.ProgrammerError("Disk template '%s' size requirement" + " is unknown" % disk_template) + + return req_size_dict[disk_template]