Revision 9bf56d77 lib/masterd/instance.py

b/lib/masterd/instance.py
1189 1189
      self._RemoveSnapshot(idx)
1190 1190

  
1191 1191

  
1192
class _RemoteImportCb(ImportExportCbBase):
1193
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1194
               external_address):
1195
    """Initializes this class.
1196

  
1197
    @type cds: string
1198
    @param cds: Cluster domain secret
1199
    @type x509_cert_pem: string
1200
    @param x509_cert_pem: CA used for signing import key
1201
    @type disk_count: number
1202
    @param disk_count: Number of disks
1203
    @type external_address: string
1204
    @param external_address: External address of destination node
1205

  
1206
    """
1207
    ImportExportCbBase.__init__(self)
1208
    self._feedback_fn = feedback_fn
1209
    self._cds = cds
1210
    self._x509_cert_pem = x509_cert_pem
1211
    self._disk_count = disk_count
1212
    self._external_address = external_address
1213

  
1214
    self._dresults = [None] * disk_count
1215
    self._daemon_port = [None] * disk_count
1216

  
1217
    self._salt = utils.GenerateSecret(8)
1218

  
1219
  @property
1220
  def disk_results(self):
1221
    """Returns per-disk results.
1222

  
1223
    """
1224
    return self._dresults
1225

  
1226
  def _CheckAllListening(self):
1227
    """Checks whether all daemons are listening.
1228

  
1229
    If all daemons are listening, the information is sent to the client.
1230

  
1231
    """
1232
    if not compat.all(dp is not None for dp in self._daemon_port):
1233
      return
1234

  
1235
    host = self._external_address
1236

  
1237
    disks = []
1238
    for idx, port in enumerate(self._daemon_port):
1239
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1240
                                               idx, host, port))
1241

  
1242
    assert len(disks) == self._disk_count
1243

  
1244
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1245
      "disks": disks,
1246
      "x509_ca": self._x509_cert_pem,
1247
      })
1248

  
1249
  def ReportListening(self, ie, private):
1250
    """Called when daemon started listening.
1251

  
1252
    """
1253
    (idx, ) = private
1254

  
1255
    self._feedback_fn("Disk %s is now listening" % idx)
1256

  
1257
    assert self._daemon_port[idx] is None
1258

  
1259
    self._daemon_port[idx] = ie.listen_port
1260

  
1261
    self._CheckAllListening()
1262

  
1263
  def ReportConnected(self, ie, private):
1264
    """Called when a connection has been established.
1265

  
1266
    """
1267
    (idx, ) = private
1268

  
1269
    self._feedback_fn("Disk %s is now receiving data" % idx)
1270

  
1271
  def ReportFinished(self, ie, private):
1272
    """Called when a transfer has finished.
1273

  
1274
    """
1275
    (idx, ) = private
1276

  
1277
    # Daemon is certainly no longer listening
1278
    self._daemon_port[idx] = None
1279

  
1280
    if ie.success:
1281
      self._feedback_fn("Disk %s finished receiving data" % idx)
1282
    else:
1283
      self._feedback_fn(("Disk %s failed to receive data: %s"
1284
                         " (recent output: %r)") %
1285
                        (idx, ie.final_message, ie.recent_output))
1286

  
1287
    self._dresults[idx] = bool(ie.success)
1288

  
1289

  
1290
def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
1291
  """Imports an instance from another cluster.
1292

  
1293
  @param lu: Logical unit instance
1294
  @param feedback_fn: Feedback function
1295
  @type instance: L{objects.Instance}
1296
  @param instance: Instance object
1297
  @type source_x509_ca: OpenSSL.crypto.X509
1298
  @param source_x509_ca: Import source's X509 CA
1299
  @type cds: string
1300
  @param cds: Cluster domain secret
1301
  @type timeouts: L{ImportExportTimeouts}
1302
  @param timeouts: Timeouts for this import
1303

  
1304
  """
1305
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1306
                                                  source_x509_ca)
1307

  
1308
  # Create crypto key
1309
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
1310
                                        constants.RIE_CERT_VALIDITY)
1311
  result.Raise("Can't create X509 key and certificate on %s" % result.node)
1312

  
1313
  (x509_key_name, x509_cert_pem) = result.payload
1314
  try:
1315
    # Load certificate
1316
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1317
                                                x509_cert_pem)
1318

  
1319
    # Sign certificate
1320
    signed_x509_cert_pem = \
1321
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1322

  
1323
    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1324
                          len(instance.disks), instance.primary_node)
1325

  
1326
    ieloop = ImportExportLoop(lu)
1327
    try:
1328
      for idx, dev in enumerate(instance.disks):
1329
        ieloop.Add(DiskImport(lu, instance.primary_node,
1330
                              x509_key_name, source_ca_pem, instance,
1331
                              constants.IEIO_SCRIPT, (dev, idx),
1332
                              timeouts, cbs, private=(idx, )))
1333

  
1334
      ieloop.Run()
1335
    finally:
1336
      ieloop.FinalizeAll()
1337
  finally:
1338
    # Remove crypto key and certificate
1339
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1340
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1341

  
1342
  return cbs.disk_results
1343

  
1344

  
1192 1345
def _GetImportExportHandshakeMessage(version):
1193 1346
  """Returns the handshake message for a RIE protocol version.
1194 1347

  

Also available in: Unified diff