Revision 4a96f1d1 lib/masterd/instance.py

b/lib/masterd/instance.py
25 25

  
26 26
import logging
27 27
import time
28
import OpenSSL
28 29

  
29 30
from ganeti import constants
30 31
from ganeti import errors
......
966 967
  return [bool(dtp.success) for dtp in all_dtp]
967 968

  
968 969

  
970
class _RemoteExportCb(ImportExportCbBase):
971
  def __init__(self, feedback_fn, disk_count):
972
    """Initializes this class.
973

  
974
    """
975
    ImportExportCbBase.__init__(self)
976
    self._feedback_fn = feedback_fn
977
    self._dresults = [None] * disk_count
978

  
979
  @property
980
  def disk_results(self):
981
    """Returns per-disk results.
982

  
983
    """
984
    return self._dresults
985

  
986
  def ReportConnected(self, ie, private):
987
    """Called when a connection has been established.
988

  
989
    """
990
    (idx, _) = private
991

  
992
    self._feedback_fn("Disk %s is now sending data" % idx)
993

  
994
  def ReportFinished(self, ie, private):
995
    """Called when a transfer has finished.
996

  
997
    """
998
    (idx, finished_fn) = private
999

  
1000
    if ie.success:
1001
      self._feedback_fn("Disk %s finished sending data" % idx)
1002
    else:
1003
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
1004
                        (idx, ie.final_message, ie.recent_output))
1005

  
1006
    self._dresults[idx] = bool(ie.success)
1007

  
1008
    if finished_fn:
1009
      finished_fn()
1010

  
1011

  
969 1012
class ExportInstanceHelper:
970 1013
  def __init__(self, lu, feedback_fn, instance):
971 1014
    """Initializes this class.
......
1088 1131

  
1089 1132
    return (fin_resu, dresults)
1090 1133

  
1134
  def RemoteExport(self, x509_key_name, dest_x509_ca, disk_info, timeouts):
1135
    """Inter-cluster instance export.
1136

  
1137
    @type x509_key_name: string
1138
    @param x509_key_name: X509 key name for encrypting data
1139
    @type dest_x509_ca: OpenSSL.crypto.X509
1140
    @param dest_x509_ca: Remote peer X509 CA object
1141
    @type disk_info: list
1142
    @param disk_info: Per-disk destination information
1143
    @type timeouts: L{ImportExportTimeouts}
1144
    @param timeouts: Timeouts for this import
1145

  
1146
    """
1147
    instance = self._instance
1148

  
1149
    assert len(disk_info) == len(instance.disks)
1150

  
1151
    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1152

  
1153
    dest_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1154
                                                  dest_x509_ca)
1155

  
1156
    ieloop = ImportExportLoop(self._lu)
1157
    try:
1158
      for idx, (dev, (host, port, _, _)) in enumerate(zip(instance.disks,
1159
                                                          disk_info)):
1160
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1161
        finished_fn = compat.partial(self._TransferFinished, idx)
1162
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
1163
                              x509_key_name, dest_ca_pem, host, port, instance,
1164
                              constants.IEIO_SCRIPT, (dev, idx),
1165
                              timeouts, cbs, private=(idx, finished_fn)))
1166

  
1167
      ieloop.Run()
1168
    finally:
1169
      ieloop.FinalizeAll()
1170

  
1171
    return (True, cbs.disk_results)
1172

  
1091 1173
  def _TransferFinished(self, idx):
1092 1174
    """Called once a transfer has finished.
1093 1175

  
......
1152 1234
            (constants.RIE_VERSION, version))
1153 1235

  
1154 1236
  return None
1237

  
1238

  
1239
def _GetRieDiskInfoMessage(disk_index, host, port):
1240
  """Returns the hashed text for import/export disk information.
1241

  
1242
  @type disk_index: number
1243
  @param disk_index: Index of disk (included in hash)
1244
  @type host: string
1245
  @param host: Hostname
1246
  @type port: number
1247
  @param port: Daemon port
1248

  
1249
  """
1250
  return "%s:%s:%s" % (disk_index, host, port)
1251

  
1252

  
1253
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1254
  """Verifies received disk information for an export.
1255

  
1256
  @type cds: string
1257
  @param cds: Cluster domain secret
1258
  @type disk_index: number
1259
  @param disk_index: Index of disk (included in hash)
1260
  @type disk_info: sequence
1261
  @param disk_info: Disk information sent by remote peer
1262

  
1263
  """
1264
  try:
1265
    (host, port, hmac_digest, hmac_salt) = disk_info
1266
  except (TypeError, ValueError), err:
1267
    raise errors.GenericError("Invalid data: %s" % err)
1268

  
1269
  if not (host and port):
1270
    raise errors.GenericError("Missing destination host or port")
1271

  
1272
  msg = _GetRieDiskInfoMessage(disk_index, host, port)
1273

  
1274
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1275
    raise errors.GenericError("HMAC is wrong")
1276

  
1277
  return (host, port)
1278

  
1279

  
1280
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port):
1281
  """Computes the signed disk information for a remote import.
1282

  
1283
  @type cds: string
1284
  @param cds: Cluster domain secret
1285
  @type salt: string
1286
  @param salt: HMAC salt
1287
  @type disk_index: number
1288
  @param disk_index: Index of disk (included in hash)
1289
  @type host: string
1290
  @param host: Hostname
1291
  @type port: number
1292
  @param port: Daemon port
1293

  
1294
  """
1295
  msg = _GetRieDiskInfoMessage(disk_index, host, port)
1296
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1297
  return (host, port, hmac_digest, salt)

Also available in: Unified diff