Revision 4a96f1d1 lib/masterd/instance.py
b/lib/masterd/instance.py | ||
---|---|---|
25 | 25 |
|
26 | 26 |
import logging |
27 | 27 |
import time |
28 |
import OpenSSL |
|
28 | 29 |
|
29 | 30 |
from ganeti import constants |
30 | 31 |
from ganeti import errors |
... | ... | |
966 | 967 |
return [bool(dtp.success) for dtp in all_dtp] |
967 | 968 |
|
968 | 969 |
|
970 |
class _RemoteExportCb(ImportExportCbBase): |
|
971 |
def __init__(self, feedback_fn, disk_count): |
|
972 |
"""Initializes this class. |
|
973 |
|
|
974 |
""" |
|
975 |
ImportExportCbBase.__init__(self) |
|
976 |
self._feedback_fn = feedback_fn |
|
977 |
self._dresults = [None] * disk_count |
|
978 |
|
|
979 |
@property |
|
980 |
def disk_results(self): |
|
981 |
"""Returns per-disk results. |
|
982 |
|
|
983 |
""" |
|
984 |
return self._dresults |
|
985 |
|
|
986 |
def ReportConnected(self, ie, private): |
|
987 |
"""Called when a connection has been established. |
|
988 |
|
|
989 |
""" |
|
990 |
(idx, _) = private |
|
991 |
|
|
992 |
self._feedback_fn("Disk %s is now sending data" % idx) |
|
993 |
|
|
994 |
def ReportFinished(self, ie, private): |
|
995 |
"""Called when a transfer has finished. |
|
996 |
|
|
997 |
""" |
|
998 |
(idx, finished_fn) = private |
|
999 |
|
|
1000 |
if ie.success: |
|
1001 |
self._feedback_fn("Disk %s finished sending data" % idx) |
|
1002 |
else: |
|
1003 |
self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" % |
|
1004 |
(idx, ie.final_message, ie.recent_output)) |
|
1005 |
|
|
1006 |
self._dresults[idx] = bool(ie.success) |
|
1007 |
|
|
1008 |
if finished_fn: |
|
1009 |
finished_fn() |
|
1010 |
|
|
1011 |
|
|
969 | 1012 |
class ExportInstanceHelper: |
970 | 1013 |
def __init__(self, lu, feedback_fn, instance): |
971 | 1014 |
"""Initializes this class. |
... | ... | |
1088 | 1131 |
|
1089 | 1132 |
return (fin_resu, dresults) |
1090 | 1133 |
|
1134 |
def RemoteExport(self, x509_key_name, dest_x509_ca, disk_info, timeouts): |
|
1135 |
"""Inter-cluster instance export. |
|
1136 |
|
|
1137 |
@type x509_key_name: string |
|
1138 |
@param x509_key_name: X509 key name for encrypting data |
|
1139 |
@type dest_x509_ca: OpenSSL.crypto.X509 |
|
1140 |
@param dest_x509_ca: Remote peer X509 CA object |
|
1141 |
@type disk_info: list |
|
1142 |
@param disk_info: Per-disk destination information |
|
1143 |
@type timeouts: L{ImportExportTimeouts} |
|
1144 |
@param timeouts: Timeouts for this import |
|
1145 |
|
|
1146 |
""" |
|
1147 |
instance = self._instance |
|
1148 |
|
|
1149 |
assert len(disk_info) == len(instance.disks) |
|
1150 |
|
|
1151 |
cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks)) |
|
1152 |
|
|
1153 |
dest_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, |
|
1154 |
dest_x509_ca) |
|
1155 |
|
|
1156 |
ieloop = ImportExportLoop(self._lu) |
|
1157 |
try: |
|
1158 |
for idx, (dev, (host, port, _, _)) in enumerate(zip(instance.disks, |
|
1159 |
disk_info)): |
|
1160 |
self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port)) |
|
1161 |
finished_fn = compat.partial(self._TransferFinished, idx) |
|
1162 |
ieloop.Add(DiskExport(self._lu, instance.primary_node, |
|
1163 |
x509_key_name, dest_ca_pem, host, port, instance, |
|
1164 |
constants.IEIO_SCRIPT, (dev, idx), |
|
1165 |
timeouts, cbs, private=(idx, finished_fn))) |
|
1166 |
|
|
1167 |
ieloop.Run() |
|
1168 |
finally: |
|
1169 |
ieloop.FinalizeAll() |
|
1170 |
|
|
1171 |
return (True, cbs.disk_results) |
|
1172 |
|
|
1091 | 1173 |
def _TransferFinished(self, idx): |
1092 | 1174 |
"""Called once a transfer has finished. |
1093 | 1175 |
|
... | ... | |
1152 | 1234 |
(constants.RIE_VERSION, version)) |
1153 | 1235 |
|
1154 | 1236 |
return None |
1237 |
|
|
1238 |
|
|
1239 |
def _GetRieDiskInfoMessage(disk_index, host, port): |
|
1240 |
"""Returns the hashed text for import/export disk information. |
|
1241 |
|
|
1242 |
@type disk_index: number |
|
1243 |
@param disk_index: Index of disk (included in hash) |
|
1244 |
@type host: string |
|
1245 |
@param host: Hostname |
|
1246 |
@type port: number |
|
1247 |
@param port: Daemon port |
|
1248 |
|
|
1249 |
""" |
|
1250 |
return "%s:%s:%s" % (disk_index, host, port) |
|
1251 |
|
|
1252 |
|
|
1253 |
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info): |
|
1254 |
"""Verifies received disk information for an export. |
|
1255 |
|
|
1256 |
@type cds: string |
|
1257 |
@param cds: Cluster domain secret |
|
1258 |
@type disk_index: number |
|
1259 |
@param disk_index: Index of disk (included in hash) |
|
1260 |
@type disk_info: sequence |
|
1261 |
@param disk_info: Disk information sent by remote peer |
|
1262 |
|
|
1263 |
""" |
|
1264 |
try: |
|
1265 |
(host, port, hmac_digest, hmac_salt) = disk_info |
|
1266 |
except (TypeError, ValueError), err: |
|
1267 |
raise errors.GenericError("Invalid data: %s" % err) |
|
1268 |
|
|
1269 |
if not (host and port): |
|
1270 |
raise errors.GenericError("Missing destination host or port") |
|
1271 |
|
|
1272 |
msg = _GetRieDiskInfoMessage(disk_index, host, port) |
|
1273 |
|
|
1274 |
if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt): |
|
1275 |
raise errors.GenericError("HMAC is wrong") |
|
1276 |
|
|
1277 |
return (host, port) |
|
1278 |
|
|
1279 |
|
|
1280 |
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port): |
|
1281 |
"""Computes the signed disk information for a remote import. |
|
1282 |
|
|
1283 |
@type cds: string |
|
1284 |
@param cds: Cluster domain secret |
|
1285 |
@type salt: string |
|
1286 |
@param salt: HMAC salt |
|
1287 |
@type disk_index: number |
|
1288 |
@param disk_index: Index of disk (included in hash) |
|
1289 |
@type host: string |
|
1290 |
@param host: Hostname |
|
1291 |
@type port: number |
|
1292 |
@param port: Daemon port |
|
1293 |
|
|
1294 |
""" |
|
1295 |
msg = _GetRieDiskInfoMessage(disk_index, host, port) |
|
1296 |
hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt) |
|
1297 |
return (host, port, hmac_digest, salt) |
Also available in: Unified diff