Revision 5d97d6dd lib/masterd/instance.py
b/lib/masterd/instance.py | ||
---|---|---|
747 | 747 |
success = diskie.Finalize() and success |
748 | 748 |
|
749 | 749 |
return success |
750 |
|
|
751 |
|
|
752 |
class _TransferInstCbBase(ImportExportCbBase): |
|
753 |
def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs, |
|
754 |
dest_node, dest_ip): |
|
755 |
"""Initializes this class. |
|
756 |
|
|
757 |
""" |
|
758 |
ImportExportCbBase.__init__(self) |
|
759 |
|
|
760 |
self.lu = lu |
|
761 |
self.feedback_fn = feedback_fn |
|
762 |
self.instance = instance |
|
763 |
self.timeouts = timeouts |
|
764 |
self.src_node = src_node |
|
765 |
self.src_cbs = src_cbs |
|
766 |
self.dest_node = dest_node |
|
767 |
self.dest_ip = dest_ip |
|
768 |
|
|
769 |
|
|
770 |
class _TransferInstSourceCb(_TransferInstCbBase): |
|
771 |
def ReportConnected(self, ie, dtp): |
|
772 |
"""Called when a connection has been established. |
|
773 |
|
|
774 |
""" |
|
775 |
assert self.src_cbs is None |
|
776 |
assert dtp.src_export == ie |
|
777 |
assert dtp.dest_import |
|
778 |
|
|
779 |
self.feedback_fn("%s is sending data on %s" % |
|
780 |
(dtp.data.name, ie.node_name)) |
|
781 |
|
|
782 |
def ReportFinished(self, ie, dtp): |
|
783 |
"""Called when a transfer has finished. |
|
784 |
|
|
785 |
""" |
|
786 |
assert self.src_cbs is None |
|
787 |
assert dtp.src_export == ie |
|
788 |
assert dtp.dest_import |
|
789 |
|
|
790 |
if ie.success: |
|
791 |
self.feedback_fn("%s finished sending data" % dtp.data.name) |
|
792 |
else: |
|
793 |
self.feedback_fn("%s failed to send data: %s (recent output: %r)" % |
|
794 |
(dtp.data.name, ie.final_message, ie.recent_output)) |
|
795 |
|
|
796 |
dtp.RecordResult(ie.success) |
|
797 |
|
|
798 |
cb = dtp.data.finished_fn |
|
799 |
if cb: |
|
800 |
cb() |
|
801 |
|
|
802 |
# TODO: Check whether sending SIGTERM right away is okay, maybe we should |
|
803 |
# give the daemon a moment to sort things out |
|
804 |
if dtp.dest_import and not ie.success: |
|
805 |
dtp.dest_import.Abort() |
|
806 |
|
|
807 |
|
|
808 |
class _TransferInstDestCb(_TransferInstCbBase): |
|
809 |
def ReportListening(self, ie, dtp): |
|
810 |
"""Called when daemon started listening. |
|
811 |
|
|
812 |
""" |
|
813 |
assert self.src_cbs |
|
814 |
assert dtp.src_export is None |
|
815 |
assert dtp.dest_import |
|
816 |
|
|
817 |
self.feedback_fn("%s is now listening, starting export" % dtp.data.name) |
|
818 |
|
|
819 |
# Start export on source node |
|
820 |
de = DiskExport(self.lu, self.src_node, None, None, self.dest_ip, |
|
821 |
ie.listen_port, self.instance, |
|
822 |
dtp.data.src_io, dtp.data.src_ioargs, |
|
823 |
self.timeouts, self.src_cbs, private=dtp) |
|
824 |
ie.loop.Add(de) |
|
825 |
|
|
826 |
dtp.src_export = de |
|
827 |
|
|
828 |
def ReportConnected(self, ie, dtp): |
|
829 |
"""Called when a connection has been established. |
|
830 |
|
|
831 |
""" |
|
832 |
self.feedback_fn("%s is receiving data on %s" % |
|
833 |
(dtp.data.name, self.dest_node)) |
|
834 |
|
|
835 |
def ReportFinished(self, ie, dtp): |
|
836 |
"""Called when a transfer has finished. |
|
837 |
|
|
838 |
""" |
|
839 |
if ie.success: |
|
840 |
self.feedback_fn("%s finished receiving data" % dtp.data.name) |
|
841 |
else: |
|
842 |
self.feedback_fn("%s failed to receive data: %s (recent output: %r)" % |
|
843 |
(dtp.data.name, ie.final_message, ie.recent_output)) |
|
844 |
|
|
845 |
dtp.RecordResult(ie.success) |
|
846 |
|
|
847 |
# TODO: Check whether sending SIGTERM right away is okay, maybe we should |
|
848 |
# give the daemon a moment to sort things out |
|
849 |
if dtp.src_export and not ie.success: |
|
850 |
dtp.src_export.Abort() |
|
851 |
|
|
852 |
|
|
853 |
class DiskTransfer(object): |
|
854 |
def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs, |
|
855 |
finished_fn): |
|
856 |
"""Initializes this class. |
|
857 |
|
|
858 |
@type name: string |
|
859 |
@param name: User-visible name for this transfer (e.g. "disk/0") |
|
860 |
@param src_io: Source I/O type |
|
861 |
@param src_ioargs: Source I/O arguments |
|
862 |
@param dest_io: Destination I/O type |
|
863 |
@param dest_ioargs: Destination I/O arguments |
|
864 |
@type finished_fn: callable |
|
865 |
@param finished_fn: Function called once transfer has finished |
|
866 |
|
|
867 |
""" |
|
868 |
self.name = name |
|
869 |
|
|
870 |
self.src_io = src_io |
|
871 |
self.src_ioargs = src_ioargs |
|
872 |
|
|
873 |
self.dest_io = dest_io |
|
874 |
self.dest_ioargs = dest_ioargs |
|
875 |
|
|
876 |
self.finished_fn = finished_fn |
|
877 |
|
|
878 |
|
|
879 |
class _DiskTransferPrivate(object): |
|
880 |
def __init__(self, data, success): |
|
881 |
"""Initializes this class. |
|
882 |
|
|
883 |
@type data: L{DiskTransfer} |
|
884 |
@type success: bool |
|
885 |
|
|
886 |
""" |
|
887 |
self.data = data |
|
888 |
|
|
889 |
self.src_export = None |
|
890 |
self.dest_import = None |
|
891 |
|
|
892 |
self.success = success |
|
893 |
|
|
894 |
def RecordResult(self, success): |
|
895 |
"""Updates the status. |
|
896 |
|
|
897 |
One failed part will cause the whole transfer to fail. |
|
898 |
|
|
899 |
""" |
|
900 |
self.success = self.success and success |
|
901 |
|
|
902 |
|
|
903 |
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip, |
|
904 |
instance, all_transfers): |
|
905 |
"""Transfers an instance's data from one node to another. |
|
906 |
|
|
907 |
@param lu: Logical unit instance |
|
908 |
@param feedback_fn: Feedback function |
|
909 |
@type src_node: string |
|
910 |
@param src_node: Source node name |
|
911 |
@type dest_node: string |
|
912 |
@param dest_node: Destination node name |
|
913 |
@type dest_ip: string |
|
914 |
@param dest_ip: IP address of destination node |
|
915 |
@type instance: L{objects.Instance} |
|
916 |
@param instance: Instance object |
|
917 |
@type all_transfers: list of L{DiskTransfer} instances |
|
918 |
@param all_transfers: List of all disk transfers to be made |
|
919 |
@rtype: list |
|
920 |
@return: List with a boolean (True=successful, False=failed) for success for |
|
921 |
each transfer |
|
922 |
|
|
923 |
""" |
|
924 |
timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT) |
|
925 |
src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts, |
|
926 |
src_node, None, dest_node, dest_ip) |
|
927 |
dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts, |
|
928 |
src_node, src_cbs, dest_node, dest_ip) |
|
929 |
|
|
930 |
all_dtp = [] |
|
931 |
|
|
932 |
ieloop = ImportExportLoop(lu) |
|
933 |
try: |
|
934 |
for transfer in all_transfers: |
|
935 |
if transfer: |
|
936 |
feedback_fn("Exporting %s from %s to %s" % |
|
937 |
(transfer.name, src_node, dest_node)) |
|
938 |
|
|
939 |
dtp = _DiskTransferPrivate(transfer, True) |
|
940 |
|
|
941 |
di = DiskImport(lu, dest_node, None, None, instance, |
|
942 |
transfer.dest_io, transfer.dest_ioargs, |
|
943 |
timeouts, dest_cbs, private=dtp) |
|
944 |
ieloop.Add(di) |
|
945 |
|
|
946 |
dtp.dest_import = di |
|
947 |
else: |
|
948 |
dtp = _DiskTransferPrivate(None, False) |
|
949 |
|
|
950 |
all_dtp.append(dtp) |
|
951 |
|
|
952 |
ieloop.Run() |
|
953 |
finally: |
|
954 |
ieloop.FinalizeAll() |
|
955 |
|
|
956 |
assert len(all_dtp) == len(all_transfers) |
|
957 |
assert compat.all([(dtp.src_export is None or |
|
958 |
dtp.src_export.success is not None) and |
|
959 |
(dtp.dest_import is None or |
|
960 |
dtp.dest_import.success is not None) |
|
961 |
for dtp in all_dtp]), \ |
|
962 |
"Not all imports/exports are finalized" |
|
963 |
|
|
964 |
return [bool(dtp.success) for dtp in all_dtp] |
Also available in: Unified diff