Revision 4a96f1d1

b/lib/cmdlib.py
8903 8903
    self.remove_instance = getattr(self.op, "remove_instance", False)
8904 8904
    self.ignore_remove_failures = getattr(self.op, "ignore_remove_failures",
8905 8905
                                          False)
8906
    self.export_mode = getattr(self.op, "mode", constants.EXPORT_MODE_LOCAL)
8907
    self.x509_key_name = getattr(self.op, "x509_key_name", None)
8908
    self.dest_x509_ca_pem = getattr(self.op, "destination_x509_ca", None)
8906 8909

  
8907 8910
    if self.remove_instance and not self.op.shutdown:
8908 8911
      raise errors.OpPrereqError("Can not remove instance without shutting it"
8909 8912
                                 " down before")
8910 8913

  
8914
    if self.export_mode not in constants.EXPORT_MODES:
8915
      raise errors.OpPrereqError("Invalid export mode %r" % self.export_mode,
8916
                                 errors.ECODE_INVAL)
8917

  
8918
    if self.export_mode == constants.EXPORT_MODE_REMOTE:
8919
      if not self.x509_key_name:
8920
        raise errors.OpPrereqError("Missing X509 key name for encryption",
8921
                                   errors.ECODE_INVAL)
8922

  
8923
      if not self.dest_x509_ca_pem:
8924
        raise errors.OpPrereqError("Missing destination X509 CA",
8925
                                   errors.ECODE_INVAL)
8926

  
8911 8927
  def ExpandNames(self):
8912 8928
    self._ExpandAndLockInstance()
8913 8929

  
8914
    # FIXME: lock only instance primary and destination node
8915
    #
8916
    # Sad but true, for now we have do lock all nodes, as we don't know where
8917
    # the previous export might be, and and in this LU we search for it and
8918
    # remove it from its current node. In the future we could fix this by:
8919
    #  - making a tasklet to search (share-lock all), then create the new one,
8920
    #    then one to remove, after
8921
    #  - removing the removal operation altogether
8922
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
8930
    # Lock all nodes for local exports
8931
    if self.export_mode == constants.EXPORT_MODE_LOCAL:
8932
      # FIXME: lock only instance primary and destination node
8933
      #
8934
      # Sad but true, for now we have do lock all nodes, as we don't know where
8935
      # the previous export might be, and in this LU we search for it and
8936
      # remove it from its current node. In the future we could fix this by:
8937
      #  - making a tasklet to search (share-lock all), then create the new one,
8938
      #    then one to remove, after
8939
      #  - removing the removal operation altogether
8940
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
8923 8941

  
8924 8942
  def DeclareLocks(self, level):
8925 8943
    """Last minute lock declaration."""
......
8932 8950

  
8933 8951
    """
8934 8952
    env = {
8953
      "EXPORT_MODE": self.export_mode,
8935 8954
      "EXPORT_NODE": self.op.target_node,
8936 8955
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
8937 8956
      "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
8938 8957
      # TODO: Generic function for boolean env variables
8939 8958
      "REMOVE_INSTANCE": str(bool(self.remove_instance)),
8940 8959
      }
8960

  
8941 8961
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
8942
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
8943
          self.op.target_node]
8962

  
8963
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
8964

  
8965
    if self.export_mode == constants.EXPORT_MODE_LOCAL:
8966
      nl.append(self.op.target_node)
8967

  
8944 8968
    return env, nl, nl
8945 8969

  
8946 8970
  def CheckPrereq(self):
......
8950 8974

  
8951 8975
    """
8952 8976
    instance_name = self.op.instance_name
8977

  
8953 8978
    self.instance = self.cfg.GetInstanceInfo(instance_name)
8954 8979
    assert self.instance is not None, \
8955 8980
          "Cannot retrieve locked instance %s" % self.op.instance_name
8956 8981
    _CheckNodeOnline(self, self.instance.primary_node)
8957 8982

  
8958
    self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
8959
    self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
8960
    assert self.dst_node is not None
8983
    if self.export_mode == constants.EXPORT_MODE_LOCAL:
8984
      self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
8985
      self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
8986
      assert self.dst_node is not None
8987

  
8988
      _CheckNodeOnline(self, self.dst_node.name)
8989
      _CheckNodeNotDrained(self, self.dst_node.name)
8990

  
8991
      self._cds = None
8992
      self.dest_x509_ca = None
8993

  
8994
    elif self.export_mode == constants.EXPORT_MODE_REMOTE:
8995
      self.dst_node = None
8996

  
8997
      if len(self.op.target_node) != len(self.instance.disks):
8998
        raise errors.OpPrereqError(("Received destination information for %s"
8999
                                    " disks, but instance %s has %s disks") %
9000
                                   (len(self.op.target_node), instance_name,
9001
                                    len(self.instance.disks)),
9002
                                   errors.ECODE_INVAL)
