bench: Handle progress report better
[archipelago] / xseg / peers / user / bench-xseg.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <xseg/xseg.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <unistd.h>
40 #include <sys/syscall.h>
41 #include <sys/types.h>
42 #include <pthread.h>
43 #include <peer.h>
44 #include <time.h>
45 #include <sys/util.h>
46 #include <signal.h>
47 #include <bench-xseg.h>
48 #include <bench-lfsr.h>
49 #include <limits.h>
50 #include <math.h>
51
52 /*
53  * This macro checks two things:
54  * a) If in-flight requests are less than given iodepth
55  * b) If we have submitted all of the requests
56  * c) If we are not in ping mode
57  * d) If we have been asked to terminate
58  */
59 #define CAN_SEND_REQUEST(__p)                                           \
60         ((__p->status->submitted - __p->status->received < __p->iodepth) && \
61         (__p->status->submitted < __p->status->max) &&                  \
62         (GET_FLAG(PING, __p->flags) == PING_MODE_OFF) &&                \
63          !isTerminate())
64
65 #define CAN_VERIFY(__p)                                                 \
66         ((GET_FLAG(VERIFY, __p->flags) != VERIFY_NO) && __p->op == X_READ)
67
68 #define CAN_PRINT_PROGRESS(__p, __q)                                    \
69         ((GET_FLAG(PROGRESS, __p->flags) == PROGRESS_YES) &&            \
70         (__p->status->received == __q))
71
72 void custom_peer_usage()
73 {
74         fprintf(stderr, "Custom peer options: \n"
75                 "  --------------------------------------------\n"
76                 "    -op       | None    | XSEG operation [read|write|info|delete]\n"
77                 "    --pattern | None    | I/O pattern [seq|rand]\n"
78                 "    --verify  | no      | Verify written requests [no|meta|full]\n"
79                 "    -rc       | None    | Request cap\n"
80                 "    -to       | None    | Total objects\n"
81                 "    -ts       | None    | Total I/O size\n"
82                 "    -os       | 4M      | Object size\n"
83                 "    -bs       | 4k      | Block size\n"
84                 "    -tp       | None    | Target port\n"
85                 "    --iodepth | 1       | Number of in-flight I/O requests\n"
86                 "    --seed    | None    | Initialize LFSR and target names\n"
87                 "    --insanity| sane    | Adjust insanity level of benchmark:\n"
88                 "              |         |     [sane|eccentric|manic|paranoid]\n"
89                 "    --progress| yes     | Show progress of requests [yes|no]\n"
90                 "    --ping    | yes     | Ping target before starting benchmark\n"
91                 "              |         |     [yes|no]\n"
92                 "    --prefix  | 'bench' | Add a common prefix to all object names\n"
93                 "    --objname | 'bench' | Use only one object with this name\n"
94                 "\n"
95                 "Additional information:\n"
96                 "  --------------------------------------------\n"
97                 "  * The -to and -ts options are mutually exclusive\n"
98                 "\n"
99                 "  * The object name is always not null-terminated and\n"
100                 "    defaults to the following structure:\n"
101                 "           <prefix>-<seed>-<object number>\n"
102                 "\n"
103                 "    where:\n"
104                 "    a. <prefix> is given by user or defaults to 'bench'\n"
105                 "    b. <seed> is given by user or defaults to a random value.\n"
106                 "       Its length will be 9 digits, with trailing zeros where\n"
107                 "       necessary\n"
108                 "    c. <object number> is out of the user's control. It is\n"
109                 "       calculated during the benchmark and is a 15-digit\n"
110                 "       number, allowing a maximum of 1 quadrillion objects\n"
111                 "\n"
112                 "   So, if bench is called with the arguments:\n"
113                 "           --prefix obj --seed 999\n"
114                 "\n"
115                 "   and <object number> is 9,the resulting object name will\n"
116                 "   be:\n"
117                 "           obj-000000999-000000000000009\n"
118                 "\n"
119                 " * The above object name structure can by bypassed with the\n"
120                 "   --objname <object name> argument. This implies the\n"
121                 "   following:\n"
122                 "\n"
123                 "   a. -to option is strictly restricted to 1\n"
124                 "   b. -ts option defaults to (and can't be larger than)\n"
125                 "      the object size (-os argument)\n"
126                 "   c. --prefix is must be unused. If used, it produces an "
127                 "      error to alert the user\n"
128                 "\n");
129 }
130
131 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
132 {
133         struct bench *prefs;
134         char request_cap[MAX_ARG_LEN + 1];
135         char total_objects[MAX_ARG_LEN + 1];
136         char total_size[MAX_ARG_LEN + 1];
137         char object_size[MAX_ARG_LEN + 1];
138         char block_size[MAX_ARG_LEN + 1];
139         char op[MAX_ARG_LEN + 1];
140         char pattern[MAX_ARG_LEN + 1];
141         char insanity[MAX_ARG_LEN + 1];
142         char verify[MAX_ARG_LEN + 1];
143         char progress[MAX_ARG_LEN + 1];
144         char ping[MAX_ARG_LEN + 1];
145         char prefix[XSEG_MAX_TARGETLEN + 1];
146         char objname[XSEG_MAX_TARGETLEN + 1];
147         struct xseg *xseg = peer->xseg;
148         struct object_vars *obv;
149         unsigned int xseg_page_size = 1 << xseg->config.page_shift;
150         long iodepth = -1;
151         long dst_port = -1;
152         unsigned long seed = -1;
153         unsigned long seed_max;
154         uint64_t rc;
155         struct timespec *ts;
156         int set_by_hand = 1;
157         int j, r;
158
159         op[0] = 0;
160         pattern[0] = 0;
161         total_objects[0] = 0;
162         total_size[0] = 0;
163         block_size[0] = 0;
164         object_size[0] = 0;
165         insanity[0] = 0;
166         verify[0] = 0;
167         request_cap[0] = 0;
168         progress[0] = 0;
169         ping[0] = 0;
170         prefix[0] = 0;
171         objname[0] = 0;
172
173         /* allocate struct bench */
174         prefs = malloc(sizeof(struct bench));
175         if (!prefs) {
176                 perror("malloc");
177                 goto prefs_fail;
178         }
179         memset(prefs, 0, sizeof(struct bench));
180
181         /* allocate struct req_status */
182         prefs->status = malloc(sizeof(struct req_status));
183         if (!prefs->status) {
184                 perror("malloc");
185                 goto status_fail;
186         }
187         memset(prefs->status, 0, sizeof(struct req_status));
188
189         /* allocate struct object_name */
190         prefs->objvars = malloc(sizeof(struct object_vars));
191         if (!prefs->objvars) {
192                 perror("malloc");
193                 goto object_name_fail;
194         }
195         memset(prefs->objvars, 0, sizeof(struct object_vars));
196
197         /* allocate a struct timespec for each peer request */
198         for (j = 0; j < peer->nr_ops; j++) {
199                 ts = malloc(sizeof(struct timespec));
200                 if (!ts) {
201                         perror("malloc");
202                         goto priv_fail;
203                 }
204                 peer->peer_reqs[j].priv = ts;
205         }
206
207         //Begin reading the benchmark-specific arguments
208         BEGIN_READ_ARGS(argc, argv);
209         READ_ARG_STRING("-rc", request_cap, MAX_ARG_LEN);
210         READ_ARG_STRING("-op", op, MAX_ARG_LEN);
211         READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
212         READ_ARG_STRING("-to", total_objects, MAX_ARG_LEN);
213         READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
214         READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
215         READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
216         READ_ARG_ULONG("--iodepth", iodepth);
217         READ_ARG_ULONG("-tp", dst_port);
218         READ_ARG_ULONG("--seed", seed);
219         READ_ARG_STRING("--insanity", insanity, MAX_ARG_LEN);
220         READ_ARG_STRING("--verify", verify, MAX_ARG_LEN);
221         READ_ARG_STRING("--progress", progress, MAX_ARG_LEN);
222         READ_ARG_STRING("--ping", ping, MAX_ARG_LEN);
223         READ_ARG_STRING("--prefix", prefix, XSEG_MAX_TARGETLEN);
224         READ_ARG_STRING("--objname", objname, XSEG_MAX_TARGETLEN);
225         END_READ_ARGS();
226
227         /********************************\
228          * Check object name parameters *
229         \********************************/
230         if (objname[0] && prefix[0]) {
231                 XSEGLOG2(&lc, E, "--objname and --prefix options cannot be"
232                                 "used together.");
233                 goto arg_fail;
234         }
235
236         obv = prefs->objvars;
237         obv->seedlen = SEEDLEN;
238         obv->objnumlen = OBJNUMLEN;
239         if (objname[0]) {
240                 strncpy(obv->name, objname, XSEG_MAX_TARGETLEN);
241                 obv->prefixlen = 0;
242                 obv->namelen = strlen(objname);
243         } else {
244                 if (!prefix[0]) /* In this case we use a default value */
245                         strcpy(prefix, "bench");
246                 strncpy(obv->prefix, prefix, XSEG_MAX_TARGETLEN);
247                 obv->prefixlen = strlen(prefix);
248                 /* We add 2 for the extra dashes */
249                 obv->namelen = obv->prefixlen + obv->seedlen +
250                         obv->objnumlen + 2;
251         }
252
253         /* Only --prefix can exceed bounds since --objname is bounded */
254         if (obv->namelen > XSEG_MAX_TARGETLEN) {
255                 XSEGLOG2(&lc, E, "--prefix %s: Prefix is too long.", prefix);
256                 goto arg_fail;
257         }
258
259         /*****************************\
260          * Check I/O type parameters *
261         \*****************************/
262
263         //We support 4 xseg operations: X_READ, X_WRITE, X_DELETE, X_INFO
264         //The I/O pattern of these operations can be either sequential (seq) or
265         //random (rand)
266         if (!op[0]) {
267                 XSEGLOG2(&lc, E, "xseg operation needs to be supplied\n");
268                 goto arg_fail;
269         }
270         r = read_op(op);
271         if (r < 0) {
272                 XSEGLOG2(&lc, E, "Invalid syntax: -op %s\n", op);
273                 goto arg_fail;
274         }
275         prefs->op = r;
276
277         if (!pattern[0]) {
278                 XSEGLOG2(&lc, E, "I/O pattern needs to be supplied\n");
279                 goto arg_fail;
280         }
281         r = read_pattern(pattern);
282         if (r < 0) {
283                 XSEGLOG2(&lc, E, "Invalid syntax: --pattern %s\n", pattern);
284                 goto arg_fail;
285         }
286         SET_FLAG(PATTERN, prefs->flags, r);
287
288         if (!verify[0])
289                 strcpy(verify, "no");
290         r = read_verify(verify);
291         if (r < 0) {
292                 XSEGLOG2(&lc, E, "Invalid syntax: --verify %s\n", verify);
293                 goto arg_fail;
294         }
295         SET_FLAG(VERIFY, prefs->flags, r);
296
297         //Default iodepth value is 1
298         if (iodepth < 0)
299                 prefs->iodepth = 1;
300         else
301                 prefs->iodepth = iodepth;
302
303         /**************************\
304          * Check timer parameters *
305         \**************************/
306
307         //Most of the times, not all timers need to be used.
308         //We can choose which timers will be used by adjusting the "insanity"
309         //level of the benchmark i.e. the obscurity of code paths (get request,
310         //submit request) that will be timed.
311         if (!insanity[0])
312                 strcpy(insanity, "sane");
313
314         r = read_insanity(insanity);
315         if (r < 0) {
316                 XSEGLOG2(&lc, E, "Invalid syntax: --insanity %s\n", insanity);
317                 goto arg_fail;
318         }
319         SET_FLAG(INSANITY, prefs->flags, r);
320
321         /*****************************\
322          * Check I/O size parameters *
323         \*****************************/
324
325         //Block size (bs): Defaults to 4K.
326         //It must be a number followed by one of these characters:
327         //                                              [k|K|m|M|g|G]
328         //If not, it will be considered as size in bytes.
329         //Must be integer multiple of segment's page size (typically 4k).
330         if (!block_size[0])
331                 strcpy(block_size,"4k");
332
333         prefs->bs = str2num(block_size);
334         if (!prefs->bs) {
335                 XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
336                 goto arg_fail;
337         } else if (prefs->bs % xseg_page_size) {
338                 XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
339                 goto arg_fail;
340         }
341
342         //Object size (os): Defaults to 4M.
343         //Must have the same format as "block size"
344         //Must be integer multiple of "block size"
345         if (!object_size[0])
346                 strcpy(object_size,"4M");
347
348         prefs->os = str2num(object_size);
349         if (!prefs->os) {
350                 XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
351                 goto arg_fail;
352         } else if (prefs->os % prefs->bs) {
353                 XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
354                 goto arg_fail;
355         }
356
357         //Total objects (to) or total I/O size (ts).
358         //Must have the same format as "block size"
359         //They are mutually exclusive
360         if (total_objects[0] && total_size[0]) {
361                 XSEGLOG2(&lc, E, "Total objects and total size are "
362                                 "mutually exclusive\n");
363                 goto arg_fail;
364         } else if (total_objects[0]) {
365                 prefs->to = str2num(total_objects);
366                 if (!prefs->to) {
367                         XSEGLOG2(&lc, E, "Invalid syntax: -to %s\n",
368                                         total_objects);
369                         goto arg_fail;
370                 }
371                 //In this case, the maximum number of requests is the total
372                 //number of objects we will handle
373                 prefs->status->max = prefs->to;
374         } else if (total_size[0]) {
375                 if (prefs->op != X_READ && prefs->op != X_WRITE) {
376                         XSEGLOG2(&lc, E, "Total objects must be supplied "
377                                         "(required by -op %s)\n", op);
378                         goto arg_fail;
379                 }
380                 prefs->ts = str2num(total_size);
381                 if (!prefs->ts) {
382                         XSEGLOG2(&lc, E, "Invalid syntax: -ts %s\n", total_size);
383                         goto arg_fail;
384                 } else if (prefs->ts % prefs->bs) {
385                         XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
386                         goto arg_fail;
387                 }
388                 //In this case, the maximum number of requests is the number of
389                 //blocks we need to cover the total I/O size
390                 prefs->status->max = prefs->ts / prefs->bs;
391         } else if (!objname[0]) {
392                 XSEGLOG2(&lc, E, "Total objects or total size must be supplied\n");
393                 goto arg_fail;
394         }
395
396         /*
397          * Enforce --objname restrictions here.
398          */
399         if (obv->name[0]) {
400                 if (prefs->to > 1) {
401                         XSEGLOG2(&lc, E, "-to %s: Total objects are restricted "
402                                         "to 1 due to --objname %s\n",
403                                         total_objects, objname);
404                         goto arg_fail;
405                 } else if (prefs->ts > prefs->os) {
406                         XSEGLOG2(&lc, E, "-ts %s: Total size can't be larger "
407                                         "than object size (%s) due to "
408                                         "--objname %s\n",
409                                         total_size, object_size, objname);
410                         goto arg_fail;
411                 } else if (prefs->op == X_READ || prefs->op == X_WRITE) {
412                         prefs->ts = prefs->os;
413                         prefs->status->max = prefs->ts / prefs->bs;
414                 } else {
415                         prefs->to = 1;
416                         prefs->status->max = 1;
417                 }
418         }
419
420         if (prefs->status->max == 1)
421                 SET_FLAG(PATTERN, prefs->flags, PATTERN_SEQ);
422
423         /*************************\
424          * Check port parameters *
425         \*************************/
426
427         if (dst_port < 0){
428                 XSEGLOG2(&lc, E, "Target port must be supplied\n");
429                 goto arg_fail;
430         }
431
432         prefs->src_port = peer->portno_start; //TODO: allow user to change this
433         prefs->dst_port = (xport) dst_port;
434
435         /*********************************\
436          * Create timers for all metrics *
437         \*********************************/
438
439         if (init_timer(&prefs->total_tm, INSANITY_SANE))
440                 goto tm_fail;
441         if (init_timer(&prefs->sub_tm, INSANITY_MANIC))
442                 goto tm_fail;
443         if (init_timer(&prefs->get_tm, INSANITY_PARANOID))
444                 goto tm_fail;
445         if (init_timer(&prefs->rec_tm, INSANITY_ECCENTRIC))
446                 goto tm_fail;
447
448         /***********************\
449          * Initialize the LFSR *
450         \***********************/
451
452         seed_max = pow(10, obv->seedlen + 1) - 1;
453         if (seed == -1) {
454                 srand(time(NULL));
455                 set_by_hand = 0;
456         } else if (validate_seed(prefs, seed)) {
457                 XSEGLOG2(&lc, E, "--seed %lu: Seed larger than %lu. Only its "
458                                 "first %d digits will be used",
459                                 seed, seed_max, obv->seedlen);
460                 goto arg_fail;
461         }
462
463 reseed:
464         if (!set_by_hand)
465                 seed = rand() % seed_max + 1;
466
467         if (GET_FLAG(PATTERN, prefs->flags) == PATTERN_RAND) {
468                 prefs->lfsr = malloc(sizeof(struct bench_lfsr));
469                 if (!prefs->lfsr) {
470                         perror("malloc");
471                         goto lfsr_fail;
472                 }
473
474                 r = lfsr_init(prefs->lfsr, prefs->status->max,
475                                 seed, seed & 0xF);
476                 if (r) {
477                         if (!set_by_hand) {
478                                 free(prefs->lfsr);
479                                 goto reseed;
480                         }
481                         XSEGLOG2(&lc, E, "LFSR could not be initialized.\n");
482                         goto lfsr_fail;
483                 }
484         }
485         obv->seed = seed;
486
487         /*********************************\
488          * Miscellaneous initializations *
489         \*********************************/
490
491         /* The request cap must be enforced only after the LFSR is initialized */
492         if (request_cap[0]) {
493                 rc = str2num(request_cap);
494                 if (!rc) {
495                         XSEGLOG2(&lc, E, "Invalid syntax: -rc %s\n", request_cap);
496                         goto arg_fail;
497                 } else if (rc > prefs->status->max) {
498                         XSEGLOG2(&lc, E, "Request cap exceeds current request total.\n");
499                         goto arg_fail;
500                 }
501                 prefs->status->max = rc;
502         }
503
504         /* Benchmarking progress printing is on by default */
505         if (!progress[0])
506                 strcpy(progress, "yes");
507         r = read_progress(progress);
508         if (r < 0) {
509                 XSEGLOG2(&lc, E, "Invalid syntax: --progress %s\n", progress);
510                 goto arg_fail;
511         }
512         SET_FLAG(PROGRESS, prefs->flags, r);
513
514         /* Pinging the target peer is on by default */
515         if (!ping[0])
516                 strcpy(ping, "yes");
517         r = read_ping(ping);
518         if (r < 0) {
519                 XSEGLOG2(&lc, E, "Invalid syntax: --ping %s\n", ping);
520                 goto arg_fail;
521         }
522         SET_FLAG(PING, prefs->flags, r);
523
524         prefs->peer = peer;
525         peer->peerd_loop = bench_peerd_loop;
526         peer->priv = (void *) prefs;
527
528         if (obv->prefixlen)
529                 XSEGLOG2(&lc, I, "Seed is %u, prefix is %s",
530                                 obv->seed, obv->prefix);
531         else
532                 XSEGLOG2(&lc, I, "Seed is %u, object name is %s",
533                                 obv->seed, obv->name);
534
535         return 0;
536
537 arg_fail:
538         custom_peer_usage();
539 lfsr_fail:
540         free(prefs->lfsr);
541 tm_fail:
542         free(prefs->total_tm);
543         free(prefs->sub_tm);
544         free(prefs->get_tm);
545         free(prefs->rec_tm);
546 priv_fail:
547         j--;
548         for (; j >= 0; j--) {
549                 free(peer->peer_reqs[j].priv);
550         }
551 object_name_fail:
552         free(prefs->objvars);
553 status_fail:
554         free(prefs->status);
555 prefs_fail:
556         free(prefs);
557         return -1;
558 }
559
560
561 static int send_request(struct peerd *peer, struct bench *prefs)
562 {
563         struct xseg_request *req;
564         struct xseg *xseg = peer->xseg;
565         struct peer_req *pr;
566         struct object_vars *obv = prefs->objvars;
567         xport srcport = prefs->src_port;
568         xport dstport = prefs->dst_port;
569         xport p;
570
571         int r;
572         uint64_t new;
573         uint64_t size = prefs->bs;
574         struct timespec *ts;
575
576         //srcport and dstport must already be provided by the user.
577         //returns struct xseg_request with basic initializations
578         XSEGLOG2(&lc, D, "Get new request\n");
579         timer_start(prefs, prefs->get_tm);
580         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
581         if (!req) {
582                 XSEGLOG2(&lc, W, "Cannot get request\n");
583                 return -1;
584         }
585         timer_stop(prefs, prefs->get_tm, NULL);
586
587         /*
588          * Allocate enough space for the data and the target's name.
589          * Also, allocate one extra byte to prevent buffer overflow due to the
590          * obligatory null termination of snprint(). This extra byte will not be
591          * counted as part of the target's name.
592          */
593         XSEGLOG2(&lc, D, "Prepare new request\n");
594         r = xseg_prep_request(xseg, req, obv->namelen + 1, size);
595         if (r < 0) {
596                 XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
597                                 obv->namelen + 1, (unsigned long long)size);
598                 goto put_xseg_request;
599         }
600         req->targetlen--;
601
602         //Determine what the next target/chunk will be, based on I/O pattern
603         new = determine_next(prefs);
604         req->op = prefs->op;
605         XSEGLOG2(&lc, I, "Our new request is %lu\n", new);
606         obv->objnum = __get_object(prefs, new);
607         create_target(prefs, req);
608
609         if (prefs->op == X_WRITE || prefs->op == X_READ) {
610                 req->size = size;
611                 //Calculate the chunk's offset inside the object
612                 req->offset = calculate_offset(prefs, new);
613                 XSEGLOG2(&lc, D, "Offset of request %lu is %lu\n", new, req->offset);
614
615                 if (prefs->op == X_WRITE)
616                         create_chunk(prefs, req, new);
617         }
618
619         XSEGLOG2(&lc, D, "Allocate peer request\n");
620         pr = alloc_peer_req(peer);
621         if (!pr) {
622                 XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
623                                 peer->nr_ops - xq_count(&peer->free_reqs));
624                 goto put_xseg_request;
625         }
626         pr->peer = peer;
627         pr->portno = srcport;
628         pr->req = req;
629
630         //XSEGLOG2(&lc, D, "Set request data\n");
631         r = xseg_set_req_data(xseg, req, pr);
632         if (r < 0) {
633                 XSEGLOG2(&lc, W, "Cannot set request data\n");
634                 goto put_peer_request;
635         }
636
637         /*
638          * Start measuring receive time.
639          * When we receive a request, we need to have its submission time to
640          * measure elapsed time. Thus, we copy its submission time to pr->priv.
641          * QUESTION: Is this the fastest way?
642          */
643         timer_start(prefs, prefs->rec_tm);
644         if (prefs->rec_tm->insanity <= GET_FLAG(INSANITY, prefs->flags)) {
645                 ts = (struct timespec *)pr->priv;
646                 ts->tv_sec = prefs->rec_tm->start_time.tv_sec;
647                 ts->tv_nsec = prefs->rec_tm->start_time.tv_nsec;
648         }
649
650         //Submit the request from the source port to the target port
651         XSEGLOG2(&lc, D, "Submit request %lu\n", new);
652         timer_start(prefs, prefs->sub_tm);
653         p = xseg_submit(xseg, req, srcport, X_ALLOC);
654         if (p == NoPort) {
655                 XSEGLOG2(&lc, W, "Cannot submit request\n");
656                 goto put_peer_request;
657         }
658         prefs->status->submitted++;
659         timer_stop(prefs, prefs->sub_tm, NULL);
660
661         //Send SIGIO to the process that has bound this port to inform that
662         //IO is possible
663         r = xseg_signal(xseg, p);
664         //if (r < 0)
665         //      XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
666
667         return 0;
668
669 put_peer_request:
670         free_peer_req(peer, pr);
671 put_xseg_request:
672         if (xseg_put_request(xseg, req, srcport))
673                 XSEGLOG2(&lc, W, "Cannot put request\n");
674         return -1;
675 }
676
677 static int send_ping_request(struct peerd *peer, struct bench *prefs)
678 {
679         struct xseg_request *req;
680         struct xseg *xseg = peer->xseg;
681         struct peer_req *pr;
682         xport srcport = prefs->src_port;
683         xport dstport = prefs->dst_port;
684         xport p;
685         int r;
686
687         XSEGLOG2(&lc, I, "Sending ping request...");
688         //srcport and dstport must already be provided by the user.
689         //returns struct xseg_request with basic initializations
690         XSEGLOG2(&lc, D, "Get new request\n");
691         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
692         if (!req) {
693                 XSEGLOG2(&lc, W, "Cannot get request\n");
694                 return -1;
695         }
696         req->op = X_PING;
697
698         XSEGLOG2(&lc, D, "Allocate peer request\n");
699         pr = alloc_peer_req(peer);
700         if (!pr) {
701                 XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
702                                 peer->nr_ops - xq_count(&peer->free_reqs));
703                 goto put_xseg_request;
704         }
705         pr->peer = peer;
706         pr->portno = srcport;
707         pr->req = req;
708
709         r = xseg_set_req_data(xseg, req, pr);
710         if (r < 0) {
711                 XSEGLOG2(&lc, W, "Cannot set request data\n");
712                 goto put_peer_request;
713         }
714
715         //Submit the request from the source port to the target port
716         XSEGLOG2(&lc, D, "Submit ping request");
717         p = xseg_submit(xseg, req, srcport, X_ALLOC);
718         if (p == NoPort) {
719                 XSEGLOG2(&lc, W, "Cannot submit request\n");
720                 goto put_peer_request;
721         }
722         timer_stop(prefs, prefs->sub_tm, NULL);
723
724         //Send SIGIO to the process that has bound this port to inform that
725         //IO is possible
726         r = xseg_signal(xseg, p);
727         //if (r < 0)
728         //      XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
729
730         return 0;
731
732 put_peer_request:
733         free_peer_req(peer, pr);
734 put_xseg_request:
735         if (xseg_put_request(xseg, req, srcport))
736                 XSEGLOG2(&lc, W, "Cannot put request\n");
737         return -1;
738 }
739
740 /*
741  * This function substitutes the default generic_peerd_loop of peer.c.
742  * It's plugged to struct peerd at custom peer's initialisation
743  */
744 int bench_peerd_loop(void *arg)
745 {
746 #ifdef MT
747         struct thread *t = (struct thread *) arg;
748         struct peerd *peer = t->peer;
749         char *id = t->arg;
750 #else
751         struct peerd *peer = (struct peerd *) arg;
752         char id[4] = {'P','e','e','r'};
753 #endif
754         struct xseg *xseg = peer->xseg;
755         struct bench *prefs = peer->priv;
756         xport portno_start = peer->portno_start;
757         xport portno_end = peer->portno_end;
758         pid_t pid = syscall(SYS_gettid);
759         uint64_t threshold=1000/(1 + portno_end - portno_start);
760         uint64_t cached_prog_quantum = 0;
761         uint64_t prog_quantum = 0;
762         int r;
763         uint64_t loops;
764
765         if (GET_FLAG(PROGRESS, prefs->flags) == PROGRESS_YES) {
766                 prog_quantum = calculate_prog_quantum(prefs);
767                 cached_prog_quantum = prog_quantum;
768                 print_stats(prefs);
769         }
770
771         XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
772         xseg_init_local_signal(xseg, peer->portno_start);
773
774         /* If no ping is going to be sent, we can begin the benchmark now. */
775         if (GET_FLAG(PING, prefs->flags) == PING_MODE_OFF)
776                 timer_start(prefs, prefs->total_tm);
777         else
778                 send_ping_request(peer, prefs);
779
780 send_request:
781         while (!(isTerminate() && all_peer_reqs_free(peer))) {
782                 while (CAN_SEND_REQUEST(prefs)) {
783                         xseg_cancel_wait(xseg, peer->portno_start);
784                         XSEGLOG2(&lc, D, "...because %lu < %lu && %lu < %lu\n",
785                                         prefs->status->submitted - prefs->status->received,
786                                         prefs->iodepth, prefs->status->received,
787                                         prefs->status->max);
788                         XSEGLOG2(&lc, D, "Start sending new request\n");
789                         r = send_request(peer, prefs);
790                         if (r < 0)
791                                 break;
792                 }
793                 //Heart of peerd_loop. This loop is common for everyone.
794                 for (loops = threshold; loops > 0; loops--) {
795                         if (loops == 1)
796                                 xseg_prepare_wait(xseg, peer->portno_start);
797
798                         if (UNLIKELY(CAN_PRINT_PROGRESS(prefs, prog_quantum))) {
799                                 prog_quantum += cached_prog_quantum;
800                                 print_progress(prefs);
801                         }
802
803                         if (check_ports(peer)) {
804                                 //If an old request has just been acked, the most sensible
805                                 //thing to do is to immediately send a new one
806                                 if (prefs->status->received < prefs->status->max)
807                                         goto send_request;
808                                 else
809                                         return 0;
810                         }
811                 }
812                 //struct xseg_port *port = xseg_get_port(xseg, portno_start);
813                 //struct xq *q;
814                 //q = XPTR_TAKE(port->request_queue, xseg->segment);
815                 //XSEGLOG2(&lc, I, "%s goes to sleep with %u requests pending\n",
816                 //              id, xq_count(q));
817                 XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
818                 xseg_wait_signal(xseg, 10000000UL);
819                 xseg_cancel_wait(xseg, peer->portno_start);
820                 XSEGLOG2(&lc, I, "%s woke up\n", id);
821         }
822
823         XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
824                         xq_count(&peer->free_reqs), peer->nr_ops);
825         return 0;
826 }
827
828 void custom_peer_finalize(struct peerd *peer)
829 {
830         struct bench *prefs = peer->priv;
831         //TODO: Measure mean time, standard variation
832
833         if (!prefs->total_tm->completed)
834                 timer_stop(prefs, prefs->total_tm, NULL);
835
836         if (GET_FLAG(PROGRESS, prefs->flags) == PROGRESS_YES)
837                 clear_lines(prefs);
838
839         print_stats(prefs);
840         print_remaining(prefs);
841         print_res(prefs);
842         return;
843 }
844
845 /*
846  * handle_received: +1 to our received requests.
847  * Do some sanity checks and then check if request is failed.
848  * If not try to verify the request if asked.
849  */
850 static void handle_received(struct peerd *peer, struct peer_req *pr)
851 {
852         //FIXME: handle null pointer
853         struct bench *prefs = peer->priv;
854         struct timer *rec = prefs->rec_tm;
855         int start_timer = 0;
856
857         if (!pr->req) {
858                 //This is a serious error, so we must stop
859                 XSEGLOG2(&lc, E, "Received peer request with no xseg request");
860                 terminated++;
861                 return;
862         }
863
864         /*
865          * If we were in ping mode, we can now switch off and start the
866          * benchmark.
867          */
868         if (GET_FLAG(PING, prefs->flags) == PING_MODE_ON) {
869                 XSEGLOG2(&lc, I, "Ping received. Benchmark can start now.");
870                 SET_FLAG(PING, prefs->flags, PING_MODE_OFF);
871                 start_timer = 1;
872                 goto out;
873         }
874
875         prefs->status->received++;
876
877         if ((GET_FLAG(INSANITY, prefs->flags) < rec->insanity) && !pr->priv) {
878                 XSEGLOG2(&lc, W, "Cannot find submission time of request");
879                 return;
880         }
881
882         timer_stop(prefs, rec, (struct timespec *)pr->priv);
883
884         if (!(pr->req->state & XS_SERVED))
885                 prefs->status->failed++;
886         else if (CAN_VERIFY(prefs) && read_chunk(prefs, pr->req))
887                 prefs->status->corrupted++;
888
889 out:
890         if (xseg_put_request(peer->xseg, pr->req, pr->portno))
891                 XSEGLOG2(&lc, W, "Cannot put xseg request\n");
892
893         free_peer_req(peer, pr);
894
895         if (start_timer)
896                 timer_start(prefs, prefs->total_tm);
897 }
898
899 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
900                 enum dispatch_reason reason)
901 {
902         switch (reason) {
903                 case dispatch_accept:
904                         //This is wrong, benchmarking peer should not accept requests,
905                         //only receive them.
906                         XSEGLOG2(&lc, W, "Bench peer should not accept requests\n");
907                         complete(peer, pr);
908                         break;
909                 case dispatch_receive:
910                         handle_received(peer, pr);
911                         break;
912                 default:
913                         fail(peer, pr);
914         }
915         return 0;
916 }