Revision 5d97d6dd

b/lib/masterd/instance.py
747 747
      success = diskie.Finalize() and success
748 748

  
749 749
    return success
750

  
751

  
752
class _TransferInstCbBase(ImportExportCbBase):
753
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
754
               dest_node, dest_ip):
755
    """Initializes this class.
756

  
757
    """
758
    ImportExportCbBase.__init__(self)
759

  
760
    self.lu = lu
761
    self.feedback_fn = feedback_fn
762
    self.instance = instance
763
    self.timeouts = timeouts
764
    self.src_node = src_node
765
    self.src_cbs = src_cbs
766
    self.dest_node = dest_node
767
    self.dest_ip = dest_ip
768

  
769

  
770
class _TransferInstSourceCb(_TransferInstCbBase):
771
  def ReportConnected(self, ie, dtp):
772
    """Called when a connection has been established.
773

  
774
    """
775
    assert self.src_cbs is None
776
    assert dtp.src_export == ie
777
    assert dtp.dest_import
778

  
779
    self.feedback_fn("%s is sending data on %s" %
780
                     (dtp.data.name, ie.node_name))
781

  
782
  def ReportFinished(self, ie, dtp):
783
    """Called when a transfer has finished.
784

  
785
    """
786
    assert self.src_cbs is None
787
    assert dtp.src_export == ie
788
    assert dtp.dest_import
789

  
790
    if ie.success:
791
      self.feedback_fn("%s finished sending data" % dtp.data.name)
792
    else:
793
      self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
794
                       (dtp.data.name, ie.final_message, ie.recent_output))
795

  
796
    dtp.RecordResult(ie.success)
797

  
798
    cb = dtp.data.finished_fn
799
    if cb:
800
      cb()
801

  
802
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
803
    # give the daemon a moment to sort things out
804
    if dtp.dest_import and not ie.success:
805
      dtp.dest_import.Abort()
806

  
807

  
808
class _TransferInstDestCb(_TransferInstCbBase):
809
  def ReportListening(self, ie, dtp):
810
    """Called when daemon started listening.
811

  
812
    """
813
    assert self.src_cbs
814
    assert dtp.src_export is None
815
    assert dtp.dest_import
816

  
817
    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
818

  
819
    # Start export on source node
820
    de = DiskExport(self.lu, self.src_node, None, None, self.dest_ip,
821
                    ie.listen_port, self.instance,
822
                    dtp.data.src_io, dtp.data.src_ioargs,
823
                    self.timeouts, self.src_cbs, private=dtp)
824
    ie.loop.Add(de)
825

  
826
    dtp.src_export = de
827

  
828
  def ReportConnected(self, ie, dtp):
829
    """Called when a connection has been established.
830

  
831
    """
832
    self.feedback_fn("%s is receiving data on %s" %
833
                     (dtp.data.name, self.dest_node))
834

  
835
  def ReportFinished(self, ie, dtp):
836
    """Called when a transfer has finished.
837

  
838
    """
839
    if ie.success:
840
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
841
    else:
842
      self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
843
                       (dtp.data.name, ie.final_message, ie.recent_output))
844

  
845
    dtp.RecordResult(ie.success)
846

  
847
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
848
    # give the daemon a moment to sort things out
849
    if dtp.src_export and not ie.success:
850
      dtp.src_export.Abort()
851

  
852

  
853
class DiskTransfer(object):
854
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
855
               finished_fn):
856
    """Initializes this class.
857

  
858
    @type name: string
859
    @param name: User-visible name for this transfer (e.g. "disk/0")
860
    @param src_io: Source I/O type
861
    @param src_ioargs: Source I/O arguments
862
    @param dest_io: Destination I/O type
863
    @param dest_ioargs: Destination I/O arguments
864
    @type finished_fn: callable
865
    @param finished_fn: Function called once transfer has finished
866

  
867
    """
868
    self.name = name
869

  
870
    self.src_io = src_io
871
    self.src_ioargs = src_ioargs
872

  
873
    self.dest_io = dest_io
874
    self.dest_ioargs = dest_ioargs
875

  
876
    self.finished_fn = finished_fn
877

  
878

  
879
class _DiskTransferPrivate(object):
880
  def __init__(self, data, success):
881
    """Initializes this class.
882

  
883
    @type data: L{DiskTransfer}
884
    @type success: bool
885

  
886
    """
887
    self.data = data
888

  
889
    self.src_export = None
890
    self.dest_import = None
891

  
892
    self.success = success
893

  
894
  def RecordResult(self, success):
895
    """Updates the status.
896

  
897
    One failed part will cause the whole transfer to fail.
898

  
899
    """
900
    self.success = self.success and success
901

  
902

  
903
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
904
                         instance, all_transfers):
905
  """Transfers an instance's data from one node to another.
906

  
907
  @param lu: Logical unit instance
908
  @param feedback_fn: Feedback function
909
  @type src_node: string
910
  @param src_node: Source node name
911
  @type dest_node: string
912
  @param dest_node: Destination node name
913
  @type dest_ip: string
914
  @param dest_ip: IP address of destination node
915
  @type instance: L{objects.Instance}
916
  @param instance: Instance object
917
  @type all_transfers: list of L{DiskTransfer} instances
918
  @param all_transfers: List of all disk transfers to be made
919
  @rtype: list
920
  @return: List with a boolean (True=successful, False=failed) for success for
921
           each transfer
922

  
923
  """
924
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
925
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
926
                                  src_node, None, dest_node, dest_ip)
927
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
928
                                 src_node, src_cbs, dest_node, dest_ip)
929

  
930
  all_dtp = []
931

  
932
  ieloop = ImportExportLoop(lu)
933
  try:
934
    for transfer in all_transfers:
935
      if transfer:
936
        feedback_fn("Exporting %s from %s to %s" %
937
                    (transfer.name, src_node, dest_node))
938

  
939
        dtp = _DiskTransferPrivate(transfer, True)
940

  
941
        di = DiskImport(lu, dest_node, None, None, instance,
942
                        transfer.dest_io, transfer.dest_ioargs,
943
                        timeouts, dest_cbs, private=dtp)
944
        ieloop.Add(di)
945

  
946
        dtp.dest_import = di
947
      else:
948
        dtp = _DiskTransferPrivate(None, False)
949

  
950
      all_dtp.append(dtp)
951

  
952
    ieloop.Run()
953
  finally:
954
    ieloop.FinalizeAll()
955

  
956
  assert len(all_dtp) == len(all_transfers)
957
  assert compat.all([(dtp.src_export is None or
958
                      dtp.src_export.success is not None) and
959
                     (dtp.dest_import is None or
960
                      dtp.dest_import.success is not None)
961
                     for dtp in all_dtp]), \
962
         "Not all imports/exports are finalized"
963

  
964
  return [bool(dtp.success) for dtp in all_dtp]

Also available in: Unified diff