Style changes
[archipelago] / xseg / peers / user / bench-xseg.c
index 413eb0b..1da07cb 100644 (file)
 #include <sys/util.h>
 #include <signal.h>
 #include <bench-xseg.h>
+#include <bench-lfsr.h>
 #include <limits.h>
 
 char global_id[IDLEN];
-uint64_t global_seed;
-
 /*
- * This function checks two things:
+ * This macro checks two things:
  * a) If in-flight requests are less than given iodepth
- * b) If we have submitted al of the requests
+ * b) If we have submitted all of the requests
  */
-#define CAN_SEND_REQUEST(prefs)        \
-       prefs->sub_tm->completed - prefs->rec_tm->completed < prefs->iodepth && \
-       prefs->sub_tm->completed < prefs->max_requests  \
+#define CAN_SEND_REQUEST(prefs)                                                                                                  \
+       ((prefs->status->submitted - prefs->status->received < prefs->iodepth) && \
+       (prefs->status->submitted < prefs->status->max))
+
+#define CAN_VERIFY(prefs)                                                                                                        \
+       ((GET_FLAG(VERIFY, prefs->flags) != VERIFY_NO) && prefs->op == X_READ)
 
 void custom_peer_usage()
 {
@@ -65,22 +67,28 @@ void custom_peer_usage()
                        "  --------------------------------------------\n"
                        "    -op       | None    | XSEG operation [read|write|info|delete]\n"
                        "    --pattern | None    | I/O pattern [seq|rand]\n"
-                       "    -to       | None    | Total objects (not for read/write)\n"
+                       "    --verify  | no      | Verify written requests [no|meta|full]\n"
+                       "    -rc       | None    | Request cap\n"
+                       "    -to       | None    | Total objects\n"
                        "    -ts       | None    | Total I/O size\n"
                        "    -os       | 4M      | Object size\n"
                        "    -bs       | 4k      | Block size\n"
-                       "    -dp       | None    | Destination port\n"
+                       "    -tp       | None    | Target port\n"
                        "    --iodepth | 1       | Number of in-flight I/O requests\n"
-                       "    --verify  | no      | Verify written requests [no|meta|hash]\n"
-                       "    --seed    | None    | Inititalize LFSR and target names\n"
+                       "    --seed    | None    | Initialize LFSR and target names\n"
                        "    --insanity| sane    | Adjust insanity level of benchmark:\n"
                        "              |         |     [sane|eccentric|manic|paranoid]\n"
+                       "\n"
+                       "Additional information:\n"
+                       "  --------------------------------------------\n"
+                       "  The -to and -ts options are mutually exclusive\n"
                        "\n");
 }
 
 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
 {
        struct bench *prefs;
+       char request_cap[MAX_ARG_LEN + 1];
        char total_objects[MAX_ARG_LEN + 1];
        char total_size[MAX_ARG_LEN + 1];
        char object_size[MAX_ARG_LEN + 1];
@@ -88,9 +96,15 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
        char op[MAX_ARG_LEN + 1];
        char pattern[MAX_ARG_LEN + 1];
        char insanity[MAX_ARG_LEN + 1];
+       char verify[MAX_ARG_LEN + 1];
        struct xseg *xseg = peer->xseg;
        unsigned int xseg_page_size = 1 << xseg->config.page_shift;
+       long iodepth = -1;
        long dst_port = -1;
+       unsigned long seed = -1;
+       uint64_t rc;
+       struct timespec timer_seed;
+       int set_by_hand = 0;
        int r;
 
        op[0] = 0;
@@ -100,6 +114,8 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
        block_size[0] = 0;
        object_size[0] = 0;
        insanity[0] = 0;
+       verify[0] = 0;
+       request_cap[0] = 0;
 
 #ifdef MT
        for (i = 0; i < nr_threads; i++) {
@@ -118,17 +134,27 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
        }
        prefs->flags = 0;
 
+       prefs->status = malloc(sizeof(struct req_status));
+       if (!prefs->status) {
+               perror("malloc");
+               return -1;
+       }
+       memset(prefs->status, 0, sizeof(struct req_status));
+
        //Begin reading the benchmark-specific arguments
        BEGIN_READ_ARGS(argc, argv);
+       READ_ARG_STRING("-rc", request_cap, MAX_ARG_LEN);
        READ_ARG_STRING("-op", op, MAX_ARG_LEN);
        READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
        READ_ARG_STRING("-to", total_objects, MAX_ARG_LEN);
        READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
        READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
        READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
-       READ_ARG_ULONG("--iodepth", prefs->iodepth);
-       READ_ARG_ULONG("-dp", dst_port);
-       READ_ARG_STRING("--insanity", block_size, MAX_ARG_LEN);
+       READ_ARG_ULONG("--iodepth", iodepth);
+       READ_ARG_ULONG("-tp", dst_port);
+       READ_ARG_ULONG("--seed", seed);
+       READ_ARG_STRING("--insanity", insanity, MAX_ARG_LEN);
+       READ_ARG_STRING("--verify", verify, MAX_ARG_LEN);
        END_READ_ARGS();
 
        /*****************************\
@@ -136,7 +162,7 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
        \*****************************/
 
        //We support 4 xseg operations: X_READ, X_WRITE, X_DELETE, X_INFO
-       //The I/O pattern of thesee operations can be either synchronous (sync) or
+       //The I/O pattern of these operations can be either sequential (seq) or
        //random (rand)
        if (!op[0]) {
                XSEGLOG2(&lc, E, "xseg operation needs to be supplied\n");
@@ -158,7 +184,22 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
                XSEGLOG2(&lc, E, "Invalid syntax: --pattern %s\n", pattern);
                goto arg_fail;
        }
-       prefs->flags |= (uint8_t)r;
+       SET_FLAG(PATTERN, prefs->flags, r);
+
+       if (!verify[0])
+               strcpy(verify, "no");
+       r = read_verify(verify);
+       if (r < 0) {
+               XSEGLOG2(&lc, E, "Invalid syntax: --verify %s\n", verify);
+               goto arg_fail;
+       }
+       SET_FLAG(VERIFY, prefs->flags, r);
+
+       //Default iodepth value is 1
+       if (iodepth < 0)
+               prefs->iodepth = 1;
+       else
+               prefs->iodepth = iodepth;
 
        /**************************\
         * Check timer parameters *
@@ -171,71 +212,55 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
        if (!insanity[0])
                strcpy(insanity, "sane");
 
-       prefs->insanity = read_insanity(insanity);
-       if (prefs->insanity < 0) {
+       r = read_insanity(insanity);
+       if (r < 0) {
                XSEGLOG2(&lc, E, "Invalid syntax: --insanity %s\n", insanity);
                goto arg_fail;
        }
+       SET_FLAG(INSANITY, prefs->flags, r);
 
-       /*
-        * If we have a request other than read/write, we don't need to check
-        * about size parameters, but only how many objects we want to affect
-        */
-       if (prefs->op != X_READ && prefs->op != X_WRITE) {
+       /*****************************\
+        * Check I/O size parameters *
+       \*****************************/
 
-               /***************************\
-                * Check object parameters *
-               \***************************/
+       //Block size (bs): Defaults to 4K.
+       //It must be a number followed by one of these characters:
+       //                                              [k|K|m|M|g|G]
+       //If not, it will be considered as size in bytes.
+       //Must be integer multiple of segment's page size (typically 4k).
+       if (!block_size[0])
+               strcpy(block_size,"4k");
+
+       prefs->bs = str2num(block_size);
+       if (!prefs->bs) {
+               XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
+               goto arg_fail;
+       } else if (prefs->bs % xseg_page_size) {
+               XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
+               goto arg_fail;
+       }
 
-               if (!total_objects[0]) {
-                       XSEGLOG2(&lc, E,
-                                       "Total number of objects needs to be supplied\n");
-                       goto arg_fail;
-               }
+       //Total objects (to) or total I/O size (ts).
+       //Must have the same format as "block size"
+       //They are mutually exclusive
+       if (total_objects[0] && total_size[0]) {
+               XSEGLOG2(&lc, E, "Total objects and total size are mutually exclusive\n");
+               goto arg_fail;
+       } else if (total_objects[0]) {
                prefs->to = str2num(total_objects);
                if (!prefs->to) {
                        XSEGLOG2(&lc, E, "Invalid syntax: -to %s\n", total_objects);
                        goto arg_fail;
                }
-
                //In this case, the maximum number of requests is the total number of
                //objects we will handle
-               prefs->max_requests = prefs->to;
-       } else {
-
-               /*************************\
-                * Check size parameters *
-               \*************************/
-
-               //Block size (bs): Defaults to 4K.
-               //It must be a number followed by one of these characters:
-               //                                              [k|K|m|M|g|G]
-               //If not, it will be considered as size in bytes.
-               //Must be integer multiple of segment's page size (typically 4k).
-               if (!block_size[0])
-                       strcpy(block_size,"4k");
-
-               //0 iodepth is an accepted value, but only for debugging purposes
-               if (!prefs->iodepth)
-                       prefs->iodepth = 1;
-
-               prefs->bs = str2num(block_size);
-               if (!prefs->bs) {
-                       XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
-                       goto arg_fail;
-               } else if (prefs->bs % xseg_page_size) {
-                       XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
-                       goto arg_fail;
-               }
-
-               //Total I/O size (ts): Must be supplied by user.
-               //Must have the same format as "total size"
-               //Must be integer multiple of "block size"
-               if (!total_size[0]) {
-                       XSEGLOG2(&lc, E, "Total I/O size needs to be supplied\n");
+               prefs->status->max = prefs->to;
+       } else if (total_size[0]) {
+               if (prefs->op != X_READ && prefs->op != X_WRITE) {
+                       XSEGLOG2(&lc, E,
+                                       "Total objects must be supplied (required by op %s)\n", op);
                        goto arg_fail;
                }
-
                prefs->ts = str2num(total_size);
                if (!prefs->ts) {
                        XSEGLOG2(&lc, E, "Invalid syntax: -ts %s\n", total_size);
@@ -243,83 +268,107 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
                } else if (prefs->ts % prefs->bs) {
                        XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
                        goto arg_fail;
-               } else if (prefs->ts > xseg->segment_size) {
-                       XSEGLOG2(&lc, E,
-                                       "Total I/O size exceeds segment size\n", total_size);
-                       goto arg_fail;
                }
-
-               //Object size (os): Defaults to 4M.
-               //Must have the same format as "total size"
-               //Must be integer multiple of "block size"
-               if (!object_size[0])
-                       strcpy(object_size,"4M");
-
-               prefs->os = str2num(object_size);
-               if (!prefs->os) {
-                       XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
-                       goto arg_fail;
-               } else if (prefs->os % prefs->bs) {
-                       XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
-                       goto arg_fail;
-               }
-
                //In this case, the maximum number of requests is the number of blocks
                //we need to cover the total I/O size
-               prefs->max_requests = prefs->ts / prefs->bs;
+               prefs->status->max = prefs->ts / prefs->bs;
+       } else {
+               XSEGLOG2(&lc, E, "Total objects or total size must be supplied\n");
+               goto arg_fail;
+       }
+
+       //Object size (os): Defaults to 4M.
+       //Must have the same format as "block size"
+       //Must be integer multiple of "block size"
+       if (!object_size[0])
+               strcpy(object_size,"4M");
+
+       prefs->os = str2num(object_size);
+       if (!prefs->os) {
+               XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
+               goto arg_fail;
+       } else if (prefs->os % prefs->bs) {
+               XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
+               goto arg_fail;
        }
 
-       /*************************
+
+       /*************************\
         * Check port parameters *
-        *************************/
+       \*************************/
 
        if (dst_port < 0){
-               XSEGLOG2(&lc, E, "Destination port needs to be supplied\n");
+               XSEGLOG2(&lc, E, "Target port must be supplied\n");
                goto arg_fail;
        }
 
        prefs->src_port = peer->portno_start; //TODO: allow user to change this
        prefs->dst_port = (xport) dst_port;
 
-       /*********************************
+       /*********************************\
         * Create timers for all metrics *
-        *********************************/
+       \*********************************/
 
-       if (init_timer(&prefs->total_tm, TM_SANE))
+       if (init_timer(&prefs->total_tm, INSANITY_SANE))
                goto tm_fail;
-       if (init_timer(&prefs->sub_tm, TM_MANIC))
+       if (init_timer(&prefs->sub_tm, INSANITY_MANIC))
                goto tm_fail;
-       if (init_timer(&prefs->get_tm, TM_PARANOID))
+       if (init_timer(&prefs->get_tm, INSANITY_PARANOID))
                goto tm_fail;
-       if (init_timer(&prefs->rec_tm, TM_ECCENTRIC))
+       if (init_timer(&prefs->rec_tm, INSANITY_ECCENTRIC))
                goto tm_fail;
 
-       /********************************\
-        * Customize struct peerd/prefs *
-        \********************************/
-
-       prefs->peer = peer;
-
-       //The following function initializes the global_id, global_seed extern
-       //variables.
-       create_id();
+       /*************************************\
+        * Initialize the LFSR and global_id *
+       \*************************************/
+reseed:
+       //We proceed to initialise the global_id and seed variables.
+       if (seed == -1) {
+               clock_gettime(CLOCK_MONOTONIC_RAW, &timer_seed);
+               seed = timer_seed.tv_nsec;
+       } else {
+               set_by_hand = 1;
+       }
+       create_id(seed);
 
-       if ((prefs->flags & (1 <<PATTERN_FLAG)) == IO_RAND) {
-               prefs->lfsr = malloc(sizeof(struct lfsr));
+       if (GET_FLAG(PATTERN, prefs->flags) == PATTERN_RAND) {
+               prefs->lfsr = malloc(sizeof(struct bench_lfsr));
                if (!prefs->lfsr) {
                        perror("malloc");
                        goto lfsr_fail;
                }
-               //FIXME: Give a name to max requests, not just prefs->ts / prefs->bs
-               //FIXME: handle better the seed passing than just giving UINT64_MAX
-               if (lfsr_init(prefs->lfsr, prefs->max_requests, UINT64_MAX)) {
-                       XSEGLOG2(&lc, E, "LFSR could not be initialized\n");
+
+               r = lfsr_init(prefs->lfsr, prefs->status->max, seed, seed & 0xF);
+               if (r && set_by_hand) {
+                       XSEGLOG2(&lc, E, "LFSR could not be initialized.\n");
                        goto lfsr_fail;
+               } else if (r) {
+                       seed = -1;
+                       goto reseed;
                }
        }
 
-       peer->peerd_loop = custom_peerd_loop;
+       /****************************\
+        * Finalize initializations *
+       \****************************/
+
+       /* The request cap must be enforced only after the LFSR is initialized */
+       if (request_cap[0]) {
+               rc = str2num(request_cap);
+               if (!rc) {
+                       XSEGLOG2(&lc, E, "Invalid syntax: -rc %s\n", request_cap);
+                       goto arg_fail;
+               } else if (rc > prefs->status->max) {
+                       XSEGLOG2(&lc, E, "Request cap exceeds current request total.\n");
+                       goto arg_fail;
+               }
+               prefs->status->max = rc;
+       }
+
+       prefs->peer = peer;
+       peer->peerd_loop = bench_peerd_loop;
        peer->priv = (void *) prefs;
+       XSEGLOG2(&lc, I, "Global ID is %s\n", global_id);
        return 0;
 
 arg_fail:
@@ -351,7 +400,7 @@ static int send_request(struct peerd *peer, struct bench *prefs)
 
        //srcport and dstport must already be provided by the user.
        //returns struct xseg_request with basic initializations
-       //XSEGLOG2(&lc, D, "Get new request\n");
+       XSEGLOG2(&lc, D, "Get new request\n");
        timer_start(prefs, prefs->get_tm);
        req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
        if (!req) {
@@ -361,7 +410,7 @@ static int send_request(struct peerd *peer, struct bench *prefs)
        timer_stop(prefs, prefs->get_tm, NULL);
 
        //Allocate enough space for the data and the target's name
-       //XSEGLOG2(&lc, D, "Prepare new request\n");
+       XSEGLOG2(&lc, D, "Prepare new request\n");
        r = xseg_prep_request(xseg, req, TARGETLEN, size);
        if (r < 0) {
                XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
@@ -371,24 +420,24 @@ static int send_request(struct peerd *peer, struct bench *prefs)
 
        //Determine what the next target/chunk will be, based on I/O pattern
        new = determine_next(prefs);
+       req->op = prefs->op;
        XSEGLOG2(&lc, I, "Our new request is %lu\n", new);
-       //Create a target of this format: "bench-<obj_no>"
+       //Create a target of this format: "bench-<global_id>-<obj_no>"
        create_target(prefs, req, new);
 
-       if(prefs->op == X_WRITE || prefs->op == X_READ) {
+       if (prefs->op == X_WRITE || prefs->op == X_READ) {
                req->size = size;
-               //Calculate the chunk offset inside the object
-               req->offset = (new * prefs->bs) % prefs->os;
+               //Calculate the chunk's offset inside the object
+               req->offset = calculate_offset(prefs, new);
                XSEGLOG2(&lc, D, "Offset of request %lu is %lu\n", new, req->offset);
 
-               if(prefs->op == X_WRITE)
+               if (prefs->op == X_WRITE)
                        create_chunk(prefs, req, new);
        }
 
-       req->op = prefs->op;
 
        //Measure this?
-       //XSEGLOG2(&lc, D, "Allocate peer request\n");
+       XSEGLOG2(&lc, D, "Allocate peer request\n");
        pr = alloc_peer_req(peer);
        if (!pr) {
                XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
@@ -399,7 +448,7 @@ static int send_request(struct peerd *peer, struct bench *prefs)
        pr->portno = srcport;
        pr->req = req;
        pr->priv = malloc(sizeof(struct timespec));
-       if(!pr->priv) {
+       if (!pr->priv) {
                perror("malloc");
                goto put_peer_request;
        }
@@ -418,24 +467,25 @@ static int send_request(struct peerd *peer, struct bench *prefs)
         * QUESTION: Is this the fastest way?
         */
        timer_start(prefs, prefs->rec_tm);
-       if (prefs->rec_tm->insanity <= prefs->insanity)
+       if (prefs->rec_tm->insanity <= GET_FLAG(INSANITY, prefs->flags))
                memcpy(pr->priv, &prefs->rec_tm->start_time, sizeof(struct timespec));
 
        //Submit the request from the source port to the target port
-       //XSEGLOG2(&lc, D, "Submit request %lu\n", new);
+       XSEGLOG2(&lc, D, "Submit request %lu\n", new);
        timer_start(prefs, prefs->sub_tm);
        p = xseg_submit(xseg, req, srcport, X_ALLOC);
        if (p == NoPort) {
                XSEGLOG2(&lc, W, "Cannot submit request\n");
                goto put_peer_request;
        }
+       prefs->status->submitted++;
        timer_stop(prefs, prefs->sub_tm, NULL);
 
        //Send SIGIO to the process that has bound this port to inform that
        //IO is possible
        r = xseg_signal(xseg, p);
-       if (r < 0)
-               XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
+       //if (r < 0)
+       //      XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
 
        return 0;
 
@@ -452,7 +502,7 @@ put_xseg_request:
  * This function substitutes the default generic_peerd_loop of peer.c.
  * It's plugged to struct peerd at custom peer's initialisation
  */
-int custom_peerd_loop(void *arg)
+int bench_peerd_loop(void *arg)
 {
 #ifdef MT
        struct thread *t = (struct thread *) arg;
@@ -490,9 +540,9 @@ send_request:
                while (CAN_SEND_REQUEST(prefs)) {
                        xseg_cancel_wait(xseg, peer->portno_start);
                        XSEGLOG2(&lc, D, "...because %lu < %lu && %lu < %lu\n",
-                                       prefs->sub_tm->completed - prefs->rec_tm->completed,
-                                       prefs->iodepth, prefs->sub_tm->completed,
-                                       prefs->max_requests);
+                                       prefs->status->submitted - prefs->status->received,
+                                       prefs->iodepth, prefs->status->received,
+                                       prefs->status->max);
                        XSEGLOG2(&lc, D, "Start sending new request\n");
                        r = send_request(peer, prefs);
                        if (r < 0)
@@ -506,7 +556,7 @@ send_request:
                        if (check_ports(peer)) {
                                //If an old request has just been acked, the most sensible
                                //thing to do is to immediately send a new one
-                               if (prefs->rec_tm->completed < prefs->max_requests)
+                               if (prefs->status->received < prefs->status->max)
                                        goto send_request;
                                else
                                        return 0;
@@ -547,13 +597,18 @@ void custom_peer_finalize(struct peerd *peer)
        return;
 }
 
-
+/*
+ * handle_received: increment the count of received requests.
+ * Do some sanity checks and then check whether the request has failed.
+ * If it has not failed, try to verify the request if verification was asked for.
+ */
 static void handle_received(struct peerd *peer, struct peer_req *pr)
 {
        //FIXME: handle null pointer
        struct bench *prefs = peer->priv;
        struct timer *rec = prefs->rec_tm;
 
+       prefs->status->received++;
        if (!pr->req) {
                //This is a serious error, so we must stop
                XSEGLOG2(&lc, E, "Received peer request with no xseg request");
@@ -561,13 +616,18 @@ static void handle_received(struct peerd *peer, struct peer_req *pr)
                return;
        }
 
-       if (!pr->priv) {
+       if ((GET_FLAG(INSANITY, prefs->flags) < rec->insanity) && !pr->priv) {
                XSEGLOG2(&lc, W, "Cannot find submission time of request");
                return;
        }
 
        timer_stop(prefs, rec, pr->priv);
 
+       if (!(pr->req->state & XS_SERVED))
+               prefs->status->failed++;
+       else if (CAN_VERIFY(prefs) && read_chunk(prefs, pr->req))
+               prefs->status->corrupted++;
+
        if (xseg_put_request(peer->xseg, pr->req, pr->portno))
                XSEGLOG2(&lc, W, "Cannot put xseg request\n");