Revision 262db388 nbd.c

b/nbd.c
20 20
#include "block.h"
21 21
#include "block_int.h"
22 22

  
23
#include "qemu-coroutine.h"
24

  
23 25
#include <errno.h>
24 26
#include <string.h>
25 27
#ifndef _WIN32
......
607 609

  
608 610
    NBDExport *exp;
609 611
    int sock;
612

  
613
    Coroutine *recv_coroutine;
614

  
615
    CoMutex send_lock;
616
    Coroutine *send_coroutine;
610 617
};
611 618

  
612 619
static void nbd_client_get(NBDClient *client)
......
681 688
    g_free(exp);
682 689
}
683 690

  
684
static int nbd_do_send_reply(NBDRequest *req, struct nbd_reply *reply,
691
static void nbd_read(void *opaque);
692
static void nbd_restart_write(void *opaque);
693

  
694
static int nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
685 695
                             int len)
686 696
{
687 697
    NBDClient *client = req->client;
688 698
    int csock = client->sock;
689 699
    int rc, ret;
690 700

  
701
    qemu_co_mutex_lock(&client->send_lock);
702
    qemu_set_fd_handler2(csock, NULL, nbd_read, nbd_restart_write, client);
703
    client->send_coroutine = qemu_coroutine_self();
704

  
691 705
    if (!len) {
692 706
        rc = nbd_send_reply(csock, reply);
693 707
        if (rc == -1) {
......
697 711
        socket_set_cork(csock, 1);
698 712
        rc = nbd_send_reply(csock, reply);
699 713
        if (rc != -1) {
700
            ret = write_sync(csock, req->data, len);
714
            ret = qemu_co_send(csock, req->data, len);
701 715
            if (ret != len) {
702 716
                errno = EIO;
703 717
                rc = -1;
......
708 722
        }
709 723
        socket_set_cork(csock, 0);
710 724
    }
725

  
726
    client->send_coroutine = NULL;
727
    qemu_set_fd_handler2(csock, NULL, nbd_read, NULL, client);
728
    qemu_co_mutex_unlock(&client->send_lock);
711 729
    return rc;
712 730
}
713 731

  
714
static int nbd_do_receive_request(NBDRequest *req, struct nbd_request *request)
732
static int nbd_co_receive_request(NBDRequest *req, struct nbd_request *request)
715 733
{
716 734
    NBDClient *client = req->client;
717 735
    int csock = client->sock;
718 736
    int rc;
719 737

  
738
    client->recv_coroutine = qemu_coroutine_self();
720 739
    if (nbd_receive_request(csock, request) == -1) {
721 740
        rc = -EIO;
722 741
        goto out;
......
741 760
    if ((request->type & NBD_CMD_MASK_COMMAND) == NBD_CMD_WRITE) {
742 761
        TRACE("Reading %u byte(s)", request->len);
743 762

  
744
        if (read_sync(csock, req->data, request->len) != request->len) {
763
        if (qemu_co_recv(csock, req->data, request->len) != request->len) {
745 764
            LOG("reading from socket failed");
746 765
            rc = -EIO;
747 766
            goto out;
......
750 769
    rc = 0;
751 770

  
752 771
out:
772
    client->recv_coroutine = NULL;
753 773
    return rc;
754 774
}
755 775

  
756
static int nbd_trip(NBDClient *client)
776
static void nbd_trip(void *opaque)
757 777
{
778
    NBDClient *client = opaque;
758 779
    NBDRequest *req = nbd_request_get(client);
759 780
    NBDExport *exp = client->exp;
760 781
    struct nbd_request request;
761 782
    struct nbd_reply reply;
762
    int rc = -1;
763 783
    int ret;
764 784

  
765 785
    TRACE("Reading request.");
766 786

  
767
    ret = nbd_do_receive_request(req, &request);
787
    ret = nbd_co_receive_request(req, &request);
768 788
    if (ret == -EIO) {
769 789
        goto out;
770 790
    }
......
799 819
        }
800 820

  
801 821
        TRACE("Read %u byte(s)", request.len);
802
        if (nbd_do_send_reply(req, &reply, request.len) < 0)
822
        if (nbd_co_send_reply(req, &reply, request.len) < 0)
803 823
            goto out;
804 824
        break;
805 825
    case NBD_CMD_WRITE:
......
822 842
        }
823 843

  
824 844
        if (request.type & NBD_CMD_FLAG_FUA) {
825
            ret = bdrv_flush(exp->bs);
845
            ret = bdrv_co_flush(exp->bs);
826 846
            if (ret < 0) {
827 847
                LOG("flush failed");
828 848
                reply.error = -ret;
......
830 850
            }
831 851
        }
832 852

  
833
        if (nbd_do_send_reply(req, &reply, 0) < 0)
853
        if (nbd_co_send_reply(req, &reply, 0) < 0)
834 854
            goto out;
835 855
        break;
836 856
    case NBD_CMD_DISC:
837 857
        TRACE("Request type is DISCONNECT");
838 858
        errno = 0;
839
        return 1;
859
        goto out;
840 860
    case NBD_CMD_FLUSH:
841 861
        TRACE("Request type is FLUSH");
842 862

  
843
        ret = bdrv_flush(exp->bs);
863
        ret = bdrv_co_flush(exp->bs);
844 864
        if (ret < 0) {
845 865
            LOG("flush failed");
846 866
            reply.error = -ret;
847 867
        }
848 868

  
849
        if (nbd_do_send_reply(req, &reply, 0) < 0)
869
        if (nbd_co_send_reply(req, &reply, 0) < 0)
850 870
            goto out;
851 871
        break;
852 872
    case NBD_CMD_TRIM:
853 873
        TRACE("Request type is TRIM");
854
        ret = bdrv_discard(exp->bs, (request.from + exp->dev_offset) / 512,
855
                           request.len / 512);
874
        ret = bdrv_co_discard(exp->bs, (request.from + exp->dev_offset) / 512,
875
                              request.len / 512);
856 876
        if (ret < 0) {
857 877
            LOG("discard failed");
858 878
            reply.error = -ret;
859 879
        }
860
        if (nbd_do_send_reply(req, &reply, 0) < 0)
880
        if (nbd_co_send_reply(req, &reply, 0) < 0)
861 881
            goto out;
862 882
        break;
863 883
    default:
......
865 885
    invalid_request:
866 886
        reply.error = -EINVAL;
867 887
    error_reply:
868
        if (nbd_do_send_reply(req, &reply, 0) == -1)
888
        if (nbd_co_send_reply(req, &reply, 0) == -1)
869 889
            goto out;
870 890
        break;
871 891
    }
872 892

  
873 893
    TRACE("Request/Reply complete");
874 894

  
875
    rc = 0;
895
    nbd_request_put(req);
896
    return;
897

  
876 898
out:
877 899
    nbd_request_put(req);
878
    return rc;
900
    nbd_client_close(client);
879 901
}
880 902

  
881 903
static void nbd_read(void *opaque)
882 904
{
883 905
    NBDClient *client = opaque;
884 906

  
885
    if (nbd_trip(client) != 0) {
886
        nbd_client_close(client);
907
    if (client->recv_coroutine) {
908
        qemu_coroutine_enter(client->recv_coroutine, NULL);
909
    } else {
910
        qemu_coroutine_enter(qemu_coroutine_create(nbd_trip), client);
887 911
    }
888 912
}
889 913

  
914
static void nbd_restart_write(void *opaque)
915
{
916
    NBDClient *client = opaque;
917

  
918
    qemu_coroutine_enter(client->send_coroutine, NULL);
919
}
920

  
890 921
NBDClient *nbd_client_new(NBDExport *exp, int csock,
891 922
                          void (*close)(NBDClient *))
892 923
{
......
899 930
    client->exp = exp;
900 931
    client->sock = csock;
901 932
    client->close = close;
933
    qemu_co_mutex_init(&client->send_lock);
902 934
    qemu_set_fd_handler2(csock, NULL, nbd_read, NULL, client);
903 935
    return client;
904 936
}

Also available in: Unified diff