Split peer request queues.
authorFilippos Giannakos <philipgian@grnet.gr>
Wed, 13 Mar 2013 11:45:10 +0000 (13:45 +0200)
committerFilippos Giannakos <philipgian@grnet.gr>
Thu, 14 Mar 2013 16:00:21 +0000 (18:00 +0200)
Each thread has its own peer request queue, frow which it allocates data. If
this queue is empty, it  tries to "steal" from the "next" thread. This greatly
reduces lock congestion to the peer request queue.

xseg/peers/user/peer.c
xseg/peers/user/peer.h

index c35f94e..5021849 100644 (file)
@@ -70,6 +70,7 @@ uint32_t ta = 0;
 #ifdef MT
 struct peerd *global_peer;
 
+/*
 struct thread {
        struct peerd *peer;
        pthread_t tid;
@@ -79,8 +80,9 @@ struct thread {
        void (*func)(void *arg);
        void *arg;
        void *priv;
+       struct xq free_thread_reqs;
 };
-
+*/
 inline static struct thread* alloc_thread(struct peerd *peer)
 {
        xqindex idx = xq_pop_head(&peer->threads, 1);
@@ -232,6 +234,25 @@ void log_pr(char *msg, struct peer_req *pr)
        }
 }
 
+#ifdef MT
+inline struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t)
+{
+       struct peer_req *pr;
+       struct thread *nt;
+       xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no);
+       if (idx != Noneidx)
+               goto out;
+       /* try to steal from the next thread */
+       nt = &peer->thread[(t->thread_no + 1) % peer->nr_threads];
+       idx = xq_pop_head(&nt->free_thread_reqs, t->thread_no);
+       if (idx == Noneidx)
+               return NULL;
+out:
+       pr = peer->peer_reqs + idx;
+       pr->thread_no = t - peer->thread;
+       return pr;
+}
+#else
 /*
  * free_reqs is a queue that simply contains pointer offsets to the peer_reqs
  * queue. If a pointer from peer_reqs is popped, we are certain that the
@@ -244,12 +265,18 @@ inline struct peer_req *alloc_peer_req(struct peerd *peer)
                return NULL;
        return peer->peer_reqs + idx;
 }
+#endif
 
 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
 {
        xqindex idx = pr - peer->peer_reqs;
        pr->req = NULL;
+#ifdef MT
+       struct thread *t = &peer->thread[pr->thread_no];
+       xq_append_head(&t->free_thread_reqs, idx, 1);
+#else
        xq_append_head(&peer->free_reqs, idx, 1);
+#endif
 }
 
 struct timeval resp_start, resp_end, resp_accum = {0, 0};
@@ -353,7 +380,11 @@ int submit_peer_req(struct peerd *peer, struct peer_req *pr)
        return 0;
 }
 
+#ifdef MT
+int check_ports(struct peerd *peer, struct thread *t)
+#else
 int check_ports(struct peerd *peer)
+#endif
 {
        struct xseg *xseg = peer->xseg;
        xport portno_start = peer->portno_start;
@@ -370,7 +401,11 @@ int check_ports(struct peerd *peer)
                if (!isTerminate()) {
                        //Better way than alloc/free all the time?
                        //Cache the allocated peer_req?
+#ifdef MT
+                       pr = alloc_peer_req(peer, t); 
+#else
                        pr = alloc_peer_req(peer);
+#endif
                        if (pr) {
                                accepted = xseg_accept(xseg, i, X_NONBLOCK);
                                if (accepted) {
@@ -429,7 +464,7 @@ static void* thread_loop(void *arg)
                for(loops =  threshold; loops > 0; loops--) {
                        if (loops == 1)
                                xseg_prepare_wait(xseg, peer->portno_start);
-                       if (check_ports(peer))
+                       if (check_ports(peer, t))
                                loops = threshold;
                }
                XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
@@ -560,7 +595,11 @@ static int generic_peerd_loop(void *arg)
                for(loops = threshold; loops > 0; loops--) {
                        if (loops == 1)
                                xseg_prepare_wait(xseg, peer->portno_start);
+#ifdef MT
+                       if (check_ports(peer, t))
+#else
                        if (check_ports(peer))
+#endif
                                loops = threshold;
                }
 #ifdef ST_THREADS
@@ -626,6 +665,20 @@ static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
        peer->thread = calloc(nr_threads, sizeof(struct thread));
        if (!peer->thread)
                goto malloc_fail;
+       if (!xq_alloc_empty(&peer->threads, nr_threads))
+               goto malloc_fail;
+       for (i = 0; i < nr_threads; i++) {
+               peer->thread[i].thread_no = i;
+               if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops/nr_threads))
+                       goto malloc_fail;
+       }
+       for (i = 0; i < nr_ops; i++) {
+               __xq_append_head(&peer->thread[i % nr_threads].free_thread_reqs, (xqindex)i);
+       }
+
+#else
+       if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
+               goto malloc_fail;
 #endif
        peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
        if (!peer->peer_reqs){
@@ -633,13 +686,6 @@ malloc_fail:
                perror("malloc");
                return NULL;
        }
-
-       if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
-               goto malloc_fail;
-#ifdef MT
-       if (!xq_alloc_empty(&peer->threads, nr_threads))
-               goto malloc_fail;
-#endif
        if (xseg_initialize()){
                printf("cannot initialize library\n");
                return NULL;
@@ -686,6 +732,7 @@ malloc_fail:
        peer->interactive_func = NULL;
 #endif
        return peer;
+
 }
 
 int pidfile_remove(char *path, int fd)
index 2589526..3ea8a91 100644 (file)
@@ -83,6 +83,21 @@ struct peer_req {
 #ifdef ST_THREADS
        st_cond_t cond;
 #endif
+#ifdef MT
+       int thread_no;
+#endif
+};
+
+struct thread {
+       struct peerd *peer;
+       pthread_t tid;
+       pthread_cond_t cond;
+       pthread_mutex_t lock;
+       int thread_no;
+       void (*func)(void *arg);
+       void *arg;
+       void *priv;
+       struct xq free_thread_reqs;
 };
 
 struct peerd {
@@ -115,17 +130,20 @@ int defer_request(struct peerd *peer, struct peer_req *pr);
 void pending(struct peerd *peer, struct peer_req *req);
 void log_pr(char *msg, struct peer_req *pr);
 int canDefer(struct peerd *peer);
-int submit_peer_req(struct peerd *peer, struct peer_req *pr);
-struct peer_req *alloc_peer_req(struct peerd *peer);
 void free_peer_req(struct peerd *peer, struct peer_req *pr);
+int submit_peer_req(struct peerd *peer, struct peer_req *pr);
 void get_submits_stats();
 void get_responds_stats();
 void usage();
 void print_req(struct xseg *xseg, struct xseg_request *req);
-int check_ports(struct peerd *peer);
 
 #ifdef MT
 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg);
+struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t);
+int check_ports(struct peerd *peer, struct thread *t);
+#else
+struct peer_req *alloc_peer_req(struct peerd *peer);
+int check_ports(struct peerd *peer);
 #endif
 
 static inline struct peerd * __get_peerd(void * custom_peerd)