Make bench send requests faster
authorAlex Pyrgiotis <apyrgio@grnet.gr>
Tue, 12 Feb 2013 17:39:26 +0000 (19:39 +0200)
committerFilippos Giannakos <philipgian@grnet.gr>
Mon, 11 Mar 2013 09:52:27 +0000 (11:52 +0200)
xseg/peers/user/bench-xseg.c
xseg/peers/user/peer.c

index 9140c83..b49218b 100644 (file)
@@ -192,9 +192,9 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
        prefs->src_port = peer->portno_start; //TODO: allow user to change this
        prefs->dst_port = (xport) dst_port;
 
-       /**************************
-        * Customize struct peerd *
-        **************************/
+       /*********************************
+        * Create timers for all metrics *
+        *********************************/
 
        prefs->total_tm = malloc(sizeof(struct timer));
        prefs->get_tm = malloc(sizeof(struct timer));
@@ -210,6 +210,10 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
        memset(prefs->sub_tm, 0, sizeof(struct timer));
        memset(prefs->rec_tm, 0, sizeof(struct timer));
 
+       /**************************
+        * Customize struct peerd *
+        **************************/
+
        peer->peerd_loop = custom_peerd_loop;
        peer->priv = (void *) prefs;
        return 0;
@@ -221,10 +225,11 @@ arg_fail:
 }
 
 
-int send_request(struct peerd *peer, struct bench *prefs)
+static int send_request(struct peerd *peer, struct bench *prefs)
 {
        struct xseg_request *req;
        struct xseg *xseg = peer->xseg;
+       struct peer_req *pr;
        xport srcport = prefs->src_port;
        xport dstport = prefs->dst_port;
        xport p;
@@ -235,7 +240,7 @@ int send_request(struct peerd *peer, struct bench *prefs)
 
        //srcport and dstport must already be provided by the user.
        //returns struct xseg_request with basic initializations
-       XSEGLOG2(&lc, I, "Get request %lu\n", prefs->get_tm->completed);
+       XSEGLOG("Get request %lu\n", prefs->get_tm->completed);
        timer_start(prefs->get_tm);
        req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
        if (!req) {
@@ -244,7 +249,7 @@ int send_request(struct peerd *peer, struct bench *prefs)
        }
        timer_stop(prefs->get_tm);
 
-       XSEGLOG2(&lc, I, "Prepare request\n");
+       XSEGLOG("Prepare request\n");
        //Allocate enough space for the data and the target's name
        r = xseg_prep_request(xseg, req, targetlen, size);
        if (r < 0) {
@@ -268,7 +273,13 @@ int send_request(struct peerd *peer, struct bench *prefs)
        req->op = X_WRITE;
 #endif
 
-       XSEGLOG2(&lc, I, "Submit request %lu\n", prefs->sub_tm->completed);
+       //Measure this?
+       XSEGLOG("Set req data\n");
+       pr = alloc_peer_req(peer);
+       r = xseg_set_req_data(xseg, req, pr);
+
+
+       XSEGLOG("Submit request %lu\n", prefs->sub_tm->completed);
        //Submit the request from the source port to the target port
        timer_start(prefs->sub_tm);
        p = xseg_submit(xseg, req, srcport, X_ALLOC);
@@ -277,6 +288,7 @@ int send_request(struct peerd *peer, struct bench *prefs)
                return -1;
        }
        timer_stop(prefs->sub_tm);
+
        timer_start(prefs->rec_tm);
        //Send SIGIO to the process that has binded this port to inform that
        //IO is possible
@@ -309,12 +321,11 @@ int custom_peerd_loop(void *arg)
        XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
        xseg_init_local_signal(xseg, peer->portno_start);
        uint64_t loops;
+       unsigned long max_completed = prefs->ts / prefs->bs;
 
        timer_start(prefs->total_tm);
 
-       while (!isTerminate()
-                       && xq_count(&peer->free_reqs) == peer->nr_ops
-                       && prefs->rec_tm->completed != prefs->ts / prefs->bs ) {
+       while (!isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops) {
 #ifdef MT
                if (t->func) {
                        XSEGLOG2(&lc, D, "%s executes function\n", id);
@@ -325,6 +336,7 @@ int custom_peerd_loop(void *arg)
                        continue;
                }
 #endif
+send_request:
                while (prefs->sub_tm->completed - prefs->rec_tm->completed <
                                prefs->iodepth){
                        XSEGLOG2(&lc, I, "Start sending new request\n");
@@ -333,11 +345,16 @@ int custom_peerd_loop(void *arg)
 
                //Heart of peerd_loop. This loop is common for everyone.
                for (loops = threshold; loops > 0; loops--) {
-                       if (loops == 1)
-                               xseg_prepare_wait(xseg, peer->portno_start);
-                       if (check_ports(peer))
-                               loops = threshold;
+                       if (check_ports(peer)) {
+                               if (max_completed == prefs->rec_tm->completed)
+                                       return 0;
+                               else
+                                       //If an old request has just been acked, the most sensible
+                                       //thing to do is immediately to send a new one
+                                       goto send_request;
+                       }
                }
+               xseg_prepare_wait(xseg, peer->portno_start);
 #ifdef ST_THREADS
                if (ta){
                        st_sleep(0);
@@ -349,6 +366,9 @@ int custom_peerd_loop(void *arg)
                xseg_cancel_wait(xseg, peer->portno_start);
                XSEGLOG2(&lc, I, "%s woke up\n", id);
        }
+
+       XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
+                       xq_count(&peer->free_reqs), peer->nr_ops);
        return 0;
 }
 
@@ -376,7 +396,7 @@ void custom_peer_finalize(struct peerd *peer)
 }
 
 
-void handle_received(struct peerd *peer, struct peer_req *pr)
+static void handle_received(struct peerd *peer, struct peer_req *pr)
 {
        //FIXME: handle null pointer
        struct bench *prefs = peer->priv;
@@ -393,8 +413,10 @@ int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
                        //This is wrong, benchmarking peer should not accept requests,
                        //only receive them.
                        complete(peer, pr);
+                       break;
                case dispatch_receive:
                        handle_received(peer, pr);
+                       break;
                default:
                        fail(peer, pr);
        }
index 7f1ab2d..5f7aa79 100644 (file)
@@ -232,6 +232,11 @@ void log_pr(char *msg, struct peer_req *pr)
        }
 }
 
+/*
+ * free_reqs is a queue that simply contains pointer offsets to the peer_reqs
+ * queue. If a pointer from peer_reqs is popped, we are certain that the
+ * associated memory in peer_reqs is free to use
+ */
 inline struct peer_req *alloc_peer_req(struct peerd *peer)
 {
        xqindex idx = xq_pop_head(&peer->free_reqs, 1);