9003

  
9004
      cds = _GetClusterDomainSecret()
9005

  
9006
      # Check X509 key name
9007
      try:
9008
        (key_name, hmac_digest, hmac_salt) = self.x509_key_name
9009
      except (TypeError, ValueError), err:
9010
        raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err)
8961 9011

  
8962
    _CheckNodeOnline(self, self.dst_node.name)
8963
    _CheckNodeNotDrained(self, self.dst_node.name)
9012
      if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
9013
        raise errors.OpPrereqError("HMAC for X509 key name is wrong",
9014
                                   errors.ECODE_INVAL)
9015

  
9016
      # Load and verify CA
9017
      try:
9018
        (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
9019
      except OpenSSL.crypto.Error, err:
9020
        raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
9021
                                   (err, ), errors.ECODE_INVAL)
9022

  
9023
      (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
9024
      if errcode is not None:
9025
        raise errors.OpPrereqError("Invalid destination X509 CA (%s)" % (msg, ),
9026
                                   errors.ECODE_INVAL)
9027

  
9028
      self.dest_x509_ca = cert
9029

  
9030
      # Verify target information
9031
      for idx, disk_data in enumerate(self.op.target_node):
9032
        try:
9033
          masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
9034
        except errors.GenericError, err:
9035
          raise errors.OpPrereqError("Target info for disk %s: %s" % (idx, err),
9036
                                     errors.ECODE_INVAL)
9037

  
9038
    else:
9039
      raise errors.ProgrammerError("Unhandled export mode %r" %
9040
                                   self.export_mode)
8964 9041

  
8965 9042
    # instance disk type verification
8966 9043
    # TODO: Implement export support for file-based disks
......
8976 9053
    exports will be removed from the nodes A, B and D.
8977 9054

  
8978 9055
    """
9056
    assert self.export_mode != constants.EXPORT_MODE_REMOTE
9057

  
8979 9058
    nodelist = self.cfg.GetNodeList()
8980 9059
    nodelist.remove(self.dst_node.name)
8981 9060

  
......
8999 9078
    """Export an instance to an image in the cluster.
9000 9079

  
9001 9080
    """
9081
    assert self.export_mode in constants.EXPORT_MODES
9082

  
9002 9083
    instance = self.instance
9003 9084
    src_node = instance.primary_node
9004 9085

  
......
9029 9110

  
9030 9111
      helper.CreateSnapshots()
9031 9112
      try:
9032
        (fin_resu, dresults) = helper.LocalExport(self.dst_node)
9113
        if self.export_mode == constants.EXPORT_MODE_LOCAL:
9114
          (fin_resu, dresults) = helper.LocalExport(self.dst_node)
9115
        elif self.export_mode == constants.EXPORT_MODE_REMOTE:
9116
          connect_timeout = constants.RIE_CONNECT_TIMEOUT
9117
          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
9118

  
9119
          (key_name, _, _) = self.x509_key_name
9120
          (fin_resu, dresults) = helper.RemoteExport(key_name,
9121
                                                     self.dest_x509_ca,
9122
                                                     self.op.target_node,
9123
                                                     timeouts)
9033 9124
      finally:
9034 9125
        helper.Cleanup()
9035 9126

  
......
9053 9144
        _RemoveInstance(self, feedback_fn, instance,
9054 9145
                        self.ignore_remove_failures)
9055 9146

  
9056
    self._CleanupExports(feedback_fn)
9147
    if self.export_mode == constants.EXPORT_MODE_LOCAL:
9148
      self._CleanupExports(feedback_fn)
9057 9149

  
9058 9150
    return fin_resu, dresults
9059 9151

  
b/lib/constants.py
328 328
REPLACE_DISK_CHG = "replace_new_secondary" # change secondary node
329 329
REPLACE_DISK_AUTO = "replace_auto"
330 330

  
331
# Instance export mode
332
EXPORT_MODE_LOCAL = "local"
333
EXPORT_MODE_REMOTE = "remote"
334
EXPORT_MODES = frozenset([
335
  EXPORT_MODE_LOCAL,
336
  EXPORT_MODE_REMOTE,
337
  ])
338

  
331 339
# lock recalculate mode
332 340
LOCKS_REPLACE = 'replace'
333 341
LOCKS_APPEND = 'append'
......
343 351
# Remote import/export certificate validity in seconds
344 352
RIE_CERT_VALIDITY = 24 * 60 * 60
345 353

  
354
# Remote import/export connect timeout for socat
355
RIE_CONNECT_TIMEOUT = 60
356

  
346 357
DISK_TEMPLATES = frozenset([DT_DISKLESS, DT_PLAIN,
347 358
                            DT_DRBD8, DT_FILE])
348 359

  
b/lib/masterd/instance.py
25 25

  
26 26
import logging
27 27
import time
28
import OpenSSL
28 29

  
29 30
from ganeti import constants
30 31
from ganeti import errors
......
966 967
  return [bool(dtp.success) for dtp in all_dtp]
967 968

  
968 969

  
970
class _RemoteExportCb(ImportExportCbBase):
971
  def __init__(self, feedback_fn, disk_count):
972
    """Initializes this class.
973

  
974
    """
975
    ImportExportCbBase.__init__(self)
976
    self._feedback_fn = feedback_fn
977
    self._dresults = [None] * disk_count
978

  
979
  @property
980
  def disk_results(self):
981
    """Returns per-disk results.
982

  
983
    """
984
    return self._dresults
985

  
986
  def ReportConnected(self, ie, private):
987
    """Called when a connection has been established.
988

  
989
    """
990
    (idx, _) = private
991

  
992
    self._feedback_fn("Disk %s is now sending data" % idx)
993

  
994
  def ReportFinished(self, ie, private):
995
    """Called when a transfer has finished.
996

  
997
    """
998
    (idx, finished_fn) = private
999

  
1000
    if ie.success:
1001
      self._feedback_fn("Disk %s finished sending data" % idx)
1002
    else:
1003
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
1004
                        (idx, ie.final_message, ie.recent_output))
1005

  
1006
    self._dresults[idx] = bool(ie.success)
1007

  
1008
    if finished_fn:
1009
      finished_fn()
1010

  
1011

  
969 1012
class ExportInstanceHelper:
970 1013
  def __init__(self, lu, feedback_fn, instance):
971 1014
    """Initializes this class.
......
1088 1131

  
1089 1132
    return (fin_resu, dresults)
1090 1133

  
1134
  def RemoteExport(self, x509_key_name, dest_x509_ca, disk_info, timeouts):
1135
    """Inter-cluster instance export.
1136

  
1137
    @type x509_key_name: string
1138
    @param x509_key_name: X509 key name for encrypting data
1139
    @type dest_x509_ca: OpenSSL.crypto.X509
1140
    @param dest_x509_ca: Remote peer X509 CA object
1141
    @type disk_info: list
1142
    @param disk_info: Per-disk destination information
1143
    @type timeouts: L{ImportExportTimeouts}
1144
    @param timeouts: Timeouts for this import
1145

  
1146
    """
1147
    instance = self._instance
1148

  
1149
    assert len(disk_info) == len(instance.disks)
1150

  
1151
    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1152

  
1153
    dest_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1154
                                                  dest_x509_ca)
