added speer skeletor
[archipelago] / xseg / peers / user / mpeer.c
index 830c073..b0905c5 100644 (file)
@@ -149,7 +149,7 @@ inline static int wake_up_next_thread(struct peerd *peer)
        //struct thread *t = alloc_thread(peer);
        //wake_up_thread(t);
        //return t;
-       return (xseg_signal(peer->xseg, peer->portno));
+       return (xseg_signal(peer->xseg, peer->portno_start));
 }
 
 struct timeval resp_start, resp_end, resp_accum = {0, 0};
@@ -168,7 +168,7 @@ void fail(struct peerd *peer, struct peer_req *pr)
        XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
        req->state |= XS_FAILED;
        //xseg_set_req_data(peer->xseg, pr->req, NULL);
-       p = xseg_respond(peer->xseg, req, peer->portno, X_ALLOC);
+       p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
        xseg_signal(peer->xseg, p);
        free_peer_req(peer, pr);
        wake_up_next_thread(peer);
@@ -182,7 +182,7 @@ void complete(struct peerd *peer, struct peer_req *pr)
        req->state |= XS_SERVED;
        //xseg_set_req_data(peer->xseg, pr->req, NULL);
        //gettimeofday(&resp_start, NULL);
-       p = xseg_respond(peer->xseg, req, peer->portno, X_ALLOC);
+       p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
        //gettimeofday(&resp_end, NULL);
        //responds++;
        //timersub(&resp_end, &resp_start, &resp_end);
@@ -239,7 +239,7 @@ int submit_peer_req(struct peerd *peer, struct peer_req *pr)
                return -1;
        //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
        //gettimeofday(&sub_start, NULL);
-       ret = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        //gettimeofday(&sub_end, NULL);
        //submits++;
        //timersub(&sub_end, &sub_start, &sub_end);
@@ -268,22 +268,25 @@ static void* thread_loop(void *arg)
        struct thread *t = (struct thread *) arg;
        struct peerd *peer = t->peer;
        struct xseg *xseg = peer->xseg;
-       uint32_t portno = peer->portno;
+       xport portno_start = peer->portno_start;
+       xport portno_end = peer->portno_end;
        struct peer_req *pr;
-       uint64_t threshold=1000;
+       uint64_t threshold=1;
        pid_t pid =syscall(SYS_gettid);
        uint64_t loops;
        struct xseg_request *accepted, *received;
        int r;
+       int change;
+       xport i;
                
        XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
 
        XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
