added speer skeleton
author Giannakos Filippos <philipgian@cslab.ece.ntua.gr>
Fri, 5 Oct 2012 16:25:24 +0000 (19:25 +0300)
committer Giannakos Filippos <philipgian@cslab.ece.ntua.gr>
Fri, 5 Oct 2012 16:25:24 +0000 (19:25 +0300)
18 files changed:
xseg/launch
xseg/peers/kernel/xsegbd.c
xseg/peers/user/Makefile
xseg/peers/user/filed.c
xseg/peers/user/mapperd.c
xseg/peers/user/monitor.c
xseg/peers/user/mpeer.c
xseg/peers/user/mpeer.h
xseg/peers/user/mt-mapperd.c
xseg/peers/user/mt-vlmcd.c
xseg/peers/user/pfiled.c
xseg/peers/user/speer.c [new file with mode: 0644]
xseg/peers/user/vlmc-tool.c
xseg/peers/user/xseg-tool.c
xseg/sys/util.h
xseg/tools/vlmc
xseg/xseg/xseg.c
xseg/xseg/xseg.h

index 1c0ebeb..77da522 100755 (executable)
@@ -26,9 +26,9 @@ function parse_config {
        [ -z "${HOSTNAME}" ] && HOSTNAME=`hostname`
        [ -z "${XSEG_HOME}" ] && XSEG_HOME="/root/archip/xseg"
        [ -z "${MODULES_DIR}" ] && MODULES_DIR="${XSEG_HOME}/lib/kernel"
-       [ -z "${SPEC}" ] && SPEC="segdev:xsegbd:16:1024:12"
+       [ -z "${SPEC}" ] && SPEC="segdev:xsegbd:512:1024:12"
        [ -z "${REQS}" ] && REQS=512
-       [ -z "${PORTS}" ] && PORTS=16
+       [ -z "${PORTS}" ] && PORTS=512
        [ -z "${IMAGES}" ] && IMAGES="/home/user/archip/xseg/peers/user/foo"
        [ -z "${MAPS}" ] && MAPS="/home/user/archip/xseg/peers/user/foo"
        [ -z "${PITHOS}" ] && PITHOS="/srv/pithos/data/blocks"
@@ -42,10 +42,11 @@ function parse_config {
        [ -z "${CHRDEV_MAJOR}" ] && CHRDEV_MAJOR=60
        [ -z "${NR_OPS}" ] && NR_OPS=128
        [ -z "${BPORT}" ] && BPORT=0
-       [ -z "${VPORT}" ] && VPORT=2
+       [ -z "${VPORT_START}" ] && VPORT_START=4
+       [ -z "${VPORT_END}" ] && VPORT_END=200
        [ -z "${MPORT}" ] && MPORT=1
-       [ -z "${MBPORT}" ] && MBPORT=3
-       [ -z "${VTOOL}" ] && VTOOL=15
+       [ -z "${MBPORT}" ] && MBPORT=2
+       [ -z "${VTOOL}" ] && VTOOL=3
 }
 
 
@@ -116,15 +117,17 @@ function spawn_pfiledm {
 }
 
 function spawn_vlmcd {
-       pgrep -f "peers/user/mt-vlmcd" || \
-       "${XSEG_HOME}/peers/user/mt-vlmcd" -t 1 -p "$VPORT" -bp "$BPORT" -mp "$MPORT" -g \
+       pgrep -f "peers/user/st-vlmcd" || \
+       "${XSEG_HOME}/peers/user/st-vlmcd" -t 1 -sp "$VPORT_START" \
+       -ep "$VPORT_END" -bp "$BPORT" -mp "$MPORT" -g \
        "${SPEC}" -n ${NR_OPS} &> "${XSEG_LOGS}/vlmcd-${HOSTNAME}" &
 #      alloc_requests "$VPORT:0" 128
 }
 
 function spawn_mapperdc {
        pgrep -f "mt-mapperd" || \
-       "${XSEG_HOME}/peers/user/mt-mapperd" -t 1 -p "$MPORT" -bp "$BPORT" -mbp "$MBPORT" -g "${SPEC}"\
+       "${XSEG_HOME}/peers/user/mt-mapperd" -t 1 -sp "$MPORT" -ep "$MPORT"\
+       -bp "$BPORT" -mbp "$MBPORT" -g "${SPEC}"\
        -n ${NR_OPS} &> "${XSEG_LOGS}/mapperd-${HOSTNAME}" & 
 #      alloc_requests "$MPORT:0" 128
 }
@@ -291,7 +294,7 @@ doublemap)
                spawn_vlmcd
                ;;
        stop)
-               pkill -f peers/user/mt-vlmcd || true
+               pkill -f peers/user/st-vlmcd || true
 #              free_requests "$VPORT:0" 128 || true
                pkill -f peers/user/mt-mapperd || true
 #              free_requests "$MPORT:0" 128 || true