1155

  
1156
    ieloop = ImportExportLoop(self._lu)
1157
    try:
1158
      for idx, (dev, (host, port, _, _)) in enumerate(zip(instance.disks,
1159
                                                          disk_info)):
1160
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1161
        finished_fn = compat.partial(self._TransferFinished, idx)
1162
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
1163
                              x509_key_name, dest_ca_pem, host, port, instance,
1164
                              constants.IEIO_SCRIPT, (dev, idx),
1165
                              timeouts, cbs, private=(idx, finished_fn)))
1166

  
1167
      ieloop.Run()
1168
    finally:
1169
      ieloop.FinalizeAll()
1170

  
1171
    return (True, cbs.disk_results)
1172

  
1091 1173
  def _TransferFinished(self, idx):
1092 1174
    """Called once a transfer has finished.
1093 1175

  
......
1152 1234
            (constants.RIE_VERSION, version))
1153 1235

  
1154 1236
  return None
1237

  
1238

  
1239
def _GetRieDiskInfoMessage(disk_index, host, port):
1240
  """Returns the hashed text for import/export disk information.
1241

  
1242
  @type disk_index: number
1243
  @param disk_index: Index of disk (included in hash)
1244
  @type host: string
1245
  @param host: Hostname
1246
  @type port: number
1247
  @param port: Daemon port
1248

  
1249
  """
1250
  return "%s:%s:%s" % (disk_index, host, port)
1251

  
1252

  
1253
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1254
  """Verifies received disk information for an export.
1255

  
1256
  @type cds: string
1257
  @param cds: Cluster domain secret
1258
  @type disk_index: number
1259
  @param disk_index: Index of disk (included in hash)
1260
  @type disk_info: sequence
1261
  @param disk_info: Disk information sent by remote peer
