add inspect queue functionality
[archipelago] / xseg / peers / user / mpeer.c
index 609e6cb..e12ef76 100644 (file)
@@ -23,6 +23,43 @@ struct thread {
 };
 
 
+inline static struct thread* alloc_thread(struct peerd *peer) // Take one thread slot off the peer->threads queue.
+{ // Returns a pointer into the peer->thread array, or NULL when no index could be popped.
+       xqindex idx = xq_pop_head(&peer->threads, 1); // pop an index from peer->threads
+       if (idx == Noneidx) // Noneidx: the pop produced no index
+               return NULL;
+       return peer->thread + idx; // translate the index into an element of peer->thread
+}
+
+inline static void free_thread(struct peerd *peer, struct thread *t) // Hand thread slot t back to the peer->threads queue.
+{
+       xqindex idx = t - peer->thread; // index of t within peer->thread (assumes t points into that array)
+       xq_append_head(&peer->threads, idx, 1); // counterpart of the xq_pop_head() in alloc_thread()
+}
+
+
+inline static void __wake_up_thread(struct thread *t) // Signal t->cond while holding t->lock.
+{ // t is dereferenced unconditionally, so it must not be NULL; use wake_up_thread() for a NULL-tolerant call.
+       pthread_mutex_lock(&t->lock);
+       pthread_cond_signal(&t->cond); // signalled with t->lock held
+       pthread_mutex_unlock(&t->lock);
+}
+
+inline static void wake_up_thread(struct thread* t) // NULL-tolerant wrapper around __wake_up_thread().
+{
+       if (t){ // a NULL t is silently ignored
+               __wake_up_thread(t);
+       }
+}
+
+inline static int wake_up_next_thread(struct peerd *peer) // Signals port peer->portno_start on peer->xseg and returns xseg_signal()'s result.
+{ // The commented-out lines below are an alternative that wakes a thread taken from alloc_thread(); they are not compiled.
+       //struct thread *t = alloc_thread(peer);
+       //wake_up_thread(t);
+       //return t;
+       return (xseg_signal(peer->xseg, peer->portno_start));
+}
+
 inline int canDefer(struct peerd *peer)
 {
        return !(peer->defer_portno == NoPort);
@@ -106,43 +143,6 @@ inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
        xq_append_head(&peer->free_reqs, idx, 1);
 }
 
-inline static struct thread* alloc_thread(struct peerd *peer)
-{
-       xqindex idx = xq_pop_head(&peer->threads, 1);
-       if (idx == Noneidx)
-               return NULL;
-       return peer->thread + idx;
-}
-
-inline static void free_thread(struct peerd *peer, struct thread *t)
-{
-       xqindex idx = t - peer->thread;
-       xq_append_head(&peer->threads, idx, 1);
-}
-
-
-inline static void __wake_up_thread(struct thread *t)
-{
-       pthread_mutex_lock(&t->lock);
-       pthread_cond_signal(&t->cond);
-       pthread_mutex_unlock(&t->lock);
-}
-
-inline static void wake_up_thread(struct thread* t)
-{
-       if (t){
-               __wake_up_thread(t);
-       }
-}
-
-inline static int wake_up_next_thread(struct peerd *peer)
-{
-       //struct thread *t = alloc_thread(peer);
-       //wake_up_thread(t);
-       //return t;
-       return (xseg_signal(peer->xseg, peer->portno_start));
-}
-
 struct timeval resp_start, resp_end, resp_accum = {0, 0};
 uint64_t responds = 0;
 void get_responds_stats(){
@@ -184,11 +184,6 @@ void complete(struct peerd *peer, struct peer_req *pr)
        wake_up_next_thread(peer);
 }
 