-       xseg_init_local_signal(xseg, portno);
+       xseg_init_local_signal(xseg, peer->portno_start);
        for (;;) {
                if (t->func) {
                        XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
-                       xseg_cancel_wait(xseg, portno);
+                       xseg_cancel_wait(xseg, peer->portno_start);
                        t->func(t->arg);
                        t->func = NULL;
                        t->arg = NULL;
@@ -291,52 +294,53 @@ static void* thread_loop(void *arg)
                }
 
                for(loops= threshold; loops > 0; loops--) {
-                       accepted = NULL;
-                       received = NULL;
-                       if (loops == 1)
-                               xseg_prepare_wait(xseg, portno);
-
-//                     if (xq_count(&peer->xport->request_queue)){
-                               pr = alloc_peer_req(peer);
-                               if (pr) {
-                                       accepted = xseg_accept(xseg, peer->portno);
-                                       if (accepted) {
-                                               XSEGLOG2(&lc, D, "Thread %u accepted\n", (unsigned int) (t- peer->thread));
-                                               pr->req = accepted;
-                                               xseg_cancel_wait(xseg, portno);
-                                               wake_up_next_thread(peer);
-                                               handle_accepted(peer, pr, accepted);
-                                               loops = threshold;
+                       do {
+                               if (loops == 1)
+                                       xseg_prepare_wait(xseg, peer->portno_start);
+                               change = 0;
+                               for (i = portno_start; i <= portno_end; i++) {
+                                       accepted = NULL;
+                                       received = NULL;
+                                       pr = alloc_peer_req(peer);
+                                       if (pr) {
+                                               accepted = xseg_accept(xseg, i, X_NONBLOCK);
+                                               if (accepted) {
+                                                       XSEGLOG2(&lc, D, "Thread %u accepted\n", (unsigned int) (t- peer->thread));
+                                                       pr->req = accepted;
+                                                       pr->portno = i;
+                                                       xseg_cancel_wait(xseg, i);
+                                                       wake_up_next_thread(peer);
+                                                       handle_accepted(peer, pr, accepted);
+                                                       change = 1;
+                                               }
+                                               else {
+                                                       free_peer_req(peer, pr);
+                                               }
                                        }
-                                       else {
-                                               free_peer_req(peer, pr);
+                                       received = xseg_receive(xseg, i, X_NONBLOCK);
+                                       if (received) {
+                                               //printf("received req id: %u\n", received - xseg->requests);
+                                               //print_req(peer->xseg, received);
+                                               r =  xseg_get_req_data(xseg, received, (void **) &pr);
+                                               if (r < 0 || !pr){
+                                                       //FIXME what to do here ?
+                                                       XSEGLOG2(&lc, W, "Received request with no pr data\n");
+                                                       xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
+                                                       //FIXME if fails, put req
+                                               }
+                                               xseg_cancel_wait(xseg, i);
+                                               wake_up_next_thread(peer);
+                                               handle_received(peer, pr, received);
+                                               change = 1;
                                        }
                                }
-//                     }
-//                     if (xq_count(&peer->xport->reply_queue)){
-                               received = xseg_receive(xseg, peer->portno);
-                               if (received) {
-                                       //printf("received req id: %u\n", received - xseg->requests);
-                                       //print_req(peer->xseg, received);
-                                       r =  xseg_get_req_data(xseg, received, (void **) &pr);
-                                       if (r < 0 || !pr){
-                                               //FIXME what to do here ?
-                                               XSEGLOG2(&lc, W, "Received request with no pr data\n");
-                                               xseg_respond(peer->xseg, received, peer->portno, X_ALLOC);
-                                               //if fails, put req
-                                       }
-                                       //fail(peer, received);
-                                       //assert pr->req == received;
-                                       xseg_cancel_wait(xseg, portno);
-                                       wake_up_next_thread(peer);
-                                       handle_received(peer, pr, received);
+                               if (change)
                                        loops = threshold;
-                               }
-//                     }
+                       }while (change);
                }
                XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
                xseg_wait_signal(xseg, 10000000UL);
-               xseg_cancel_wait(xseg, portno);
+               xseg_cancel_wait(xseg, peer->portno_start);
                XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
        }
        return NULL;
@@ -391,7 +395,8 @@ int peerd_start_threads(struct peerd *peer)
        return 0;
 }
 
-static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno, uint32_t nr_threads, uint32_t defer_portno)
+static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
+                       long portno_end, uint32_t nr_threads, uint32_t defer_portno)
 {
        int i;
        struct peerd *peer;
@@ -427,15 +432,26 @@ malloc_fail:
        if (!peer->xseg) 
                return NULL;
 
-       peer->xport = xseg_bind_port(peer->xseg, portno, NULL);
-       if (!peer->xport){
-               printf("cannot bind to port %ld\n", portno);
+       peer->portno_start = (xport) portno_start;
+       peer->portno_end= (xport) portno_end;
+       peer->port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
+       if (!peer->port){
+               printf("cannot bind to port %ld\n", peer->portno_start);
                return NULL;
        }
-       printf("%lx\n", (unsigned long) peer->xport);
-       peer->portno = xseg_portno(peer->xseg, peer->xport);
-       printf("Peer on port %u/%u\n", peer->portno,
-                       peer->xseg->config.nr_ports);
+
+       xport p;
+       for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
+               struct xseg_port *tmp;
+               tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, peer->port));
+               if (!tmp){
+                       printf("cannot bind to port %ld\n", p);
+                       return NULL;
+               }
+       }
+
+       printf("Peer on ports  %u-%u\n", peer->portno_start,
+                       peer->portno_end);
 
        for (i = 0; i < nr_ops; i++) {
                peer->peer_reqs[i].peer = peer;
@@ -454,7 +470,7 @@ int main(int argc, char *argv[])
        //parse args
        char *spec = "";
        int i, r;
-       long portno = -1;
+       long portno_start = -1, portno_end = -1;
        //set defaults here
        uint32_t nr_ops = 16;
        uint32_t nr_threads = 16 ;
@@ -474,8 +490,14 @@ int main(int argc, char *argv[])
                        continue;
                }
 
-               if (!strcmp(argv[i], "-p") && i + 1 < argc) {
-                       portno = strtoul(argv[i+1], NULL, 10);
+               if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
+                       portno_start = strtoul(argv[i+1], NULL, 10);
+                       i += 1;
+                       continue;
+               }
+               
+               if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
+                       portno_end = strtoul(argv[i+1], NULL, 10);
                        i += 1;
                        continue;
                }
@@ -514,7 +536,7 @@ int main(int argc, char *argv[])
        verbose = debug_level;
 
        //TODO err check
-       peer = peerd_init(nr_ops, spec, portno, nr_threads, defer_portno);
+       peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
        if (!peer)
                return -1;
        r = custom_peer_init(peer, argc, argv);