1262

  
1263
  """
1264
  try:
1265
    (host, port, hmac_digest, hmac_salt) = disk_info
1266
  except (TypeError, ValueError), err:
1267
    raise errors.GenericError("Invalid data: %s" % err)
1268

  
1269
  if not (host and port):
1270
    raise errors.GenericError("Missing destination host or port")
1271

  
1272
  msg = _GetRieDiskInfoMessage(disk_index, host, port)
1273

  
1274
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1275
    raise errors.GenericError("HMAC is wrong")
1276

  
1277
  return (host, port)
1278

  
1279

  
1280
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port):
1281
  """Computes the signed disk information for a remote import.
1282

  
1283
  @type cds: string
1284
  @param cds: Cluster domain secret
1285
  @type salt: string
1286
  @param salt: HMAC salt
1287
  @type disk_index: number
1288
  @param disk_index: Index of disk (included in hash)
1289
  @type host: string
1290
  @param host: Hostname
1291
  @type port: number
1292
  @param port: Daemon port
1293

  
1294
  """
1295
  msg = _GetRieDiskInfoMessage(disk_index, host, port)
1296
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1297
  return (host, port, hmac_digest, salt)
b/lib/opcodes.py
668 668

  
669 669

  
670 670
class OpExportInstance(OpCode):
671
  """Export an instance."""
671
  """Export an instance.
672

  
673
  For local exports, the export destination is the node name. For remote
674
  exports, the export destination is a list of tuples, each consisting of
675
  hostname/IP address, port, HMAC and HMAC salt. The HMAC is calculated using
676
  the cluster domain secret over the value "${index}:${hostname}:${port}". The
677
  destination X509 CA must be a signed certificate.
678

  
679
  @ivar mode: Export mode (one of L{constants.EXPORT_MODES})
680
  @ivar target_node: Export destination
681
  @ivar x509_key_name: X509 key to use (remote export only)
682
  @ivar destination_x509_ca: Destination X509 CA in PEM format (remote export
683
                             only)
684

  
685
  """
672 686
  OP_ID = "OP_BACKUP_EXPORT"
673 687
  OP_DSC_FIELD = "instance_name"
674 688
  __slots__ = [
689
    # TODO: Rename target_node as it changes meaning for different export modes
690
    # (e.g. "destination")
675 691
    "instance_name", "target_node", "shutdown", "shutdown_timeout",
676 692
    "remove_instance",
677 693
    "ignore_remove_failures",
694
    "mode",
695
    "x509_key_name",
696
    "destination_x509_ca",
678 697
    ]
679 698

  
680 699

  
b/test/ganeti.masterd.instance_unittest.py
26 26
import unittest
27 27

  
28 28
from ganeti import constants
29
from ganeti import errors
29 30
from ganeti import utils
30 31
from ganeti import masterd
31 32

  
32 33
from ganeti.masterd.instance import \
33 34
  ImportExportTimeouts, _TimeoutExpired, _DiskImportExportBase, \
34
  ComputeRemoteExportHandshake, CheckRemoteExportHandshake
35
  ComputeRemoteExportHandshake, CheckRemoteExportHandshake, \
36
  ComputeRemoteImportDiskInfo, CheckRemoteExportDiskInfo
35 37

  
36 38
import testutils
37 39

  
......
86 88
    self.assert_(CheckRemoteExportHandshake(cds, hs))
87 89

  
88 90

  
91
class TestRieDiskInfo(unittest.TestCase):
92
  def test(self):
93
    cds = "bbf46ea9a"
94
    salt = "ee5ad9"
95
    di = ComputeRemoteImportDiskInfo(cds, salt, 0, "node1", 1234)
96
    self.assertEqual(CheckRemoteExportDiskInfo(cds, 0, di),
97
                     ("node1", 1234))
98

  
99
    for i in range(1, 100):
100
      # Wrong disk index
101
      self.assertRaises(errors.GenericError, CheckRemoteExportDiskInfo,
102
                        cds, i, di)
103

  
104
  def testCheckErrors(self):
105
    cds = "0776450535a"
106
    self.assertRaises(errors.GenericError, CheckRemoteExportDiskInfo,
107
                      cds, 0, "")
108
    self.assertRaises(errors.GenericError, CheckRemoteExportDiskInfo,
109
                      cds, 0, ())
110
    self.assertRaises(errors.GenericError, CheckRemoteExportDiskInfo,
111
                      cds, 0, ("", 1, 2, 3, 4, 5))
112

  
113
    # No host/port
114
    self.assertRaises(errors.GenericError, CheckRemoteExportDiskInfo,
115
                      cds, 0, ("", 0, "", ""))
116

  
117
    # Wrong hash
118
    self.assertRaises(errors.GenericError, CheckRemoteExportDiskInfo,
119
                      cds, 0, ("nodeX", 123, "fakehash", "xyz"))
120

  
121

  
89 122
if __name__ == "__main__":
90 123
  testutils.GanetiTestProgram()

Also available in: Unified diff