-void pending(struct peerd *peer, struct peer_req *pr)
-{
-               pr->req->state = XS_PENDING;
-}
-
 static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
                                struct xseg_request *req)
 {
@@ -198,7 +193,7 @@ static void handle_accepted(struct peerd *peer, struct peer_req *pr,
        xreq->serviced = 0;
        //xreq->state = XS_ACCEPTED;
        pr->retval = 0;
-       dispatch(peer, pr, req, accept);
+       dispatch(peer, pr, req, dispatch_accept);
 }
 
 static void handle_received(struct peerd *peer, struct peer_req *pr,
@@ -207,7 +202,7 @@ static void handle_received(struct peerd *peer, struct peer_req *pr,
        //struct xseg_request *req = pr->req;
        //assert req->state != XS_ACCEPTED;
        XSEGLOG2(&lc, D, "Handle received \n");
-       dispatch(peer, pr, req, receive);
+       dispatch(peer, pr, req, dispatch_receive);
 
 }
 struct timeval sub_start, sub_end, sub_accum = {0, 0};
@@ -254,6 +249,58 @@ int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
                return -1;
 }
 
+static int check_ports(struct peerd *peer) // Make one pass over ports portno_start..portno_end, trying one X_NONBLOCK accept and one X_NONBLOCK receive per port.
+{ // Returns 1 if at least one request was passed to handle_accepted() or handle_received(), otherwise 0.
+       struct xseg *xseg = peer->xseg;
+       xport portno_start = peer->portno_start;
+       xport portno_end = peer->portno_end;
+       struct xseg_request *accepted, *received;
+       struct peer_req *pr;
+       xport i;
+       int  r, c = 0; // c: set to 1 once any request has been dispatched
+
+       for (i = portno_start; i <= portno_end; i++) {
+               accepted = NULL;
+               received = NULL;
+               pr = alloc_peer_req(peer); // reserve a peer_req first; accept is skipped for this port if none is available
+               if (pr) {
+                       accepted = xseg_accept(xseg, i, X_NONBLOCK);
+                       if (accepted) {
+                               pr->req = accepted;
+                               pr->portno = i; // remember which port the request came in on
+                               xseg_cancel_wait(xseg, i); // NOTE(review): presumably undoes the caller's xseg_prepare_wait() now that work was found - confirm
+                               handle_accepted(peer, pr, accepted);
+                               c = 1;
+                       }
+                       else {
+                               free_peer_req(peer, pr); // nothing accepted on this port: return the unused peer_req
+                       }
+               }
+               received = xseg_receive(xseg, i, X_NONBLOCK);
+               if (received) {
+                       r =  xseg_get_req_data(xseg, received, (void **) &pr); // recover the peer_req pointer for this request
+                       if (r < 0 || !pr){ // lookup failed or yielded NULL: the request cannot be dispatched
+                               XSEGLOG2(&lc, W, "Received request with no pr data\n");
+                               xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
+                               if (p == NoPort){
+                                       XSEGLOG2(&lc, W, "Could not respond stale request\n");
+                                       xseg_put_request(xseg, received, portno_start); // responding failed: fall back to xseg_put_request()
+                                       continue;
+                               } else {
+                                       xseg_signal(xseg, p); // signal the port returned by xseg_respond()
+                               }
+                       } else {
+                               //maybe perform sanity check for pr
+                               xseg_cancel_wait(xseg, i);
+                               handle_received(peer, pr, received);
+                               c = 1;
+                       }
+               }
+       }
+
+       return c;
+}
+
 static void* thread_loop(void *arg)
 {
        struct thread *t = (struct thread *) arg;
@@ -261,15 +308,10 @@ static void* thread_loop(void *arg)
        struct xseg *xseg = peer->xseg;
        xport portno_start = peer->portno_start;
        xport portno_end = peer->portno_end;
-       struct peer_req *pr;
-       uint64_t threshold=1;
        pid_t pid =syscall(SYS_gettid);
        uint64_t loops;
-       struct xseg_request *accepted, *received;
-       int r;
-       int change;
-       xport i;
-               
+       uint64_t threshold=1000/(1 + portno_end - portno_start);
+       
        XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
 
        XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
@@ -285,49 +327,10 @@ static void* thread_loop(void *arg)
                }
 
                for(loops= threshold; loops > 0; loops--) {
-                       do {
-                               if (loops == 1)
-                                       xseg_prepare_wait(xseg, peer->portno_start);
-                               change = 0;
-                               for (i = portno_start; i <= portno_end; i++) {
-                                       accepted = NULL;
-                                       received = NULL;
-                                       pr = alloc_peer_req(peer);
-                                       if (pr) {
-                                               accepted = xseg_accept(xseg, i, X_NONBLOCK);
-                                               if (accepted) {
-                                                       XSEGLOG2(&lc, D, "Thread %u accepted\n", (unsigned int) (t- peer->thread));
-                                                       pr->req = accepted;
-                                                       pr->portno = i;
-                                                       xseg_cancel_wait(xseg, i);
-                                                       wake_up_next_thread(peer);
-                                                       handle_accepted(peer, pr, accepted);
-                                                       change = 1;
-                                               }
-                                               else {
-                                                       free_peer_req(peer, pr);
-                                               }
-                                       }
-                                       received = xseg_receive(xseg, i, X_NONBLOCK);
-                                       if (received) {
-                                               //printf("received req id: %u\n", received - xseg->requests);
-                                               //print_req(peer->xseg, received);
-                                               r =  xseg_get_req_data(xseg, received, (void **) &pr);
-                                               if (r < 0 || !pr){
-                                                       //FIXME what to do here ?
-                                                       XSEGLOG2(&lc, W, "Received request with no pr data\n");
-                                                       xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
-                                                       //FIXME if fails, put req
-                                               }
-                                               xseg_cancel_wait(xseg, i);
-                                               wake_up_next_thread(peer);
-                                               handle_received(peer, pr, received);
-                                               change = 1;
-                                       }
-                               }
-                               if (change)
-                                       loops = threshold;
-                       }while (change);
+                       if (loops == 1)
+                               xseg_prepare_wait(xseg, peer->portno_start);
+                       if (check_ports(peer))
+                               loops = threshold;
                }
                XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
                xseg_wait_signal(xseg, 10000000UL);
@@ -463,7 +466,7 @@ int main(int argc, char *argv[])
        //parse args
        char *spec = "";
        int i, r;
-       long portno_start = -1, portno_end = -1;
+       long portno_start = -1, portno_end = -1, portno = -1;
        //set defaults here
        uint32_t nr_ops = 16;
        uint32_t nr_threads = 16 ;
@@ -495,6 +498,12 @@ int main(int argc, char *argv[])
                        continue;
                }
 
+               if (!strcmp(argv[i], "-p") && i + 1 < argc) {
+                       portno = strtoul(argv[i+1], NULL, 10);
+                       i += 1;
+                       continue;
+               }
+
                if (!strcmp(argv[i], "-n") && i + 1 < argc) {
                        nr_ops = strtoul(argv[i+1], NULL, 10);
                        i += 1;
@@ -527,6 +536,10 @@ int main(int argc, char *argv[])
        
        //TODO perform argument sanity checks
        verbose = debug_level;
+       if (portno != -1) {
+               portno_start = portno;
+               portno_end = portno;
+       }
 
        //TODO err check
        peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);