Complete unification of thread_loop/peerd_loop
[archipelago] / xseg / peers / user / bench-xseg.c
index aea6fca..0b559aa 100644 (file)
 #include <time.h>
 #include <sys/util.h>
 #include <signal.h>
+#include <bench-xseg.h>
 
 struct timespec delay = {0, 4000000};
 
-struct bench {
-       uint64_t ts; //Total I/O size
-       uint64_t os; //Object size
-       uint64_t bs; //Block size
-       uint32_t iodepth; //Num of in-flight xseg reqs
-       xport dst_port;
-       uint8_t flags;
-};
-
-int custom_peerd_loop(struct peerd *peer);
-
-#define MAX_ARG_LEN 10
-
 void custom_peer_usage()
 {
        fprintf(stderr, "Custom peer options: \n"
@@ -134,11 +122,9 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
        READ_ARG_ULONG("-dp", dst_port);
        END_READ_ARGS();
 
-       /*
-        *************************
+       /*************************
         * Check size parameters *
-        *************************
-        */
+        *************************/
 
        //Block size (bs): Defaults to 4K.
        //It must be a number followed by one of these characters: [k|K|m|M|g|G].
@@ -191,24 +177,31 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
                goto arg_fail;
        }
 
-       /*
-        *************************
+       /*************************
         * Check port parameters *
-        *************************
-        */
+        *************************/
 
        if (dst_port < 0){
                XSEGLOG2(&lc, E, "Destination port needs to be supplied\n");
                goto arg_fail;
        }
 
+       prefs->src_port = peer->portno_start; //TODO: allow user to change this
        prefs->dst_port = (xport) dst_port;
 
-       /*
-        **************************
+       /**************************
         * Customize struct peerd *
-        **************************
-        */
+        **************************/
+
+       prefs->total_tm = malloc(sizeof(struct timer));
+       prefs->get_tm = malloc(sizeof(struct timer));
+       prefs->sub_tm = malloc(sizeof(struct timer));
+       prefs->rec_tm = malloc(sizeof(struct timer));
+       if (!prefs->total_tm || !prefs->get_tm || !prefs->sub_tm ||
+                       !prefs->rec_tm) {
+               perror("malloc");
+               return -1;
+       }
 
        peer->custom_peerd_loop = custom_peerd_loop;
        peer->priv = (void *) prefs;
@@ -220,10 +213,69 @@ arg_fail:
        return -1;
 }
 
+
+int send_request(struct peerd *peer, struct bench *prefs)
+{
+       struct xseg_request *req;
+       struct xseg *xseg = peer->xseg;
+       xport srcport = prefs->src_port;
+       xport dstport = prefs->dst_port;
+       xport p;
+
+       int r;
+       uint32_t targetlen=10; //FIXME: handle it better
+       uint64_t size = prefs->os;
+
+       //srcport and dstport must already be provided by the user.
+       //returns struct xseg_request with basic initializations
+       req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
+       if (!req) {
+               fprintf(stderr, "No request\n");
+               return -1;
+       }
+
+       //Allocate enough space for the data and the target's name
+       r = xseg_prep_request(xseg, req, targetlen, size);
+       if (r < 0) {
+               fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
+                       (unsigned long)targetlen, (unsigned long long)size);
+               xseg_put_request(xseg, req, srcport);
+               return -1;
+       }
+
+#if 0
+       //TODO: allow strcpy, memcpy
+       //Copy target's name to the newly allocated space
+       req_target = xseg_get_target(xseg, req);
+       strncpy(req_target, target, targetlen);
+
+       //Copy data buffer to the newly allocated space
+       req_data = xseg_get_data(xseg, req);
+       memcpy(req_data, buf, size);
+       req->offset = offset;
+       req->size = size;
+       req->op = X_WRITE;
+#endif
+
+       //Submit the request from the source port to the target port
+       timer_start(prefs->sub_tm);
+       p = xseg_submit(xseg, req, srcport, X_ALLOC);
+       if (p == NoPort) {
+               fprintf(stderr, "Cannot submit\n");
+               return -1;
+       }
+       timer_stop(prefs->sub_tm);
+
+       //Send SIGIO to the process that has binded this port to inform that
+       //IO is possible
+       xseg_signal(xseg, p);
+
+       return 0;
+}
+
 /*
  * This function substitutes the default peerd_loop of peer.c.
- * This is achieved by passing to gcc a CUSTOM_LOOP macro definition which is
- * checked (ifdef) in peer.c
+ * It's plugged to struct peerd at custom peer's initialisation
  */
 int custom_peerd_loop(struct peerd *peer)
 {
@@ -242,17 +294,22 @@ int custom_peerd_loop(struct peerd *peer)
        xport portno_end = peer->portno_end;
        uint64_t threshold=1000/(1 + portno_end - portno_start);
        pid_t pid =syscall(SYS_gettid);
+       XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
+       xseg_init_local_signal(xseg, peer->portno_start);
        uint64_t loops;
 
        uint64_t remaining = prefs->ts;
 
-       XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
-       xseg_init_local_signal(xseg, peer->portno_start);
-
        while (!isTerminate()
                        && xq_count(&peer->free_reqs) == peer->nr_ops
                        && remaining) {
-               for (loops= threshold; loops > 0; loops--) {
+
+               while (prefs->sub_tm->completed - prefs->sub_tm->completed <
+                               prefs->iodepth){
+                       send_request(peer, prefs);
+               }
+
+               for (loops = threshold; loops > 0; loops--) {
                        if (loops == 1)
                                xseg_prepare_wait(xseg, peer->portno_start);
                        if (check_ports(peer))