index 3a1479c..0e7feb9 100644 (file)
@@ -673,7 +673,7 @@ static void xseg_callback(xport portno)
 
        for (;;) {
                xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
-               xreq = xseg_receive(xsegbd_dev->xseg, portno);
+               xreq = xseg_receive(xsegbd_dev->xseg, portno, 0);
                if (!xreq)
                        break;
 
index 4d65688..e556e2d 100644 (file)
@@ -6,7 +6,7 @@ default: all
 
 #all: filed xseg sosd vlmcd mapperd
 #all: filed xseg vlmcd mapperd
-all: filed xseg mt-sosd dummy mt-vlmcd mapperd mt-mapperd pfiled monitor vlmc-xseg
+all: filed xseg mt-sosd dummy mt-vlmcd mapperd mt-mapperd pfiled monitor vlmc-xseg st-vlmcd
 
 
 filed: filed.c $(BASE)/xseg/xseg.h
@@ -36,6 +36,9 @@ monitor: monitor.c mpeer.c mpeer.h
 mt-vlmcd: mt-vlmcd.c mpeer.c mpeer.h $(BASE)/xseg/protocol.h 
        $(CC) $(CFLAGS) -o $@ $< mpeer.c $(INC) -L$(LIB) -lxseg -lpthread
 
+st-vlmcd: mt-vlmcd.c speer.c mpeer.h $(BASE)/xseg/protocol.h 
+       $(CC) $(CFLAGS) -o $@ $< speer.c $(INC) -L$(LIB) -lxseg -lpthread
+
 mt-mapperd: mt-mapperd.c mpeer.c mpeer.h $(BASE)/xseg/protocol.h 
        $(CC) $(CFLAGS) -o $@ $< mpeer.c $(INC) -L$(LIB) -lxseg -lpthread -lgcrypt
 
@@ -46,4 +49,4 @@ vlmc-xseg: vlmc-tool.c $(BASE)/xseg/xseg.h
        $(CC) $(CFLAGS) -o $@ $< $(INC) -L$(LIB) -lxseg
 
 clean:
-       rm -f filed xseg sosd vlmcd mapperd mt-sosd dummy monitor mt-mapperd pfiled vlmc-xseg
+       rm -f filed xseg sosd vlmcd mapperd mt-sosd dummy monitor mt-mapperd pfiled vlmc-xseg st-vlmcd
index 3db4dae..b9666d4 100644 (file)
@@ -562,7 +562,7 @@ void *io_loop(void *arg)
 
        for (;;) {
                accepted = NULL;
-               accepted = xseg_accept(xseg, portno);
+               accepted = xseg_accept(xseg, portno, 0);
                if (accepted) {
                        io->req = accepted;
                        wake_up_next_iothread(store);
index e74f16b..a58e838 100644 (file)
@@ -157,7 +157,7 @@ static int mapperd_loop(struct mapperd *mapperd)
                ret = xseg_prepare_wait(xseg, mportno);
                always_assert(ret == 0);
 
-               xreq = xseg_accept(xseg, mportno);
+               xreq = xseg_accept(xseg, mportno, 0);
                if (xreq) {
                        xseg_cancel_wait(xseg, mportno);
                        /*
index 70cc8b4..282ff30 100644 (file)
@@ -64,19 +64,19 @@ int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *xreq)
 int mpause(struct peerd *peer)
 {
        struct xseg *xseg = peer->xseg;
-       struct xseg_port *port = xseg_get_port(xseg, peer->portno);
+       struct xseg_port *port = xseg_get_port(xseg, peer->portno_start);
        if (!port)
                return -1;
        
-       xlock_acquire(&port->rq_lock, peer->portno);
-       xlock_acquire(&port->pq_lock, peer->portno);
+       xlock_acquire(&port->rq_lock, peer->portno_start);
+       xlock_acquire(&port->pq_lock, peer->portno_start);
        return 0;
 }
 
 int munpause(struct peerd *peer)
 {
        struct xseg *xseg = peer->xseg;
-       struct xseg_port *port = xseg_get_port(xseg, peer->portno);
+       struct xseg_port *port = xseg_get_port(xseg, peer->portno_start);
        if (!port)
                return -1;
        
index 830c073..b0905c5 100644 (file)
@@ -149,7 +149,7 @@ inline static int wake_up_next_thread(struct peerd *peer)
        //struct thread *t = alloc_thread(peer);
        //wake_up_thread(t);
        //return t;
-       return (xseg_signal(peer->xseg, peer->portno));
+       return (xseg_signal(peer->xseg, peer->portno_start));
 }
 
 struct timeval resp_start, resp_end, resp_accum = {0, 0};
@@ -168,7 +168,7 @@ void fail(struct peerd *peer, struct peer_req *pr)
        XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
        req->state |= XS_FAILED;
        //xseg_set_req_data(peer->xseg, pr->req, NULL);
-       p = xseg_respond(peer->xseg, req, peer->portno, X_ALLOC);
+       p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
        xseg_signal(peer->xseg, p);
        free_peer_req(peer, pr);
        wake_up_next_thread(peer);
@@ -182,7 +182,7 @@ void complete(struct peerd *peer, struct peer_req *pr)
        req->state |= XS_SERVED;
        //xseg_set_req_data(peer->xseg, pr->req, NULL);
        //gettimeofday(&resp_start, NULL);
-       p = xseg_respond(peer->xseg, req, peer->portno, X_ALLOC);
+       p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
        //gettimeofday(&resp_end, NULL);
        //responds++;
        //timersub(&resp_end, &resp_start, &resp_end);
@@ -239,7 +239,7 @@ int submit_peer_req(struct peerd *peer, struct peer_req *pr)
                return -1;
        //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
        //gettimeofday(&sub_start, NULL);
-       ret = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        //gettimeofday(&sub_end, NULL);
        //submits++;
        //timersub(&sub_end, &sub_start, &sub_end);
@@ -268,22 +268,25 @@ static void* thread_loop(void *arg)
        struct thread *t = (struct thread *) arg;
        struct peerd *peer = t->peer;
        struct xseg *xseg = peer->xseg;
-       uint32_t portno = peer->portno;
+       xport portno_start = peer->portno_start;
+       xport portno_end = peer->portno_end;
        struct peer_req *pr;
-       uint64_t threshold=1000;
+       uint64_t threshold=1;
        pid_t pid =syscall(SYS_gettid);
        uint64_t loops;
        struct xseg_request *accepted, *received;
        int r;
+       int change;
+       xport i;
                
        XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
 
        XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
-       xseg_init_local_signal(xseg, portno);
+       xseg_init_local_signal(xseg, peer->portno_start);
        for (;;) {
                if (t->func) {
                        XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
-                       xseg_cancel_wait(xseg, portno);
+                       xseg_cancel_wait(xseg, peer->portno_start);
                        t->func(t->arg);
                        t->func = NULL;
                        t->arg = NULL;
@@ -291,52 +294,53 @@ static void* thread_loop(void *arg)
                }
 
                for(loops= threshold; loops > 0; loops--) {
-                       accepted = NULL;
-                       received = NULL;
-                       if (loops == 1)
-                               xseg_prepare_wait(xseg, portno);
-
-//                     if (xq_count(&peer->xport->request_queue)){
-                               pr = alloc_peer_req(peer);
-                               if (pr) {
-                                       accepted = xseg_accept(xseg, peer->portno);
-                                       if (accepted) {
-                                               XSEGLOG2(&lc, D, "Thread %u accepted\n", (unsigned int) (t- peer->thread));
-                                               pr->req = accepted;
-                                               xseg_cancel_wait(xseg, portno);
-                                               wake_up_next_thread(peer);
-                                               handle_accepted(peer, pr, accepted);
-                                               loops = threshold;
+                       do {
+                               if (loops == 1)
+                                       xseg_prepare_wait(xseg, peer->portno_start);
+                               change = 0;
+                               for (i = portno_start; i <= portno_end; i++) {
+                                       accepted = NULL;
+                                       received = NULL;
+                                       pr = alloc_peer_req(peer);
+                                       if (pr) {
+                                               accepted = xseg_accept(xseg, i, X_NONBLOCK);
+                                               if (accepted) {
+                                                       XSEGLOG2(&lc, D, "Thread %u accepted\n", (unsigned int) (t- peer->thread));
+                                                       pr->req = accepted;
+                                                       pr->portno = i;
+                                                       xseg_cancel_wait(xseg, i);
+                                                       wake_up_next_thread(peer);
+                                                       handle_accepted(peer, pr, accepted);
+                                                       change = 1;
+                                               }
+                                               else {
+                                                       free_peer_req(peer, pr);
+                                               }
                                        }
-                                       else {
-                                               free_peer_req(peer, pr);
+                                       received = xseg_receive(xseg, i, X_NONBLOCK);
+                                       if (received) {
+                                               //printf("received req id: %u\n", received - xseg->requests);
+                                               //print_req(peer->xseg, received);
+                                               r =  xseg_get_req_data(xseg, received, (void **) &pr);
+                                               if (r < 0 || !pr){
+                                                       //FIXME what to do here ?
+                                                       XSEGLOG2(&lc, W, "Received request with no pr data\n");
+                                                       xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
+                                                       //FIXME if fails, put req
+                                               }
+                                               xseg_cancel_wait(xseg, i);
+                                               wake_up_next_thread(peer);
+                                               handle_received(peer, pr, received);
+                                               change = 1;
                                        }
                                }
-//                     }
-//                     if (xq_count(&peer->xport->reply_queue)){
-                               received = xseg_receive(xseg, peer->portno);
-                               if (received) {
-                                       //printf("received req id: %u\n", received - xseg->requests);
-                                       //print_req(peer->xseg, received);
-                                       r =  xseg_get_req_data(xseg, received, (void **) &pr);
-                                       if (r < 0 || !pr){
-                                               //FIXME what to do here ?
-                                               XSEGLOG2(&lc, W, "Received request with no pr data\n");
-                                               xseg_respond(peer->xseg, received, peer->portno, X_ALLOC);
-                                               //if fails, put req
-                                       }
-                                       //fail(peer, received);
-                                       //assert pr->req == received;
-                                       xseg_cancel_wait(xseg, portno);
-                                       wake_up_next_thread(peer);
-                                       handle_received(peer, pr, received);
+                               if (change)
                                        loops = threshold;
-                               }
-//                     }
+                       }while (change);
                }
                XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
                xseg_wait_signal(xseg, 10000000UL);
-               xseg_cancel_wait(xseg, portno);
+               xseg_cancel_wait(xseg, peer->portno_start);
                XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
        }
        return NULL;
@@ -391,7 +395,8 @@ int peerd_start_threads(struct peerd *peer)
        return 0;
 }
 
-static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno, uint32_t nr_threads, uint32_t defer_portno)
+static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
+                       long portno_end, uint32_t nr_threads, uint32_t defer_portno)
 {
        int i;
        struct peerd *peer;
@@ -427,15 +432,26 @@ malloc_fail:
        if (!peer->xseg) 
                return NULL;
 
-       peer->xport = xseg_bind_port(peer->xseg, portno, NULL);
-       if (!peer->xport){
-               printf("cannot bind to port %ld\n", portno);
+       peer->portno_start = (xport) portno_start;
+       peer->portno_end= (xport) portno_end;
+       peer->port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
+       if (!peer->port){
+               printf("cannot bind to port %ld\n", peer->portno_start);
                return NULL;
        }
-       printf("%lx\n", (unsigned long) peer->xport);
-       peer->portno = xseg_portno(peer->xseg, peer->xport);
-       printf("Peer on port %u/%u\n", peer->portno,
-                       peer->xseg->config.nr_ports);
+
+       xport p;
+       for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
+               struct xseg_port *tmp;
+               tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, peer->port));
+               if (!tmp){
+                       printf("cannot bind to port %ld\n", p);
+                       return NULL;
+               }
+       }
+
+       printf("Peer on ports  %u-%u\n", peer->portno_start,
+                       peer->portno_end);
 
        for (i = 0; i < nr_ops; i++) {
                peer->peer_reqs[i].peer = peer;
@@ -454,7 +470,7 @@ int main(int argc, char *argv[])
        //parse args
        char *spec = "";
        int i, r;
-       long portno = -1;
+       long portno_start = -1, portno_end = -1;
        //set defaults here
        uint32_t nr_ops = 16;
        uint32_t nr_threads = 16 ;
@@ -474,8 +490,14 @@ int main(int argc, char *argv[])
                        continue;
                }
 
-               if (!strcmp(argv[i], "-p") && i + 1 < argc) {
-                       portno = strtoul(argv[i+1], NULL, 10);
+               if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
+                       portno_start = strtoul(argv[i+1], NULL, 10);
+                       i += 1;
+                       continue;
+               }
+               
+               if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
+                       portno_end = strtoul(argv[i+1], NULL, 10);
                        i += 1;
                        continue;
                }
@@ -514,7 +536,7 @@ int main(int argc, char *argv[])
        verbose = debug_level;
 
        //TODO err check
-       peer = peerd_init(nr_ops, spec, portno, nr_threads, defer_portno);
+       peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
        if (!peer)
                return -1;
        r = custom_peer_init(peer, argc, argv);
index 6226303..da4f6ed 100644 (file)
@@ -1,16 +1,19 @@
 #include <stddef.h>
+#include <xseg/xseg.h>
 /* main mpeer structs */
 struct peer_req {
        struct peerd *peer;
        struct xseg_request *req;
        ssize_t retval;
+       xport portno;
        void *priv;
 };
 
 struct peerd {
        struct xseg *xseg;
-       struct xseg_port *xport;
-       uint32_t portno;
+       struct xseg_port *port;
+       xport portno_start;
+       xport portno_end;
        long nr_ops;
        uint32_t nr_threads;
        uint32_t defer_portno;
index f51719d..289f8cd 100644 (file)
@@ -245,7 +245,7 @@ static int load_map(struct peerd *peer, struct peer_req *pr, char *target,
                goto out_hash;
        
 
-       req = xseg_get_request(peer->xseg, peer->portno, mapper->mbportno, X_ALLOC);
+       req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
        if (!req){
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
                                m->volume);
@@ -272,7 +272,7 @@ static int load_map(struct peerd *peer, struct peer_req *pr, char *target,
                                m->volume);
                goto out_put;
        }
-       p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort){ 
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
                                m->volume);
@@ -286,7 +286,7 @@ static int load_map(struct peerd *peer, struct peer_req *pr, char *target,
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 
 out_fail:
        remove_map(mapper, m);
@@ -423,7 +423,7 @@ static int object_write(struct peerd *peer, struct peer_req *pr,
 {
        void *dummy;
        struct mapperd *mapper = __get_mapperd(peer);
-       struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
+       struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
                                                        mapper->mbportno, X_ALLOC);
        if (!req){
                XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
@@ -453,7 +453,7 @@ static int object_write(struct peerd *peer, struct peer_req *pr,
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
                goto out_put;
        }
-       xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort){
                XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
                                "(Map: %s [%llu]",
@@ -473,7 +473,7 @@ static int object_write(struct peerd *peer, struct peer_req *pr,
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 out_err:
        XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
                        "(Map: %s [%llu]",
@@ -487,7 +487,7 @@ static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
        struct mapperd *mapper = __get_mapperd(peer);
        struct map_node *mn;
        uint64_t i, pos, max_objidx = calc_map_obj(map);
-       struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
+       struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
                                                        mapper->mbportno, X_ALLOC);
        if (!req){
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
@@ -526,7 +526,7 @@ static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
                                map->volume);
                goto out_put;
        }
-       xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort){
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
                                map->volume);
@@ -543,7 +543,7 @@ static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 out_err:
        XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
        return -1;
@@ -684,7 +684,7 @@ static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_re
        if (!strncmp(mn->object, zero_block, (mn->objectlen < HEXLIFIED_SHA256_DIGEST_SIZE)? mn->objectlen : HEXLIFIED_SHA256_DIGEST_SIZE)) 
                goto copyup_zeroblock;
 
-       struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
+       struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
                                                        mapper->bportno, X_ALLOC);
        if (!req)
                goto out_err;
@@ -707,7 +707,7 @@ static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_re
        if (r<0)
                goto out_put;
        r = __set_copyup_node(mio, req, mn);
-       p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort) {
                goto out_unset;
        }
@@ -722,7 +722,7 @@ out_unset:
        r = __set_copyup_node(mio, req, NULL);
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 out_err:
        XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
        return -1;
@@ -776,7 +776,7 @@ static int handle_mapread(struct peerd *peer, struct peer_req *pr,
        if (r < 0)
                goto out_fail;
        
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        map->flags &= ~MF_MAP_LOADING;
        XSEGLOG2(&lc, I, "Map %s loaded. Dispatching pending", map->volume);
        uint64_t qsize = xq_count(&map->pending);
@@ -789,7 +789,7 @@ static int handle_mapread(struct peerd *peer, struct peer_req *pr,
 
 out_fail:
        XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        map->flags &= ~MF_MAP_LOADING;
        while((idx = __xq_pop_head(&map->pending)) != Noneidx){
                struct peer_req *preq = (struct peer_req *) idx;
@@ -804,7 +804,7 @@ out_err:
        strncpy(buf, target, req->targetlen);
        buf[req->targetlen] = 0;
        XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        return -1;
 }
 
@@ -828,7 +828,7 @@ static int handle_mapwrite(struct peerd *peer, struct peer_req *pr,
                goto out_fail;
        }
        
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        map->flags &= ~MF_MAP_WRITING;
        XSEGLOG2(&lc, I, "Map %s written. Dispatching pending", map->volume);
        uint64_t qsize = xq_count(&map->pending);
@@ -842,7 +842,7 @@ static int handle_mapwrite(struct peerd *peer, struct peer_req *pr,
 
 out_fail:
        XSEGLOG2(&lc, E, "Map write for map %s failed", map->volume);
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        map->flags &= ~MF_MAP_WRITING;
        while((idx = __xq_pop_head(&map->pending)) != Noneidx){
                struct peer_req *preq = (struct peer_req *) idx;
@@ -857,7 +857,7 @@ out_err:
        strncpy(buf, target, req->targetlen);
        buf[req->targetlen] = 0;
        XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        return -1;
 }
 
@@ -1245,12 +1245,12 @@ static int handle_copyup(struct peerd *peer, struct peer_req *pr,
                goto out_fail;
        }
        mn->flags |= MF_OBJECT_WRITING;
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
        return 0;
 
 out_fail:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        __set_copyup_node(mio, req, NULL);
        while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
                struct peer_req * preq = (struct peer_req *) idx;
@@ -1297,7 +1297,7 @@ static int handle_objectwrite(struct peerd *peer, struct peer_req *pr,
        strncpy(mn->object, tmp.object, tmp.objectlen);
        mn->object[tmp.objectlen] = 0;
        mn->objectlen = tmp.objectlen;
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 
        XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
        uint64_t qsize = xq_count(&mn->pending);
@@ -1310,7 +1310,7 @@ static int handle_objectwrite(struct peerd *peer, struct peer_req *pr,
 
 out_fail:
        XSEGLOG2(&lc, E, "Write of object %s failed", mn->object);
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        while((idx = __xq_pop_head(&mn->pending)) != Noneidx){
                struct peer_req *preq = (struct peer_req *) idx;
                fail(peer, preq);
@@ -1319,7 +1319,7 @@ out_fail:
 
 out_err:
        XSEGLOG2(&lc, E, "Cannot find map node. Failure!");
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        return -1;
 }
 
@@ -1455,7 +1455,7 @@ static int delete_object(struct peerd *peer, struct peer_req *pr,
                return MF_PENDING;
        }
 
-       struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
+       struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
                                                        mapper->bportno, X_ALLOC);
        if (!req)
                goto out_err;
@@ -1472,7 +1472,7 @@ static int delete_object(struct peerd *peer, struct peer_req *pr,
        if (r < 0)
                goto out_put;
        __set_copyup_node(mio, req, mn);
-       xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort)
                goto out_unset;
        r = xseg_signal(peer->xseg, p);
@@ -1483,7 +1483,7 @@ static int delete_object(struct peerd *peer, struct peer_req *pr,
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 out_err:
        XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
        return -1;
@@ -1595,7 +1595,7 @@ static int delete_map(struct peerd *peer, struct peer_req *pr,
        void *dummy;
        struct mapperd *mapper = __get_mapperd(peer);
        struct mapper_io *mio = __get_mapper_io(pr);
-       struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
+       struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
                                                        mapper->mbportno, X_ALLOC);
        if (!req)
                goto out_err;
@@ -1612,7 +1612,7 @@ static int delete_map(struct peerd *peer, struct peer_req *pr,
        if (r < 0)
                goto out_put;
        __set_copyup_node(mio, req, NULL);
-       xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort)
                goto out_unset;
        r = xseg_signal(peer->xseg, p);
@@ -1623,7 +1623,7 @@ static int delete_map(struct peerd *peer, struct peer_req *pr,
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 out_err:
        XSEGLOG2(&lc, I, "Map %s deletion failed", map->volume);
        return -1;
@@ -1695,7 +1695,7 @@ static int handle_delete(struct peerd *peer, struct peer_req *pr,
                //map block delete
                map = find_map(mapper, target, req->targetlen);
                if (!map) {
-                       xseg_put_request(peer->xseg, req, peer->portno);
+                       xseg_put_request(peer->xseg, req, pr->portno);
                        return -1;
                }
                handle_map_delete(peer, pr, map, err);
@@ -1703,12 +1703,12 @@ static int handle_delete(struct peerd *peer, struct peer_req *pr,
                //object delete
                map = mn->map;
                if (!map) {
-                       xseg_put_request(peer->xseg, req, peer->portno);
+                       xseg_put_request(peer->xseg, req, pr->portno);
                        return -1;
                }
                handle_object_delete(peer, pr, mn, err);
        }
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        return 0;
 }
 
index 4c32209..d63947a 100644 (file)
@@ -74,7 +74,7 @@ static int handle_accepted(struct peerd *peer, struct peer_req *pr,
                return 0;               
        }
        vio->err = 0; //reset error state
-       vio->mreq = xseg_get_request(peer->xseg, peer->portno, 
+       vio->mreq = xseg_get_request(peer->xseg, pr->portno, 
                                        vlmc->mportno, X_ALLOC);
        if (!vio->mreq)
                goto out_err;
@@ -104,7 +104,7 @@ static int handle_accepted(struct peerd *peer, struct peer_req *pr,
        }
        xseg_set_req_data(peer->xseg, vio->mreq, pr);
        __set_vio_state(vio, MAPPING);
-       p = xseg_submit(peer->xseg, vio->mreq, peer->portno, X_ALLOC);
+       p = xseg_submit(peer->xseg, vio->mreq, pr->portno, X_ALLOC);
        if (p == NoPort)
                goto out_unset;
        r = xseg_signal(peer->xseg, p);
@@ -118,7 +118,7 @@ static int handle_accepted(struct peerd *peer, struct peer_req *pr,
 out_unset:
        xseg_get_req_data(peer->xseg, vio->mreq, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, vio->mreq, peer->portno);
+       xseg_put_request(peer->xseg, vio->mreq, pr->portno);
 out_err:
        __set_vio_state(vio, CONCLUDED);
        fail(peer, pr);
@@ -146,7 +146,7 @@ static int handle_mapping(struct peerd *peer, struct peer_req *pr,
        /* FIXME shouldn's XS_FAILED be sufficient ?? */
        if (vio->mreq->state & XS_FAILED && !(vio->mreq->state & XS_SERVED)){
                fprintf(stderr, "req %lx (op: %d) failed\n", vio->mreq, vio->mreq->op);
-               xseg_put_request(peer->xseg, vio->mreq, peer->portno);
+               xseg_put_request(peer->xseg, vio->mreq, pr->portno);
                vio->mreq = NULL;
                __set_vio_state(vio, CONCLUDED);
                fail(peer, pr);
@@ -154,13 +154,12 @@ static int handle_mapping(struct peerd *peer, struct peer_req *pr,
                struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, vio->mreq);
                char *data = xseg_get_data(peer->xseg, pr->req);
                *(off_t *)data = xinfo->size;
-               xseg_put_request(peer->xseg, vio->mreq, peer->portno);
+               xseg_put_request(peer->xseg, vio->mreq, pr->portno);
                vio->mreq = NULL;
                __set_vio_state(vio, CONCLUDED);
                complete(peer, pr);
        } else if (vio->mreq->op == X_CLOSE) {
-               struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, vio->mreq);
-               xseg_put_request(peer->xseg, vio->mreq, peer->portno);
+               xseg_put_request(peer->xseg, vio->mreq, pr->portno);
                vio->mreq = NULL;
                __set_vio_state(vio, CONCLUDED);
                complete(peer, pr);
@@ -168,7 +167,7 @@ static int handle_mapping(struct peerd *peer, struct peer_req *pr,
                struct xseg_reply_map *mreply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, vio->mreq);
                if (!mreply->cnt){
                        printf("foo2\n");
-                       xseg_put_request(peer->xseg, vio->mreq, peer->portno);
+                       xseg_put_request(peer->xseg, vio->mreq, pr->portno);
                        vio->mreq = NULL;
                        __set_vio_state(vio, CONCLUDED);
                        fail(peer, pr);
@@ -178,7 +177,7 @@ static int handle_mapping(struct peerd *peer, struct peer_req *pr,
                vio->breqs = calloc(vio->breq_len, sizeof(struct xseg_request *));
                if (!vio->breqs) {
                        printf("foo3\n");
-                       xseg_put_request(peer->xseg, vio->mreq, peer->portno);
+                       xseg_put_request(peer->xseg, vio->mreq, pr->portno);
                        vio->mreq = NULL;
                        __set_vio_state(vio, CONCLUDED);
                        fail(peer, pr);
@@ -190,7 +189,7 @@ static int handle_mapping(struct peerd *peer, struct peer_req *pr,
                        datalen = mreply->segs[i].size;
                        offset = mreply->segs[i].offset;
                        targetlen = mreply->segs[i].targetlen;
-                       breq = xseg_get_request(peer->xseg, peer->portno, vlmc->bportno, X_ALLOC);
+                       breq = xseg_get_request(peer->xseg, pr->portno, vlmc->bportno, X_ALLOC);
                        if (!breq) {
                                vio->err = 1;
                                break;
@@ -198,7 +197,7 @@ static int handle_mapping(struct peerd *peer, struct peer_req *pr,
                        r = xseg_prep_request(peer->xseg, breq, targetlen, datalen);
                        if (r < 0) {
                                vio->err = 1;
-                               xseg_put_request(peer->xseg, breq, peer->portno);
+                               xseg_put_request(peer->xseg, breq, pr->portno);
                                break;
                        }
                        breq->offset = offset;
@@ -207,26 +206,26 @@ static int handle_mapping(struct peerd *peer, struct peer_req *pr,
                        target = xseg_get_target(peer->xseg, breq);
                        if (!target) {
                                vio->err = 1;
-                               xseg_put_request(peer->xseg, breq, peer->portno);
+                               xseg_put_request(peer->xseg, breq, pr->portno);
                                break;
                        }
                        strncpy(target, mreply->segs[i].target, targetlen);
                        r = xseg_set_req_data(peer->xseg, breq, pr);
                        if (r<0) {
                                vio->err = 1;
-                               xseg_put_request(peer->xseg, breq, peer->portno);
+                               xseg_put_request(peer->xseg, breq, pr->portno);
                                break;
                        }
 
                        // this should work, right ?
                        breq->data = pr->req->data + pos;
                        pos += datalen;
-                       p = xseg_submit(peer->xseg, breq, peer->portno, X_ALLOC);
+                       p = xseg_submit(peer->xseg, breq, pr->portno, X_ALLOC);
                        if (p == NoPort){
                                void *dummy;
                                vio->err = 1;
                                xseg_get_req_data(peer->xseg, breq, &dummy);
-                               xseg_put_request(peer->xseg, breq, peer->portno);
+                               xseg_put_request(peer->xseg, breq, pr->portno);
                                break;
                        }
                        r = xseg_signal(peer->xseg, p);
@@ -236,7 +235,7 @@ static int handle_mapping(struct peerd *peer, struct peer_req *pr,
                        vio->breqs[i] = breq;
                }
                vio->breq_cnt = i;
-               xseg_put_request(peer->xseg, vio->mreq, peer->portno);
+               xseg_put_request(peer->xseg, vio->mreq, pr->portno);
                vio->mreq = NULL;
                if (i == 0) {
                        printf("foo4\n");
@@ -269,7 +268,7 @@ static int handle_serving(struct peerd *peer, struct peer_req *pr,
                //assert breq->serviced == breq->size
                __sync_fetch_and_add(&pr->req->serviced, breq->serviced);
        }
-       xseg_put_request(peer->xseg, breq, peer->portno);
+       xseg_put_request(peer->xseg, breq, pr->portno);
 
        if (!__sync_sub_and_fetch(&vio->breq_cnt, 1)) {
                __set_vio_state(vio, CONCLUDED);
index c80599d..51a9d38 100644 (file)
@@ -618,7 +618,7 @@ void *io_loop(void *arg)
 
        for (;;) {
                accepted = NULL;
-               accepted = xseg_accept(xseg, portno);
+               accepted = xseg_accept(xseg, portno, 0);
                if (accepted) {
                        io->req = accepted;
                        wake_up_next_iothread(pfiled);
diff --git a/xseg/peers/user/speer.c b/xseg/peers/user/speer.c
new file mode 100644 (file)
index 0000000..c84b4e9
--- /dev/null
@@ -0,0 +1,540 @@
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <xseg/xseg.h>
+#include <pthread.h>
+#include <mpeer.h>
+#include <sys/syscall.h>
+#include <sys/time.h>
+#include <signal.h>
+
+/*
+ * REARRANGE moves the function name behind the caller's format string, so
+ * that LOG can emit "<func>: <formatted message>" with one fprintf call. The
+ * "%s" appended to the format is matched by the empty string that LOG passes
+ * after the caller's arguments.
+ */
+#define REARRANGE(__fun_name__, __format__, ...) __format__ "%s", __fun_name__, ##__VA_ARGS__
+/*
+ * Print to stderr when `level` does not exceed the global `verbose`.
+ * `level` is parenthesized so that any expression may be passed safely.
+ */
+#define LOG(level, ...)                                              \
+               do {                                                               \
+                       if ((level) <= verbose) {                          \
+                               fprintf(stderr, "%s: "  REARRANGE( __func__ , ## __VA_ARGS__, "" )); \
+                       }                                                          \
+               } while (0)
+
+
+unsigned int verbose = 0;      /* verbosity threshold; main() copies the -v value here */
+struct log_ctx lc;             /* logging context passed to every XSEGLOG2 call in this file */
+
+struct thread {                        /* bookkeeping for one worker thread of the peer */
+       struct peerd *peer;             /* owning peer, set in peerd_start_threads() */
+       pthread_t tid;                  /* filled in by pthread_create() */
+       pthread_cond_t cond;            /* signalled by __wake_up_thread() */
+       pthread_mutex_t lock;           /* held while cond is signalled */
+       void (*func)(void *arg);        /* one-shot job run by thread_loop(); NULL when none */
+       void *arg;                      /* argument handed to func */
+};
+
+
+/* Non-zero when the peer has a defer port, i.e. defer_portno is not NoPort. */
+inline int canDefer(struct peerd *peer)
+{
+       return peer->defer_portno != NoPort;
+}
+
+/*
+ * Dump a request's header fields plus a short preview of its target and data
+ * to stdout.
+ *
+ * At most 63 bytes of each buffer are shown. Both previews are additionally
+ * limited to the length recorded in the request (targetlen / datalen), so
+ * nothing past the end of a buffer is read when it is not NUL-terminated.
+ * A missing buffer is printed as an empty string.
+ */
+void print_req(struct xseg *xseg, struct xseg_request *req)
+{
+       char target[64], data[64];
+       char *req_target = xseg_get_target(xseg, req);
+       char *req_data = xseg_get_data(xseg, req);
+       int tlen = (req->targetlen > 63) ? 63 : (int)req->targetlen;
+       int dlen = (req->datalen > 63) ? 63 : (int)req->datalen;
+
+       if (!req_target)
+               tlen = 0;
+       if (!req_data)
+               dlen = 0;
+       /* "%.*s" copies at most tlen/dlen bytes and always NUL-terminates. */
+       snprintf(target, sizeof(target), "%.*s", tlen, req_target ? req_target : "");
+       snprintf(data, sizeof(data), "%.*s", dlen, req_data ? req_data : "");
+
+       printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
+                       "src: %u, st: %u, dst: %u dt: %u\n"
+                       "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
+                       (unsigned long)(req),
+                       (unsigned int)req->op,
+                       (unsigned long long)req->offset,
+                       (unsigned long)req->size,
+                       (unsigned long)req->serviced,
+                       (unsigned int)req->state,
+                       (unsigned int)req->src_portno,
+                       (unsigned int)req->src_transit_portno,
+                       (unsigned int)req->dst_portno,
+                       (unsigned int)req->dst_transit_portno,
+                       (unsigned int)req->targetlen, target,
+                       (unsigned long long)req->datalen, data);
+}
+/*
+ * When `verbose` is non-zero, print `msg` followed by the state of the peer
+ * request and a short preview of its target and data to stdout.
+ *
+ * The previews are capped at 63 bytes and at the lengths recorded in the
+ * request, because the target/data buffers are not guaranteed to be
+ * NUL-terminated and the byte after them may belong to the next buffer.
+ */
+void log_pr(char *msg, struct peer_req *pr)
+{
+       char target[64], data[64];
+       char *req_target, *req_data;
+       struct peerd *peer = pr->peer;
+       struct xseg *xseg = peer->xseg;
+       int tlen, dlen;
+
+       /* Nothing is printed at verbosity 0, so skip the lookups as well. */
+       if (!verbose)
+               return;
+
+       req_target = xseg_get_target(xseg, pr->req);
+       req_data = xseg_get_data(xseg, pr->req);
+       tlen = (pr->req->targetlen > 63) ? 63 : (int)pr->req->targetlen;
+       dlen = (pr->req->datalen > 63) ? 63 : (int)pr->req->datalen;
+       if (!req_target)
+               tlen = 0;
+       if (!req_data)
+               dlen = 0;
+       snprintf(target, sizeof(target), "%.*s", tlen, req_target ? req_target : "");
+       snprintf(data, sizeof(data), "%.*s", dlen, req_data ? req_data : "");
+
+       printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
+                       "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
+                       msg,
+                       (unsigned int)(pr - peer->peer_reqs),
+                       (unsigned int)pr->req->op,
+                       (unsigned long long)pr->req->offset,
+                       (unsigned long)pr->req->size,
+                       (unsigned long)pr->req->serviced,
+                       (unsigned long)pr->retval,
+                       (unsigned int)pr->req->state,
+                       (unsigned int)pr->req->targetlen, target,
+                       (unsigned long long)pr->req->datalen, data);
+}
+
+/* Pop a slot index off peer->free_reqs; NULL when the queue yields Noneidx. */
+inline struct peer_req *alloc_peer_req(struct peerd *peer)
+{
+       xqindex slot = xq_pop_head(&peer->free_reqs, 1);
+
+       return (slot == Noneidx) ? NULL : &peer->peer_reqs[slot];
+}
+
+/* Detach the xseg request from pr and put its slot index back on free_reqs. */
+inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
+{
+       xqindex slot = pr - peer->peer_reqs;
+
+       pr->req = NULL;
+       xq_append_head(&peer->free_reqs, slot, 1);
+}
+
+/* Pop a thread index off peer->threads; NULL when the queue yields Noneidx. */
+inline static struct thread* alloc_thread(struct peerd *peer)
+{
+       xqindex slot = xq_pop_head(&peer->threads, 1);
+
+       return (slot == Noneidx) ? NULL : &peer->thread[slot];
+}
+
+/* Put the index of thread t back on the peer->threads queue. */
+inline static void free_thread(struct peerd *peer, struct thread *t)
+{
+       xq_append_head(&peer->threads, (xqindex)(t - peer->thread), 1);
+}
+
+
+/* Signal t->cond while holding t->lock. */
+inline static void __wake_up_thread(struct thread *t)
+{
+       pthread_mutex_t *guard = &t->lock;
+
+       pthread_mutex_lock(guard);
+       pthread_cond_signal(&t->cond);
+       pthread_mutex_unlock(guard);
+}
+
+/* NULL-tolerant wrapper around __wake_up_thread(). */
+inline static void wake_up_thread(struct thread* t)
+{
+       if (!t)
+               return;
+       __wake_up_thread(t);
+}
+
+/*
+ * Wake a worker by signalling the peer's first port through xseg instead of
+ * using a per-thread condition variable. Returns the xseg_signal() result.
+ */
+inline static int wake_up_next_thread(struct peerd *peer)
+{
+       struct xseg *segment = peer->xseg;
+
+       return xseg_signal(segment, peer->portno_start);
+}
+
+/* Accumulators for timing xseg_respond(); no code in this file updates them. */
+struct timeval resp_start, resp_end, resp_accum = {0, 0};
+uint64_t responds = 0;
+
+/*
+ * Print the accumulated respond time and the number of timed responds.
+ * The timeval fields are signed types, so they are cast to match "%lu".
+ */
+void get_responds_stats(){
+               printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
+                               (unsigned long) resp_accum.tv_sec,
+                               (unsigned long) resp_accum.tv_usec,
+                               (long long unsigned int) responds);
+}
+
+/*
+ * Flag the xseg request of pr as failed, respond with it on pr->portno,
+ * signal the port returned by xseg_respond(), release pr and wake a worker.
+ *
+ * FIXME: the results of xseg_respond() and xseg_signal() are not checked.
+ */
+void fail(struct peerd *peer, struct peer_req *pr)
+{
+       struct xseg_request *failed = pr->req;
+       uint32_t dst;
+
+       XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
+       failed->state |= XS_FAILED;
+       dst = xseg_respond(peer->xseg, failed, pr->portno, X_ALLOC);
+       xseg_signal(peer->xseg, dst);
+       free_peer_req(peer, pr);
+       wake_up_next_thread(peer);
+}
+
+/*
+ * Flag the xseg request of pr as served, respond with it on pr->portno,
+ * signal the port returned by xseg_respond(), release pr and wake a worker.
+ * No timing of the respond call (resp_start/resp_end/resp_accum) is done here.
+ *
+ * FIXME: the results of xseg_respond() and xseg_signal() are not checked.
+ */
+void complete(struct peerd *peer, struct peer_req *pr)
+{
+       struct xseg_request *served = pr->req;
+       uint32_t dst;
+
+       served->state |= XS_SERVED;
+       dst = xseg_respond(peer->xseg, served, pr->portno, X_ALLOC);
+       xseg_signal(peer->xseg, dst);
+       free_peer_req(peer, pr);
+       wake_up_next_thread(peer);
+}
+
+/* Overwrite the request state with XS_PENDING; `peer` is not used. */
+void pending(struct peerd *peer, struct peer_req *pr)
+{
+       struct xseg_request *req = pr->req;
+
+       (void)peer;
+       req->state = XS_PENDING;
+}
+
+/*
+ * Entry point for a freshly accepted request: reset its progress counters
+ * (serviced, retval) and pass it on to dispatch().
+ */
+static void handle_accepted(struct peerd *peer, struct peer_req *pr,
+                               struct xseg_request *req)
+{
+       XSEGLOG2(&lc, D, "Handle accepted");
+       /* pr->req is expected to be the same request as req (not asserted) */
+       pr->req->serviced = 0;
+       pr->retval = 0;
+       dispatch(peer, pr, req);
+}
+
+/*
+ * Entry point for a request received back on one of our ports: log it and
+ * pass it, with the peer request it belongs to, on to dispatch().
+ */
+static void handle_received(struct peerd *peer, struct peer_req *pr,
+                               struct xseg_request *req)
+{
+       XSEGLOG2(&lc, D, "Handle received \n");
+       dispatch(peer, pr, req);
+}
+/* Accumulators for timing xseg_submit(); no code in this file updates them. */
+struct timeval sub_start, sub_end, sub_accum = {0, 0};
+uint64_t submits = 0;
+
+/*
+ * Print the accumulated submit time and the number of timed submits.
+ * The timeval fields are signed types, so they are cast to match "%lu".
+ */
+void get_submits_stats(){
+               printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
+                               (unsigned long) sub_accum.tv_sec,
+                               (unsigned long) sub_accum.tv_usec,
+                               (long long unsigned int) submits);
+}
+
+/*
+ * Submit the xseg request of pr on pr->portno and signal the port that
+ * xseg_submit() returns.
+ *
+ * pr is stored as the request's data first, so that thread_loop() can map the
+ * reply back to pr via xseg_get_req_data().
+ *
+ * Returns 0 on success, -1 when attaching pr or submitting fails.
+ */
+int submit_peer_req(struct peerd *peer, struct peer_req *pr)
+{
+       struct xseg_request *req = pr->req;
+       xport p;
+       int r;
+
+       XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
+       /*
+        * Keep the result in a signed variable: with the former uint32_t the
+        * "< 0" test could never be true and failures went unnoticed.
+        */
+       r = xseg_set_req_data(peer->xseg, req, (void *)(pr));
+       if (r < 0)
+               return -1;
+       p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
+       if (p == NoPort)
+               return -1;
+       xseg_signal(peer->xseg, p);
+       return 0;
+}
+
+/*
+ * Hand func(arg) to a thread taken from peer->threads and wake that thread.
+ * Returns 0 when a thread was available, -1 otherwise.
+ */
+int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
+{
+       struct thread *worker = alloc_thread(peer);
+
+       if (!worker)
+               return -1;      /* we could hijack a thread */
+
+       worker->func = func;
+       worker->arg = arg;
+       wake_up_thread(worker);
+       return 0;
+}
+
+/*
+ * Body of a worker thread; `arg` is the worker's struct thread.
+ *
+ * The thread repeatedly
+ *   - runs t->func once if one has been set, otherwise
+ *   - scans every port in [portno_start, portno_end], accepting new requests
+ *     (when a free peer request slot exists) and receiving replies, and
+ *     passes them to handle_accepted() / handle_received(),
+ *   - and, once a full scan found nothing, waits for a signal on the peer's
+ *     first port.
+ *
+ * The loop never terminates; the trailing return only satisfies the
+ * pthread start-routine signature.
+ */
+static void* thread_loop(void *arg)
+{
+       struct thread *t = (struct thread *) arg;
+       struct peerd *peer = t->peer;
+       struct xseg *xseg = peer->xseg;
+       xport portno_start = peer->portno_start;
+       xport portno_end = peer->portno_end;
+       struct peer_req *pr;
+       uint64_t threshold=1;
+       pid_t pid =syscall(SYS_gettid);
+       uint64_t loops;
+       struct xseg_request *accepted, *received;
+       int r;
+       int change;
+       xport i;
+
+       XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
+
+       XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), (unsigned int) pid);
+       xseg_init_local_signal(xseg, peer->portno_start);
+       for (;;) {
+               if (t->func) {
+                       XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
+                       xseg_cancel_wait(xseg, peer->portno_start);
+                       t->func(t->arg);
+                       t->func = NULL;
+                       t->arg = NULL;
+                       continue;
+               }
+
+               for(loops= threshold; loops > 0; loops--) {
+                       do {
+                               /* announce the wait before the last scan */
+                               if (loops == 1)
+                                       xseg_prepare_wait(xseg, peer->portno_start);
+                               change = 0;
+                               for (i = portno_start; i <= portno_end; i++) {
+                                       accepted = NULL;
+                                       received = NULL;
+                                       /* only accept when a peer request slot is free */
+                                       pr = alloc_peer_req(peer);
+                                       if (pr) {
+                                               accepted = xseg_accept(xseg, i, X_NONBLOCK);
+                                               if (accepted) {
+                                                       XSEGLOG2(&lc, D, "Thread %u accepted\n", (unsigned int) (t- peer->thread));
+                                                       pr->req = accepted;
+                                                       pr->portno = i;
+                                                       xseg_cancel_wait(xseg, i);
+                                                       handle_accepted(peer, pr, accepted);
+                                                       change = 1;
+                                               }
+                                               else {
+                                                       free_peer_req(peer, pr);
+                                               }
+                                       }
+                                       received = xseg_receive(xseg, i, X_NONBLOCK);
+                                       if (received) {
+                                               r =  xseg_get_req_data(xseg, received, (void **) &pr);
+                                               if (r < 0 || !pr){
+                                                       //FIXME what to do here ?
+                                                       XSEGLOG2(&lc, W, "Received request with no pr data\n");
+                                                       xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
+                                                       //FIXME if fails, put req
+                                                       /*
+                                                        * There is no peer request to dispatch for;
+                                                        * clear pr so that no NULL or stale pointer
+                                                        * reaches handle_received() below.
+                                                        */
+                                                       pr = NULL;
+                                               }
+                                               xseg_cancel_wait(xseg, i);
+                                               if (pr)
+                                                       handle_received(peer, pr, received);
+                                               change = 1;
+                                       }
+                               }
+                               /* progress was made: restart the idle countdown */
+                               if (change)
+                                       loops = threshold;
+                       }while (change);
+               }
+               XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
+               xseg_wait_signal(xseg, 10000000UL);
+               xseg_cancel_wait(xseg, peer->portno_start);
+               XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
+       }
+       return NULL;
+}
+
+void defer_request(struct peerd *peer, struct peer_req *pr)
+{
+       /* Stub: the body is empty; the intended submit/signal/free steps below are commented out. */
+       // assert canDefer(peer);
+//     xseg_submit(peer->xseg, peer->defer_portno, pr->req);
+//     xseg_signal(peer->xseg, peer->defer_portno);
+//     free_peer_req(peer, pr);
+}
+
+/*
+ * Run the optional interactive function, then block until every worker
+ * thread has terminated. Returns 0.
+ *
+ * Each thread is joined exactly once: joining a thread that has already been
+ * joined is undefined, so the former endless re-join of thread 0 is gone.
+ * As thread_loop() never returns, this call still blocks for the lifetime of
+ * the process in practice.
+ */
+static int peerd_loop(struct peerd *peer) 
+{
+       uint32_t i;
+
+       if (peer->interactive_func)
+               peer->interactive_func();
+       for (i = 0; i < peer->nr_threads; i++)
+               pthread_join(peer->thread[i].tid, NULL);
+       return 0;
+}
+
+/*
+ * Join the segment described by `spec` through the "posix" peer type.
+ * If the first join fails, try to create the segment and join once more.
+ * Returns the result of the last xseg_join() call.
+ */
+static struct xseg *join(char *spec)
+{
+       struct xseg_config config;
+       struct xseg *segment;
+
+       (void)xseg_parse_spec(spec, &config);
+       segment = xseg_join(config.type, config.name, "posix", NULL);
+       if (!segment) {
+               (void)xseg_create(&config);
+               segment = xseg_join(config.type, config.name, "posix", NULL);
+       }
+       return segment;
+}
+
+/*
+ * Initialize and start the peer's nr_threads worker threads, each running
+ * thread_loop() on its own struct thread.
+ *
+ * Returns 0 on success and -1 as soon as one thread cannot be created;
+ * threads started before the failure keep running.
+ */
+int peerd_start_threads(struct peerd *peer)
+{
+       uint32_t i;
+       uint32_t nr_threads = peer->nr_threads;
+
+       for (i = 0; i < nr_threads; i++) {
+               struct thread *t = &peer->thread[i];
+               int err;
+
+               /*
+                * Fill in every field before the thread exists: thread_loop()
+                * reads t->peer and t->func as soon as it starts.
+                */
+               t->peer = peer;
+               t->func = NULL;
+               t->arg = NULL;
+               pthread_cond_init(&t->cond, NULL);
+               pthread_mutex_init(&t->lock, NULL);
+               err = pthread_create(&t->tid, NULL, thread_loop, (void *)t);
+               if (err) {
+                       fprintf(stderr, "cannot create thread %u: error %d\n",
+                                       (unsigned int) i, err);
+                       return -1;
+               }
+       }
+       return 0;
+}
+
+/*
+ * Allocate and set up a peer.
+ *
+ * Steps: allocate the peerd, its thread array and its nr_ops peer requests;
+ * set up the free-request and thread queues; initialize the xseg library and
+ * join the segment described by `spec`; bind portno_start, then bind every
+ * further port up to portno_end with the signal descriptor of the first
+ * port; finally reset all peer request slots.
+ *
+ * Returns the new peer, or NULL on any failure after printing a message.
+ * The heap memory allocated here is released on failure.
+ * NOTE(review): the xq queues and the joined segment are not torn down on
+ * the error paths; their release functions are not visible in this file.
+ */
+static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
+                       long portno_end, uint32_t nr_threads, uint32_t defer_portno)
+{
+       uint32_t i;
+       xport p;
+       struct peerd *peer;
+
+       peer = malloc(sizeof(struct peerd));
+       if (!peer) {
+               perror("malloc");
+               return NULL;
+       }
+       peer->nr_ops = nr_ops;
+       peer->defer_portno = defer_portno;
+       peer->nr_threads = nr_threads;
+       /* lets the error path call free() on both unconditionally */
+       peer->thread = NULL;
+       peer->peer_reqs = NULL;
+
+       peer->thread = calloc(nr_threads, sizeof(struct thread));
+       if (!peer->thread)
+               goto malloc_fail;
+       peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
+       if (!peer->peer_reqs)
+               goto malloc_fail;
+
+       if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
+               goto malloc_fail;
+       if (!xq_alloc_empty(&peer->threads, nr_threads))
+               goto malloc_fail;
+
+       if (xseg_initialize()){
+               printf("cannot initialize library\n");
+               goto fail;
+       }
+       peer->xseg = join(spec);
+       if (!peer->xseg)
+               goto fail;
+
+       peer->portno_start = (xport) portno_start;
+       peer->portno_end= (xport) portno_end;
+       peer->port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
+       if (!peer->port){
+               /* ports are printed with %u, as in the summary line below */
+               printf("cannot bind to port %u\n", (unsigned int) peer->portno_start);
+               goto fail;
+       }
+
+       /* The remaining ports share the signal descriptor of the first one. */
+       for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
+               struct xseg_port *tmp;
+               tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, peer->port));
+               if (!tmp){
+                       printf("cannot bind to port %u\n", (unsigned int) p);
+                       goto fail;
+               }
+       }
+
+       printf("Peer on ports  %u-%u\n", peer->portno_start,
+                       peer->portno_end);
+
+       for (i = 0; i < nr_ops; i++) {
+               peer->peer_reqs[i].peer = peer;
+               peer->peer_reqs[i].req = NULL;
+               peer->peer_reqs[i].retval = 0;
+               peer->peer_reqs[i].priv = NULL;
+       }
+       peer->interactive_func = NULL;
+       return peer;
+
+malloc_fail:
+       perror("malloc");
+fail:
+       free(peer->peer_reqs);
+       free(peer->thread);
+       free(peer);
+       return NULL;
+}
+
+
+/*
+ * Peer entry point.
+ *
+ * Recognized arguments (each takes one value):
+ *   -g  <spec>    segment spec passed to join()
+ *   -sp <port>    first port of the peer (required)
+ *   -ep <port>    last port of the peer (required, not below -sp)
+ *   -n  <nr_ops>  number of peer request slots (default 16)
+ *   -v  <level>   log/verbosity level (default 0)
+ *   -dp <port>    defer port (default NoPort)
+ *   -l  <file>    log file
+ * Unknown arguments are skipped here; the full argument vector is also
+ * passed to custom_peer_init().
+ *
+ * Returns -1 when the arguments are invalid or any initialization step
+ * fails; otherwise the result of peerd_loop().
+ */
+int main(int argc, char *argv[])
+{
+       struct peerd *peer = NULL;
+       //parse args
+       char *spec = "";
+       int i, r;
+       long portno_start = -1, portno_end = -1;
+       //set defaults here
+       uint32_t nr_ops = 16;
+       uint32_t nr_threads = 1 ;
+       unsigned int debug_level = 0;
+       uint32_t defer_portno = NoPort;
+       char *logfile = NULL;
+
+       //TODO print messages on arg parsing error
+       for (i = 1; i < argc; i++) {
+               if (!strcmp(argv[i], "-g") && i + 1 < argc) {
+                       spec = argv[i+1];
+                       i += 1;
+                       continue;
+               }
+
+               if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
+                       portno_start = strtoul(argv[i+1], NULL, 10);
+                       i += 1;
+                       continue;
+               }
+
+               if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
+                       portno_end = strtoul(argv[i+1], NULL, 10);
+                       i += 1;
+                       continue;
+               }
+
+               if (!strcmp(argv[i], "-n") && i + 1 < argc) {
+                       nr_ops = strtoul(argv[i+1], NULL, 10);
+                       i += 1;
+                       continue;
+               }
+               if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
+                       debug_level = atoi(argv[i+1]);
+                       i += 1;
+                       continue;
+               }
+               if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
+                       defer_portno = strtoul(argv[i+1], NULL, 10);
+                       i += 1;
+                       continue;
+               }
+               if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
+                       logfile = argv[i+1];
+                       i += 1;
+                       continue;
+               }
+
+       }
+       init_logctx(&lc, argv[0], debug_level, logfile);
+       XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
+
+       /*
+        * Both ends of the port range are mandatory. Without this check the
+        * -1 defaults would be cast to xport and handed to xseg_bind_port().
+        */
+       if (portno_start < 0 || portno_end < 0 || portno_end < portno_start) {
+               fprintf(stderr, "%s: a valid port range is required (-sp <start> -ep <end>)\n",
+                               argv[0]);
+               return -1;
+       }
+       verbose = debug_level;
+
+       peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
+       if (!peer)
+               return -1;
+       r = custom_peer_init(peer, argc, argv);
+       if (r < 0)
+               return -1;
+       if (peerd_start_threads(peer) < 0)
+               return -1;
+       return peerd_loop(peer);
+}
index 967df1c..d4bdf18 100644 (file)
@@ -79,7 +79,7 @@ int wait_reply(struct xseg_request *expected_req)
        struct xseg_request *rec;
        xseg_prepare_wait(xseg, srcport);
        while(1) {
-               rec = xseg_receive(xseg, srcport);
+               rec = xseg_receive(xseg, srcport, 0);
                if (rec) {
                        if (rec != expected_req) {
                                fprintf(stderr, "Unknown received req. Putting req.\n");
index b18a869..74ce401 100644 (file)
@@ -510,28 +510,28 @@ int cmd_bridge(uint32_t portno1, uint32_t portno2, char *logfile, char *how)
                        active = 0;
 
                        //FIXME
-                       req = xseg_accept(xseg, portno1);
+                       req = xseg_accept(xseg, portno1, 0);
                        if (req) {
                                xseg_submit(xseg, req, portno2, X_ALLOC);
                                log_req(logfd, portno1, portno2, LOG_ACCEPT, method, req);
                                active += 1;
                        }
 
-                       req = xseg_accept(xseg, portno2);
+                       req = xseg_accept(xseg, portno2, 0);
                        if (req) {
                                xseg_submit(xseg, req, portno1, X_ALLOC);
                                log_req(logfd, portno2, portno1, LOG_ACCEPT, method, req);
                                active += 1;
                        }
 
-                       req = xseg_receive(xseg, portno1);
+                       req = xseg_receive(xseg, portno1, 0);
                        if (req) {
                                xseg_respond(xseg, req, portno2, X_ALLOC);
                                log_req(logfd, portno1, portno2, LOG_RECEIVE, method, req);
                                active += 1;
                        }
 
-                       req = xseg_receive(xseg, portno2);
+                       req = xseg_receive(xseg, portno2, 0);
                        if (req) {
                                xseg_respond(xseg, req, portno1, X_ALLOC);
                                log_req(logfd, portno2, portno1, LOG_RECEIVE, method, req);
@@ -632,7 +632,7 @@ int cmd_rndwrite(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksiz
                        }
                }
 
-               received = xseg_receive(xseg, srcport);
+               received = xseg_receive(xseg, srcport, 0);
                if (received) {
                        xseg_cancel_wait(xseg, srcport);
                        nr_received += 1;
@@ -738,7 +738,7 @@ int cmd_rndread(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize
                        }
                }
 
-               received = xseg_receive(xseg, srcport);
+               received = xseg_receive(xseg, srcport, 0);
                if (received) {
                        xseg_cancel_wait(xseg, srcport);
                        nr_received += 1;
@@ -832,7 +832,7 @@ int cmd_submit_reqs(long loops, long concurrent_reqs, int op)
                                        perror("Cannot signal peer");
                        }
                }
-               received = xseg_receive(xseg, srcport);
+               received = xseg_receive(xseg, srcport, 0);
                if (received) {
                        xseg_cancel_wait(xseg, srcport);
                        --nr_flying;
@@ -955,7 +955,7 @@ int cmd_put_requests(void)
        struct xseg_request *req;
 
        for (;;) {
-               req = xseg_accept(xseg, dstport);
+               req = xseg_accept(xseg, dstport, 0);
                if (!req)
                        break;
                if (xseg_put_request(xseg, req, srcport))
@@ -975,7 +975,7 @@ int cmd_finish(unsigned long nr, int fail)
 
        for (; nr--;) {
                xseg_prepare_wait(xseg, srcport);
-               req = xseg_accept(xseg, srcport);
+               req = xseg_accept(xseg, srcport, 0);
                if (req) {
                        req_target = xseg_get_target(xseg, req);
                        req_data = xseg_get_data(xseg, req);
@@ -1054,7 +1054,7 @@ int cmd_wait(uint32_t nr)
        init_local_signal(); 
 
        for (;;) {
-               req = xseg_receive(xseg, srcport);
+               req = xseg_receive(xseg, srcport, 0);
                if (req) {
                        handle_reply(req);
                        nr--;
@@ -1081,7 +1081,7 @@ int cmd_put_replies(void)
        struct xseg_request *req;
 
        for (;;) {
-               req = xseg_receive(xseg, dstport);
+               req = xseg_receive(xseg, dstport, 0);
                if (!req)
                        break;
                fprintf(stderr, "request: %08llx%08llx\n"
index 52e04d7..f1abc46 100644 (file)
@@ -20,8 +20,9 @@
                } while(0) 
 
 /* general purpose xflags */
-#define X_ALLOC ((uint32_t) (1 << 0))
-#define X_LOCAL ((uint32_t) (1 << 1))
+#define X_ALLOC    ((uint32_t) (1 << 0))
+#define X_LOCAL    ((uint32_t) (1 << 1))
+#define X_NONBLOCK ((uint32_t) (1 << 2))
 
 
 typedef uint64_t xpointer;
index 3922608..d62d062 100755 (executable)
@@ -75,7 +75,7 @@ def vlmc_map(args):
 
         port = prev + 1
         fd = os.open(XSEGBD_SYSFS + "add", os.O_WRONLY)
-        os.write(fd, "%s %d:%d:%d" % (name, port, VPORT, REQS))
+        os.write(fd, "%s %d:%d:%d" % (name, port, port + VPORT_START-4, REQS))
         os.close(fd)
     except Exception, reason:
         print >> sys.stderr, reason
index 8bb6220..fc3f207 100644 (file)
@@ -1334,7 +1334,7 @@ out:
        
 }
 
-struct xseg_request *xseg_receive(struct xseg *xseg, xport portno)
+struct xseg_request *xseg_receive(struct xseg *xseg, xport portno, uint32_t flags)
 {
        xqindex xqi;
        xserial serial = NoSerial;
@@ -1344,7 +1344,12 @@ struct xseg_request *xseg_receive(struct xseg *xseg, xport portno)
        if (!port)
                return NULL;
 retry:
-       xlock_acquire(&port->pq_lock, portno);
+       if (flags & X_NONBLOCK) {
+               if (!xlock_try_lock(&port->pq_lock, portno))
+                       return NULL;
+       } else {
+               xlock_acquire(&port->pq_lock, portno);
+       }
        q = XPTR_TAKE(port->reply_queue, xseg->segment);
        xqi = __xq_pop_head(q);
        xlock_release(&port->pq_lock);
@@ -1365,7 +1370,7 @@ retry:
        return req;
 }
 
-struct xseg_request *xseg_accept(struct xseg *xseg, xport portno)
+struct xseg_request *xseg_accept(struct xseg *xseg, xport portno, uint32_t flags)
 {
        xqindex xqi;
        struct xq *q;
@@ -1373,7 +1378,13 @@ struct xseg_request *xseg_accept(struct xseg *xseg, xport portno)
        struct xseg_port *port = xseg_get_port(xseg, portno);
        if (!port)
                return NULL;
-       xlock_acquire(&port->rq_lock, portno);
+       if (flags & X_NONBLOCK) {
+               if (!xlock_try_lock(&port->rq_lock, portno))
+                       return NULL;
+       } else {
+               xlock_acquire(&port->rq_lock, portno);
+       }
+
        q = XPTR_TAKE(port->request_queue, xseg->segment);
        xqi = __xq_pop_head(q);
        xlock_release(&port->rq_lock);
index c8e85dc..745ab77 100644 (file)
@@ -337,13 +337,15 @@ struct xseg_request *  xseg_get_request     ( struct xseg         * xseg,
                                              uint32_t              flags     );
 
 struct xseg_request *  xseg_receive         ( struct xseg         * xseg,
-                                              xport                 portno    );
+                                              xport                 portno,    
+                                             uint32_t              flags     );
 /*                    \___________________/                       \_________/ */
 /*                     ___________________                         _________  */
 /*                    /                   \                       /         \ */
 
 struct xseg_request *  xseg_accept          ( struct xseg         * xseg,
-                                              xport                 portno    );
+                                              xport                 portno,    
+                                             uint32_t              flags     );
 
               xport    xseg_respond         ( struct xseg         * xseg,
                                               struct xseg_request * xreq,