Split peer request queues.
[archipelago] / xseg / peers / user / peer.c
index c35f94e..5021849 100644 (file)
@@ -70,6 +70,7 @@ uint32_t ta = 0;
 #ifdef MT
 struct peerd *global_peer;
 
+/*
 struct thread {
        struct peerd *peer;
        pthread_t tid;
@@ -79,8 +80,9 @@ struct thread {
        void (*func)(void *arg);
        void *arg;
        void *priv;
+       struct xq free_thread_reqs;
 };
-
+*/
 inline static struct thread* alloc_thread(struct peerd *peer)
 {
        xqindex idx = xq_pop_head(&peer->threads, 1);
@@ -232,6 +234,25 @@ void log_pr(char *msg, struct peer_req *pr)
        }
 }
 
+#ifdef MT
+inline struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t)
+{
+       struct peer_req *pr;
+       struct thread *nt;
+       xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no);
+       if (idx != Noneidx)
+               goto out;
+       /* try to steal from the next thread */
+       nt = &peer->thread[(t->thread_no + 1) % peer->nr_threads];
+       idx = xq_pop_head(&nt->free_thread_reqs, t->thread_no);
+       if (idx == Noneidx)
+               return NULL;
+out:
+       pr = peer->peer_reqs + idx;
+       pr->thread_no = t - peer->thread;
+       return pr;
+}
+#else
 /*
  * free_reqs is a queue that simply contains pointer offsets to the peer_reqs
  * queue. If a pointer from peer_reqs is popped, we are certain that the
@@ -244,12 +265,18 @@ inline struct peer_req *alloc_peer_req(struct peerd *peer)
                return NULL;
        return peer->peer_reqs + idx;
 }
+#endif
 
 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
 {
        xqindex idx = pr - peer->peer_reqs;
        pr->req = NULL;
+#ifdef MT
+       struct thread *t = &peer->thread[pr->thread_no];
+       xq_append_head(&t->free_thread_reqs, idx, 1);
+#else
        xq_append_head(&peer->free_reqs, idx, 1);
+#endif
 }
 
 struct timeval resp_start, resp_end, resp_accum = {0, 0};
@@ -353,7 +380,11 @@ int submit_peer_req(struct peerd *peer, struct peer_req *pr)
        return 0;
 }
 
+#ifdef MT
+int check_ports(struct peerd *peer, struct thread *t)
+#else
 int check_ports(struct peerd *peer)
+#endif
 {
        struct xseg *xseg = peer->xseg;
        xport portno_start = peer->portno_start;
@@ -370,7 +401,11 @@ int check_ports(struct peerd *peer)
                if (!isTerminate()) {
                        //Better way than alloc/free all the time?
                        //Cache the allocated peer_req?
+#ifdef MT
+                       pr = alloc_peer_req(peer, t); 
+#else
                        pr = alloc_peer_req(peer);
+#endif
                        if (pr) {
                                accepted = xseg_accept(xseg, i, X_NONBLOCK);
                                if (accepted) {
@@ -429,7 +464,7 @@ static void* thread_loop(void *arg)
                for(loops =  threshold; loops > 0; loops--) {
                        if (loops == 1)
                                xseg_prepare_wait(xseg, peer->portno_start);
-                       if (check_ports(peer))
+                       if (check_ports(peer, t))
                                loops = threshold;
                }
                XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
@@ -560,7 +595,11 @@ static int generic_peerd_loop(void *arg)
                for(loops = threshold; loops > 0; loops--) {
                        if (loops == 1)
                                xseg_prepare_wait(xseg, peer->portno_start);
+#ifdef MT
+                       if (check_ports(peer, t))
+#else
                        if (check_ports(peer))
+#endif
                                loops = threshold;
                }
 #ifdef ST_THREADS
@@ -626,6 +665,20 @@ static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
        peer->thread = calloc(nr_threads, sizeof(struct thread));
        if (!peer->thread)
                goto malloc_fail;
+       if (!xq_alloc_empty(&peer->threads, nr_threads))
+               goto malloc_fail;
+       for (i = 0; i < nr_threads; i++) {
+               peer->thread[i].thread_no = i;
+               if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops/nr_threads))
+                       goto malloc_fail;
+       }
+       for (i = 0; i < nr_ops; i++) {
+               __xq_append_head(&peer->thread[i % nr_threads].free_thread_reqs, (xqindex)i);
+       }
+
+#else
+       if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
+               goto malloc_fail;
 #endif
        peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
        if (!peer->peer_reqs){
@@ -633,13 +686,6 @@ malloc_fail:
                perror("malloc");
                return NULL;
        }
-
-       if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
-               goto malloc_fail;
-#ifdef MT
-       if (!xq_alloc_empty(&peer->threads, nr_threads))
-               goto malloc_fail;
-#endif
        if (xseg_initialize()){
                printf("cannot initialize library\n");
                return NULL;
@@ -686,6 +732,7 @@ malloc_fail:
        peer->interactive_func = NULL;
 #endif
        return peer;
+
 }
 
 int pidfile_remove(char *path, int fd)