Revision eb630f50
b/daemons/ganeti-noded | ||
---|---|---|
847 | 847 |
"""Starts an import daemon. |
848 | 848 |
|
849 | 849 |
""" |
850 |
(x509_key_name, source_x509_ca, instance, dest, dest_args) = params |
|
851 |
return backend.StartImportExportDaemon(constants.IEM_IMPORT, |
|
852 |
x509_key_name, source_x509_ca, |
|
850 |
(opts_s, instance, dest, dest_args) = params |
|
851 |
|
|
852 |
opts = objects.ImportExportOptions.FromDict(opts_s) |
|
853 |
|
|
854 |
return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts, |
|
853 | 855 |
None, None, |
854 | 856 |
objects.Instance.FromDict(instance), |
855 | 857 |
dest, |
856 | 858 |
_DecodeImportExportIO(dest, |
857 | 859 |
dest_args)) |
860 |
|
|
858 | 861 |
@staticmethod |
859 | 862 |
def perspective_export_start(params): |
860 | 863 |
"""Starts an export daemon. |
861 | 864 |
|
862 | 865 |
""" |
863 |
(x509_key_name, dest_x509_ca, host, port, instance, |
|
864 |
source, source_args) = params |
|
865 |
return backend.StartImportExportDaemon(constants.IEM_EXPORT, |
|
866 |
x509_key_name, dest_x509_ca, |
|
866 |
(opts_s, host, port, instance, source, source_args) = params |
|
867 |
|
|
868 |
opts = objects.ImportExportOptions.FromDict(opts_s) |
|
869 |
|
|
870 |
return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts, |
|
867 | 871 |
host, port, |
868 | 872 |
objects.Instance.FromDict(instance), |
869 | 873 |
source, |
b/lib/backend.py | ||
---|---|---|
2617 | 2617 |
(prefix, utils.TimestampForFilename()))) |
2618 | 2618 |
|
2619 | 2619 |
|
2620 |
def StartImportExportDaemon(mode, key_name, ca, host, port, instance, |
|
2621 |
ieio, ieioargs): |
|
2620 |
def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs): |
|
2622 | 2621 |
"""Starts an import or export daemon. |
2623 | 2622 |
|
2624 | 2623 |
@param mode: Import/output mode |
2625 |
@type key_name: string |
|
2626 |
@param key_name: RSA key name (None to use cluster certificate) |
|
2627 |
@type ca: string: |
|
2628 |
@param ca: Remote CA in PEM format (None to use cluster certificate) |
|
2624 |
@type opts: L{objects.ImportExportOptions} |
|
2625 |
@param opts: Daemon options |
|
2629 | 2626 |
@type host: string |
2630 | 2627 |
@param host: Remote host for export (None for import) |
2631 | 2628 |
@type port: int |
... | ... | |
2651 | 2648 |
else: |
2652 | 2649 |
_Fail("Invalid mode %r", mode) |
2653 | 2650 |
|
2654 |
if (key_name is None) ^ (ca is None):
|
|
2651 |
if (opts.key_name is None) ^ (opts.ca_pem is None):
|
|
2655 | 2652 |
_Fail("Cluster certificate can only be used for both key and CA") |
2656 | 2653 |
|
2657 | 2654 |
(cmd_env, cmd_prefix, cmd_suffix) = \ |
2658 | 2655 |
_GetImportExportIoCommand(instance, mode, ieio, ieioargs) |
2659 | 2656 |
|
2660 |
if key_name is None: |
|
2657 |
if opts.key_name is None:
|
|
2661 | 2658 |
# Use server.pem |
2662 | 2659 |
key_path = constants.NODED_CERT_FILE |
2663 | 2660 |
cert_path = constants.NODED_CERT_FILE |
2664 |
assert ca is None
|
|
2661 |
assert opts.ca_pem is None
|
|
2665 | 2662 |
else: |
2666 | 2663 |
(_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR, |
2667 |
key_name) |
|
2668 |
assert ca is not None
|
|
2664 |
opts.key_name)
|
|
2665 |
assert opts.ca_pem is not None
|
|
2669 | 2666 |
|
2670 | 2667 |
for i in [key_path, cert_path]: |
2671 | 2668 |
if not os.path.exists(i): |
... | ... | |
2677 | 2674 |
pid_file = utils.PathJoin(status_dir, _IES_PID_FILE) |
2678 | 2675 |
ca_file = utils.PathJoin(status_dir, _IES_CA_FILE) |
2679 | 2676 |
|
2680 |
if ca is None:
|
|
2677 |
if opts.ca_pem is None:
|
|
2681 | 2678 |
# Use server.pem |
2682 | 2679 |
ca = utils.ReadFile(constants.NODED_CERT_FILE) |
2680 |
else: |
|
2681 |
ca = opts.ca_pem |
|
2683 | 2682 |
|
2683 |
# Write CA file |
|
2684 | 2684 |
utils.WriteFile(ca_file, data=ca, mode=0400) |
2685 | 2685 |
|
2686 | 2686 |
cmd = [ |
b/lib/cmdlib.py | ||
---|---|---|
9190 | 9190 |
timeouts = masterd.instance.ImportExportTimeouts(connect_timeout) |
9191 | 9191 |
|
9192 | 9192 |
(key_name, _, _) = self.x509_key_name |
9193 |
(fin_resu, dresults) = helper.RemoteExport(key_name, |
|
9194 |
self.dest_x509_ca, |
|
9195 |
self.op.target_node, |
|
9193 |
|
|
9194 |
dest_ca_pem = \ |
|
9195 |
OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, |
|
9196 |
self.dest_x509_ca) |
|
9197 |
|
|
9198 |
opts = objects.ImportExportOptions(key_name=key_name, |
|
9199 |
ca_pem=dest_ca_pem) |
|
9200 |
|
|
9201 |
(fin_resu, dresults) = helper.RemoteExport(opts, self.op.target_node, |
|
9196 | 9202 |
timeouts) |
9197 | 9203 |
finally: |
9198 | 9204 |
helper.Cleanup() |
b/lib/masterd/instance.py | ||
---|---|---|
121 | 121 |
class _DiskImportExportBase(object): |
122 | 122 |
MODE_TEXT = None |
123 | 123 |
|
124 |
def __init__(self, lu, node_name, x509_key_name, remote_x509_ca,
|
|
124 |
def __init__(self, lu, node_name, opts,
|
|
125 | 125 |
instance, timeouts, cbs, private=None): |
126 | 126 |
"""Initializes this class. |
127 | 127 |
|
128 | 128 |
@param lu: Logical unit instance |
129 | 129 |
@type node_name: string |
130 | 130 |
@param node_name: Node name for import |
131 |
@type x509_key_name: string |
|
132 |
@param x509_key_name: Name of X509 key (None for node daemon key) |
|
133 |
@type remote_x509_ca: string |
|
134 |
@param remote_x509_ca: Remote peer's CA (None for node daemon certificate) |
|
131 |
@type opts: L{objects.ImportExportOptions} |
|
132 |
@param opts: Import/export daemon options |
|
135 | 133 |
@type instance: L{objects.Instance} |
136 | 134 |
@param instance: Instance object |
137 | 135 |
@type timeouts: L{ImportExportTimeouts} |
... | ... | |
145 | 143 |
|
146 | 144 |
self._lu = lu |
147 | 145 |
self.node_name = node_name |
148 |
self._x509_key_name = x509_key_name |
|
149 |
self._remote_x509_ca = remote_x509_ca |
|
146 |
self._opts = opts |
|
150 | 147 |
self._instance = instance |
151 | 148 |
self._timeouts = timeouts |
152 | 149 |
self._cbs = cbs |
... | ... | |
433 | 430 |
class DiskImport(_DiskImportExportBase): |
434 | 431 |
MODE_TEXT = "import" |
435 | 432 |
|
436 |
def __init__(self, lu, node_name, x509_key_name, source_x509_ca, instance,
|
|
433 |
def __init__(self, lu, node_name, opts, instance,
|
|
437 | 434 |
dest, dest_args, timeouts, cbs, private=None): |
438 | 435 |
"""Initializes this class. |
439 | 436 |
|
440 | 437 |
@param lu: Logical unit instance |
441 | 438 |
@type node_name: string |
442 | 439 |
@param node_name: Node name for import |
443 |
@type x509_key_name: string |
|
444 |
@param x509_key_name: Name of X509 key (None for node daemon key) |
|
445 |
@type source_x509_ca: string |
|
446 |
@param source_x509_ca: Remote peer's CA (None for node daemon certificate) |
|
440 |
@type opts: L{objects.ImportExportOptions} |
|
441 |
@param opts: Import/export daemon options |
|
447 | 442 |
@type instance: L{objects.Instance} |
448 | 443 |
@param instance: Instance object |
449 | 444 |
@param dest: I/O destination |
... | ... | |
455 | 450 |
@param private: Private data for callback functions |
456 | 451 |
|
457 | 452 |
""" |
458 |
_DiskImportExportBase.__init__(self, lu, node_name, |
|
459 |
x509_key_name, source_x509_ca, |
|
453 |
_DiskImportExportBase.__init__(self, lu, node_name, opts, |
|
460 | 454 |
instance, timeouts, cbs, private) |
461 | 455 |
self._dest = dest |
462 | 456 |
self._dest_args = dest_args |
... | ... | |
478 | 472 |
"""Starts the import daemon. |
479 | 473 |
|
480 | 474 |
""" |
481 |
return self._lu.rpc.call_import_start(self.node_name, |
|
482 |
self._x509_key_name, |
|
483 |
self._remote_x509_ca, self._instance, |
|
475 |
return self._lu.rpc.call_import_start(self.node_name, self._opts, |
|
476 |
self._instance, |
|
484 | 477 |
self._dest, self._dest_args) |
485 | 478 |
|
486 | 479 |
def CheckListening(self): |
... | ... | |
526 | 519 |
class DiskExport(_DiskImportExportBase): |
527 | 520 |
MODE_TEXT = "export" |
528 | 521 |
|
529 |
def __init__(self, lu, node_name, x509_key_name, dest_x509_ca,
|
|
522 |
def __init__(self, lu, node_name, opts,
|
|
530 | 523 |
dest_host, dest_port, instance, source, source_args, |
531 | 524 |
timeouts, cbs, private=None): |
532 | 525 |
"""Initializes this class. |
... | ... | |
534 | 527 |
@param lu: Logical unit instance |
535 | 528 |
@type node_name: string |
536 | 529 |
@param node_name: Node name for import |
537 |
@type x509_key_name: string |
|
538 |
@param x509_key_name: Name of X509 key (None for node daemon key) |
|
539 |
@type dest_x509_ca: string |
|
540 |
@param dest_x509_ca: Remote peer's CA (None for node daemon certificate) |
|
530 |
@type opts: L{objects.ImportExportOptions} |
|
531 |
@param opts: Import/export daemon options |
|
541 | 532 |
@type dest_host: string |
542 | 533 |
@param dest_host: Destination host name or IP address |
543 | 534 |
@type dest_port: number |
... | ... | |
553 | 544 |
@param private: Private data for callback functions |
554 | 545 |
|
555 | 546 |
""" |
556 |
_DiskImportExportBase.__init__(self, lu, node_name, |
|
557 |
x509_key_name, dest_x509_ca, |
|
547 |
_DiskImportExportBase.__init__(self, lu, node_name, opts, |
|
558 | 548 |
instance, timeouts, cbs, private) |
559 | 549 |
self._dest_host = dest_host |
560 | 550 |
self._dest_port = dest_port |
... | ... | |
565 | 555 |
"""Starts the export daemon. |
566 | 556 |
|
567 | 557 |
""" |
568 |
return self._lu.rpc.call_export_start(self.node_name, self._x509_key_name, |
|
569 |
self._remote_x509_ca, |
|
558 |
return self._lu.rpc.call_export_start(self.node_name, self._opts, |
|
570 | 559 |
self._dest_host, self._dest_port, |
571 | 560 |
self._instance, self._source, |
572 | 561 |
self._source_args) |
... | ... | |
819 | 808 |
|
820 | 809 |
self.feedback_fn("%s is now listening, starting export" % dtp.data.name) |
821 | 810 |
|
811 |
opts = objects.ImportExportOptions(key_name=None, ca_pem=None) |
|
812 |
|
|
822 | 813 |
# Start export on source node |
823 |
de = DiskExport(self.lu, self.src_node, None, None, self.dest_ip, |
|
824 |
ie.listen_port, self.instance, |
|
825 |
dtp.data.src_io, dtp.data.src_ioargs, |
|
814 |
de = DiskExport(self.lu, self.src_node, opts, self.dest_ip, ie.listen_port, |
|
815 |
self.instance, dtp.data.src_io, dtp.data.src_ioargs, |
|
826 | 816 |
self.timeouts, self.src_cbs, private=dtp) |
827 | 817 |
ie.loop.Add(de) |
828 | 818 |
|
... | ... | |
924 | 914 |
each transfer |
925 | 915 |
|
926 | 916 |
""" |
917 |
opts = objects.ImportExportOptions(key_name=None, ca_pem=None) |
|
927 | 918 |
timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT) |
928 | 919 |
src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts, |
929 | 920 |
src_node, None, dest_node, dest_ip) |
... | ... | |
941 | 932 |
|
942 | 933 |
dtp = _DiskTransferPrivate(transfer, True) |
943 | 934 |
|
944 |
di = DiskImport(lu, dest_node, None, None, instance,
|
|
935 |
di = DiskImport(lu, dest_node, opts, instance,
|
|
945 | 936 |
transfer.dest_io, transfer.dest_ioargs, |
946 | 937 |
timeouts, dest_cbs, private=dtp) |
947 | 938 |
ieloop.Add(di) |
... | ... | |
1131 | 1122 |
|
1132 | 1123 |
return (fin_resu, dresults) |
1133 | 1124 |
|
1134 |
def RemoteExport(self, x509_key_name, dest_x509_ca, disk_info, timeouts):
|
|
1125 |
def RemoteExport(self, opts, disk_info, timeouts):
|
|
1135 | 1126 |
"""Inter-cluster instance export. |
1136 | 1127 |
|
1137 |
@type x509_key_name: string |
|
1138 |
@param x509_key_name: X509 key name for encrypting data |
|
1139 |
@type dest_x509_ca: OpenSSL.crypto.X509 |
|
1140 |
@param dest_x509_ca: Remote peer X509 CA object |
|
1128 |
@type opts: L{objects.ImportExportOptions} |
|
1129 |
@param opts: Import/export daemon options |
|
1141 | 1130 |
@type disk_info: list |
1142 | 1131 |
@param disk_info: Per-disk destination information |
1143 | 1132 |
@type timeouts: L{ImportExportTimeouts} |
... | ... | |
1150 | 1139 |
|
1151 | 1140 |
cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks)) |
1152 | 1141 |
|
1153 |
dest_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, |
|
1154 |
dest_x509_ca) |
|
1155 |
|
|
1156 | 1142 |
ieloop = ImportExportLoop(self._lu) |
1157 | 1143 |
try: |
1158 | 1144 |
for idx, (dev, (host, port, _, _)) in enumerate(zip(instance.disks, |
... | ... | |
1160 | 1146 |
self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port)) |
1161 | 1147 |
finished_fn = compat.partial(self._TransferFinished, idx) |
1162 | 1148 |
ieloop.Add(DiskExport(self._lu, instance.primary_node, |
1163 |
x509_key_name, dest_ca_pem, host, port, instance,
|
|
1149 |
opts, host, port, instance,
|
|
1164 | 1150 |
constants.IEIO_SCRIPT, (dev, idx), |
1165 | 1151 |
timeouts, cbs, private=(idx, finished_fn))) |
1166 | 1152 |
|
... | ... | |
1316 | 1302 |
x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, |
1317 | 1303 |
x509_cert_pem) |
1318 | 1304 |
|
1305 |
# Import daemon options |
|
1306 |
opts = objects.ImportExportOptions(key_name=x509_key_name, |
|
1307 |
ca_pem=source_ca_pem) |
|
1308 |
|
|
1319 | 1309 |
# Sign certificate |
1320 | 1310 |
signed_x509_cert_pem = \ |
1321 | 1311 |
utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8)) |
... | ... | |
1326 | 1316 |
ieloop = ImportExportLoop(lu) |
1327 | 1317 |
try: |
1328 | 1318 |
for idx, dev in enumerate(instance.disks): |
1329 |
ieloop.Add(DiskImport(lu, instance.primary_node, |
|
1330 |
x509_key_name, source_ca_pem, instance, |
|
1319 |
ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance, |
|
1331 | 1320 |
constants.IEIO_SCRIPT, (dev, idx), |
1332 | 1321 |
timeouts, cbs, private=(idx, ))) |
1333 | 1322 |
|
b/lib/objects.py | ||
---|---|---|
1022 | 1022 |
] + _TIMESTAMPS |
1023 | 1023 |
|
1024 | 1024 |
|
1025 |
class ImportExportOptions(ConfigObject): |
|
1026 |
"""Options for import/export daemon |
|
1027 |
|
|
1028 |
@ivar key_name: X509 key name (None for cluster certificate) |
|
1029 |
@ivar ca_pem: Remote peer CA in PEM format (None for cluster certificate) |
|
1030 |
|
|
1031 |
""" |
|
1032 |
__slots__ = [ |
|
1033 |
"key_name", |
|
1034 |
"ca_pem", |
|
1035 |
] |
|
1036 |
|
|
1037 |
|
|
1025 | 1038 |
class ConfdRequest(ConfigObject): |
1026 | 1039 |
"""Object holding a confd request. |
1027 | 1040 |
|
b/lib/rpc.py | ||
---|---|---|
1202 | 1202 |
""" |
1203 | 1203 |
return self._SingleNodeCall(node, "x509_cert_remove", [name]) |
1204 | 1204 |
|
1205 |
def call_import_start(self, node, x509_key_name, source_x509_ca, |
|
1206 |
instance, dest, dest_args): |
|
1205 |
def call_import_start(self, node, opts, instance, dest, dest_args): |
|
1207 | 1206 |
"""Starts a listener for an import. |
1208 | 1207 |
|
1209 | 1208 |
This is a single-node call. |
... | ... | |
1215 | 1214 |
|
1216 | 1215 |
""" |
1217 | 1216 |
return self._SingleNodeCall(node, "import_start", |
1218 |
[x509_key_name, source_x509_ca,
|
|
1217 |
[opts.ToDict(),
|
|
1219 | 1218 |
self._InstDict(instance), dest, |
1220 | 1219 |
_EncodeImportExportIO(dest, dest_args)]) |
1221 | 1220 |
|
1222 |
def call_export_start(self, node, x509_key_name, dest_x509_ca, host, port,
|
|
1221 |
def call_export_start(self, node, opts, host, port,
|
|
1223 | 1222 |
instance, source, source_args): |
1224 | 1223 |
"""Starts an export daemon. |
1225 | 1224 |
|
... | ... | |
1232 | 1231 |
|
1233 | 1232 |
""" |
1234 | 1233 |
return self._SingleNodeCall(node, "export_start", |
1235 |
[x509_key_name, dest_x509_ca, host, port,
|
|
1234 |
[opts.ToDict(), host, port,
|
|
1236 | 1235 |
self._InstDict(instance), source, |
1237 | 1236 |
_EncodeImportExportIO(source, source_args)]) |
1238 | 1237 |
|
Also available in: Unified diff