Revision 9bf56d77

b/lib/cmdlib.py
6103 6103
    self.adopt_disks = has_adopt
6104 6104

  
6105 6105
    # verify creation mode
6106
    if self.op.mode not in (constants.INSTANCE_CREATE,
6107
                            constants.INSTANCE_IMPORT):
6106
    if self.op.mode not in constants.INSTANCE_CREATE_MODES:
6108 6107
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
6109 6108
                                 self.op.mode, errors.ECODE_INVAL)
6110 6109

  
......
6114 6113
      self.op.instance_name = self.hostname1.name
6115 6114
      # used in CheckPrereq for ip ping check
6116 6115
      self.check_ip = self.hostname1.ip
6116
    elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
6117
      raise errors.OpPrereqError("Remote imports require names to be checked" %
6118
                                 errors.ECODE_INVAL)
6117 6119
    else:
6118 6120
      self.check_ip = None
6119 6121

  
......
6133 6135
                                 " node must be given",
6134 6136
                                 errors.ECODE_INVAL)
6135 6137

  
6138
    self._cds = _GetClusterDomainSecret()
6139

  
6136 6140
    if self.op.mode == constants.INSTANCE_IMPORT:
6137 6141
      # On import force_variant must be True, because if we forced it at
6138 6142
      # initial install, our only chance when importing it back is that it
......
6142 6146
      if self.op.no_install:
6143 6147
        self.LogInfo("No-installation mode has no effect during import")
6144 6148

  
6145
    else: # INSTANCE_CREATE
6149
    elif self.op.mode == constants.INSTANCE_CREATE:
6146 6150
      if getattr(self.op, "os_type", None) is None:
6147 6151
        raise errors.OpPrereqError("No guest OS specified",
6148 6152
                                   errors.ECODE_INVAL)
......
6151 6155
        raise errors.OpPrereqError("No disk template specified",
6152 6156
                                   errors.ECODE_INVAL)
6153 6157

  
6158
    elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
6159
      # Check handshake to ensure both clusters have the same domain secret
6160
      src_handshake = getattr(self.op, "source_handshake", None)
6161
      if not src_handshake:
6162
        raise errors.OpPrereqError("Missing source handshake",
6163
                                   errors.ECODE_INVAL)
6164

  
6165
      errmsg = masterd.instance.CheckRemoteExportHandshake(self._cds,
6166
                                                           src_handshake)
6167
      if errmsg:
6168
        raise errors.OpPrereqError("Invalid handshake: %s" % errmsg,
6169
                                   errors.ECODE_INVAL)
6170

  
6171
      # Load and check source CA
6172
      self.source_x509_ca_pem = getattr(self.op, "source_x509_ca", None)
6173
      if not self.source_x509_ca_pem:
6174
        raise errors.OpPrereqError("Missing source X509 CA",
6175
                                   errors.ECODE_INVAL)
6176

  
6177
      try:
6178
        (cert, _) = utils.LoadSignedX509Certificate(self.source_x509_ca_pem,
6179
                                                    self._cds)
6180
      except OpenSSL.crypto.Error, err:
6181
        raise errors.OpPrereqError("Unable to load source X509 CA (%s)" %
6182
                                   (err, ), errors.ECODE_INVAL)
6183

  
6184
      (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
6185
      if errcode is not None:
6186
        raise errors.OpPrereqError("Invalid source X509 CA (%s)" % (msg, ),
6187
                                   errors.ECODE_INVAL)
6188

  
6189
      self.source_x509_ca = cert
6190

  
6191
      src_instance_name = getattr(self.op, "source_instance_name", None)
6192
      if not src_instance_name:
6193
        raise errors.OpPrereqError("Missing source instance name",
6194
                                   errors.ECODE_INVAL)
6195

  
6196
      self.source_instance_name = \
6197
        utils.GetHostInfo(utils.HostInfo.NormalizeName(src_instance_name)).name
6198

  
6199
    else:
6200
      raise errors.OpPrereqError("Invalid instance creation mode %r" %
6201
                                 self.op.mode, errors.ECODE_INVAL)
6202

  
6154 6203
  def ExpandNames(self):
6155 6204
    """ExpandNames for CreateInstance.
6156 6205

  
......
6829 6878
          self.LogWarning("Some disks for instance %s on node %s were not"
6830 6879
                          " imported successfully" % (instance, pnode_name))
6831 6880

  
6881
      elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
6882
        feedback_fn("* preparing remote import...")
6883
        connect_timeout = constants.RIE_CONNECT_TIMEOUT
6884
        timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
6885

  
6886
        disk_results = masterd.instance.RemoteImport(self, feedback_fn, iobj,
6887
                                                     self.source_x509_ca,
6888
                                                     self._cds, timeouts)
6889
        if not compat.all(disk_results):
6890
          # TODO: Should the instance still be started, even if some disks
6891
          # failed to import (valid for local imports, too)?
6892
          self.LogWarning("Some disks for instance %s on node %s were not"
6893
                          " imported successfully" % (instance, pnode_name))
6894

  
6895
        # Run rename script on newly imported instance
6896
        assert iobj.name == instance
6897
        feedback_fn("Running rename script for %s" % instance)
6898
        result = self.rpc.call_instance_run_rename(pnode_name, iobj,
6899
                                                   self.source_instance_name,
6900
                                                   self.op.debug_level)
6901
        if result.fail_msg:
6902
          self.LogWarning("Failed to run rename script for %s on node"
6903
                          " %s: %s" % (instance, pnode_name, result.fail_msg))
6904

  
6832 6905
      else:
6833 6906
        # also checked in the prereq part
6834 6907
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
b/lib/constants.py
343 343
# instance creation modes
344 344
INSTANCE_CREATE = "create"
345 345
INSTANCE_IMPORT = "import"
346
INSTANCE_REMOTE_IMPORT = "remote-import"
347
INSTANCE_CREATE_MODES = frozenset([
348
  INSTANCE_CREATE,
349
  INSTANCE_IMPORT,
350
  INSTANCE_REMOTE_IMPORT,
351
  ])
346 352

  
347 353
# Remote import/export handshake message and version
348 354
RIE_VERSION = 0
......
703 709
# Execution log types
704 710
ELOG_MESSAGE = "message"
705 711
ELOG_PROGRESS = "progress"
712
ELOG_REMOTE_IMPORT = "remote-import"
706 713

  
707 714
# max dynamic devices
708 715
MAX_NICS = 8
b/lib/masterd/instance.py
1189 1189
      self._RemoveSnapshot(idx)
1190 1190

  
1191 1191

  
1192
class _RemoteImportCb(ImportExportCbBase):
1193
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1194
               external_address):
1195
    """Initializes this class.
1196

  
1197
    @type cds: string
1198
    @param cds: Cluster domain secret
1199
    @type x509_cert_pem: string
1200
    @param x509_cert_pem: CA used for signing import key
1201
    @type disk_count: number
1202
    @param disk_count: Number of disks
1203
    @type external_address: string
1204
    @param external_address: External address of destination node
1205

  
1206
    """
1207
    ImportExportCbBase.__init__(self)
1208
    self._feedback_fn = feedback_fn
1209
    self._cds = cds
1210
    self._x509_cert_pem = x509_cert_pem
1211
    self._disk_count = disk_count
1212
    self._external_address = external_address
1213

  
1214
    self._dresults = [None] * disk_count
1215
    self._daemon_port = [None] * disk_count
1216

  
1217
    self._salt = utils.GenerateSecret(8)
1218

  
1219
  @property
1220
  def disk_results(self):
1221
    """Returns per-disk results.
1222

  
1223
    """
1224
    return self._dresults
1225

  
1226
  def _CheckAllListening(self):
1227
    """Checks whether all daemons are listening.
1228

  
1229
    If all daemons are listening, the information is sent to the client.
1230

  
1231
    """
1232
    if not compat.all(dp is not None for dp in self._daemon_port):
1233
      return
1234

  
1235
    host = self._external_address
1236

  
1237
    disks = []
1238
    for idx, port in enumerate(self._daemon_port):
1239
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1240
                                               idx, host, port))
1241

  
1242
    assert len(disks) == self._disk_count
1243

  
1244
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1245
      "disks": disks,
1246
      "x509_ca": self._x509_cert_pem,
1247
      })
1248

  
1249
  def ReportListening(self, ie, private):
1250
    """Called when daemon started listening.
1251

  
1252
    """
1253
    (idx, ) = private
1254

  
1255
    self._feedback_fn("Disk %s is now listening" % idx)
1256

  
1257
    assert self._daemon_port[idx] is None
1258

  
1259
    self._daemon_port[idx] = ie.listen_port
1260

  
1261
    self._CheckAllListening()
1262

  
1263
  def ReportConnected(self, ie, private):
1264
    """Called when a connection has been established.
1265

  
1266
    """
1267
    (idx, ) = private
1268

  
1269
    self._feedback_fn("Disk %s is now receiving data" % idx)
1270

  
1271
  def ReportFinished(self, ie, private):
1272
    """Called when a transfer has finished.
1273

  
1274
    """
1275
    (idx, ) = private
1276

  
1277
    # Daemon is certainly no longer listening
1278
    self._daemon_port[idx] = None
1279

  
1280
    if ie.success:
1281
      self._feedback_fn("Disk %s finished receiving data" % idx)
1282
    else:
1283
      self._feedback_fn(("Disk %s failed to receive data: %s"
1284
                         " (recent output: %r)") %
1285
                        (idx, ie.final_message, ie.recent_output))
1286

  
1287
    self._dresults[idx] = bool(ie.success)
1288

  
1289

  
1290
def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
1291
  """Imports an instance from another cluster.
1292

  
1293
  @param lu: Logical unit instance
1294
  @param feedback_fn: Feedback function
1295
  @type instance: L{objects.Instance}
1296
  @param instance: Instance object
1297
  @type source_x509_ca: OpenSSL.crypto.X509
1298
  @param source_x509_ca: Import source's X509 CA
1299
  @type cds: string
1300
  @param cds: Cluster domain secret
1301
  @type timeouts: L{ImportExportTimeouts}
1302
  @param timeouts: Timeouts for this import
1303

  
1304
  """
1305
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1306
                                                  source_x509_ca)
1307

  
1308
  # Create crypto key
1309
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
1310
                                        constants.RIE_CERT_VALIDITY)
1311
  result.Raise("Can't create X509 key and certificate on %s" % result.node)
1312

  
1313
  (x509_key_name, x509_cert_pem) = result.payload
1314
  try:
1315
    # Load certificate
1316
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1317
                                                x509_cert_pem)
1318

  
1319
    # Sign certificate
1320
    signed_x509_cert_pem = \
1321
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1322

  
1323
    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1324
                          len(instance.disks), instance.primary_node)
1325

  
1326
    ieloop = ImportExportLoop(lu)
1327
    try:
1328
      for idx, dev in enumerate(instance.disks):
1329
        ieloop.Add(DiskImport(lu, instance.primary_node,
1330
                              x509_key_name, source_ca_pem, instance,
1331
                              constants.IEIO_SCRIPT, (dev, idx),
1332
                              timeouts, cbs, private=(idx, )))
1333

  
1334
      ieloop.Run()
1335
    finally:
1336
      ieloop.FinalizeAll()
1337
  finally:
1338
    # Remove crypto key and certificate
1339
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1340
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1341

  
1342
  return cbs.disk_results
1343

  
1344

  
1192 1345
def _GetImportExportHandshakeMessage(version):
1193 1346
  """Returns the handshake message for a RIE protocol version.
1194 1347

  
b/lib/opcodes.py
460 460
# instance opcodes
461 461

  
462 462
class OpCreateInstance(OpCode):
463
  """Create an instance."""
463
  """Create an instance.
464

  
465
  @ivar instance_name: Instance name
466
  @ivar mode: Instance creation mode (one of L{constants.INSTANCE_CREATE_MODES})
467
  @ivar source_handshake: Signed handshake from source (remote import only)
468
  @ivar source_x509_ca: Source X509 CA in PEM format (remote import only)
469
  @ivar source_instance_name: Previous name of instance (remote import only)
470

  
471
  """
464 472
  OP_ID = "OP_INSTANCE_CREATE"
465 473
  OP_DSC_FIELD = "instance_name"
466 474
  __slots__ = [
......
473 481
    "file_storage_dir", "file_driver",
474 482
    "iallocator",
475 483
    "hypervisor", "hvparams", "beparams",
484
    "source_handshake",
485
    "source_x509_ca",
486
    "source_instance_name",
476 487
    "dry_run",
477 488
    ]
478 489

  

Also available in: Unified diff