bench: Improve benchmark results
[archipelago] / xseg / peers / user / bench-xseg.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <unistd.h>
39 #include <sys/syscall.h>
40 #include <sys/types.h>
41 #include <pthread.h>
42 #include <xseg/xseg.h>
43 #include <peer.h>
44 #include <time.h>
45 #include <sys/util.h>
46 #include <signal.h>
47 #include <bench-xseg.h>
48 #include <bench-lfsr.h>
49 #include <limits.h>
50
51 char global_id[IDLEN + 1];
52 /*
53  * This macro checks two things:
54  * a) If in-flight requests are less than given iodepth
55  * b) If we have submitted all of the requests
56  */
57 #define CAN_SEND_REQUEST(__p)                                                                                   \
58         ((__p->status->submitted - __p->status->received < __p->iodepth) && \
59         (__p->status->submitted < __p->status->max) &&                                          \
60         !isTerminate())
61
62 #define CAN_VERIFY(__p)                                                                                                 \
63         ((GET_FLAG(VERIFY, __p->flags) != VERIFY_NO) && __p->op == X_READ)
64
65 #define CAN_PRINT_PROGRESS(__p, __q)                                                                    \
66         ((GET_FLAG(PROGRESS, __p->flags) == PROGRESS_YES) &&                            \
67         (__p->status->received == __q))
68
69 void custom_peer_usage()
70 {
71         fprintf(stderr, "Custom peer options: \n"
72                         "  --------------------------------------------\n"
73                         "    -op       | None    | XSEG operation [read|write|info|delete]\n"
74                         "    --pattern | None    | I/O pattern [seq|rand]\n"
75                         "    --verify  | no      | Verify written requests [no|meta|full]\n"
76                         "    -rc       | None    | Request cap\n"
77                         "    -to       | None    | Total objects\n"
78                         "    -ts       | None    | Total I/O size\n"
79                         "    -os       | 4M      | Object size\n"
80                         "    -bs       | 4k      | Block size\n"
81                         "    -tp       | None    | Target port\n"
82                         "    --iodepth | 1       | Number of in-flight I/O requests\n"
83                         "    --seed    | None    | Initialize LFSR and target names\n"
84                         "    --insanity| sane    | Adjust insanity level of benchmark:\n"
85                         "              |         |     [sane|eccentric|manic|paranoid]\n"
86                         "    --progress| yes     | Show progress of requests\n"
87                         "\n"
88                         "Additional information:\n"
89                         "  --------------------------------------------\n"
90                         "  The -to and -ts options are mutually exclusive\n"
91                         "\n");
92 }
93
94 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
95 {
96         struct bench *prefs;
97         char request_cap[MAX_ARG_LEN + 1];
98         char total_objects[MAX_ARG_LEN + 1];
99         char total_size[MAX_ARG_LEN + 1];
100         char object_size[MAX_ARG_LEN + 1];
101         char block_size[MAX_ARG_LEN + 1];
102         char op[MAX_ARG_LEN + 1];
103         char pattern[MAX_ARG_LEN + 1];
104         char insanity[MAX_ARG_LEN + 1];
105         char verify[MAX_ARG_LEN + 1];
106         char progress[MAX_ARG_LEN + 1];
107         struct xseg *xseg = peer->xseg;
108         unsigned int xseg_page_size = 1 << xseg->config.page_shift;
109         long iodepth = -1;
110         long dst_port = -1;
111         unsigned long seed = -1;
112         uint64_t rc;
113         struct timespec timer_seed;
114         struct timespec *ts;
115         int set_by_hand = 0;
116         int i, r;
117
118         op[0] = 0;
119         pattern[0] = 0;
120         total_objects[0] = 0;
121         total_size[0] = 0;
122         block_size[0] = 0;
123         object_size[0] = 0;
124         insanity[0] = 0;
125         verify[0] = 0;
126         request_cap[0] = 0;
127         progress[0] = 0;
128
129         prefs = malloc(sizeof(struct bench));
130         if (!prefs) {
131                 perror("malloc");
132                 goto prefs_fail;
133         }
134         memset(prefs, 0, sizeof(struct bench));
135
136         prefs->status = malloc(sizeof(struct req_status));
137         if (!prefs->status) {
138                 perror("malloc");
139                 goto status_fail;
140         }
141         memset(prefs->status, 0, sizeof(struct req_status));
142
143         for (i = 0; i < peer->nr_ops; i++) {
144                 ts = malloc(sizeof(struct timespec));
145                 if (!ts) {
146                         perror("malloc");
147                         goto priv_fail;
148                 }
149                 peer->peer_reqs[i].priv = ts;
150         }
151
152         //Begin reading the benchmark-specific arguments
153         BEGIN_READ_ARGS(argc, argv);
154         READ_ARG_STRING("-rc", request_cap, MAX_ARG_LEN);
155         READ_ARG_STRING("-op", op, MAX_ARG_LEN);
156         READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
157         READ_ARG_STRING("-to", total_objects, MAX_ARG_LEN);
158         READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
159         READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
160         READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
161         READ_ARG_ULONG("--iodepth", iodepth);
162         READ_ARG_ULONG("-tp", dst_port);
163         READ_ARG_ULONG("--seed", seed);
164         READ_ARG_STRING("--insanity", insanity, MAX_ARG_LEN);
165         READ_ARG_STRING("--verify", verify, MAX_ARG_LEN);
166         READ_ARG_STRING("--progress", progress, MAX_ARG_LEN);
167         END_READ_ARGS();
168
169         /*****************************\
170          * Check I/O type parameters *
171         \*****************************/
172
173         //We support 4 xseg operations: X_READ, X_WRITE, X_DELETE, X_INFO
174         //The I/O pattern of these operations can be either sequential (seq) or
175         //random (rand)
176         if (!op[0]) {
177                 XSEGLOG2(&lc, E, "xseg operation needs to be supplied\n");
178                 goto arg_fail;
179         }
180         r = read_op(op);
181         if (r < 0) {
182                 XSEGLOG2(&lc, E, "Invalid syntax: -op %s\n", op);
183                 goto arg_fail;
184         }
185         prefs->op = r;
186
187         if (!pattern[0]) {
188                 XSEGLOG2(&lc, E, "I/O pattern needs to be supplied\n");
189                 goto arg_fail;
190         }
191         r = read_pattern(pattern);
192         if (r < 0) {
193                 XSEGLOG2(&lc, E, "Invalid syntax: --pattern %s\n", pattern);
194                 goto arg_fail;
195         }
196         SET_FLAG(PATTERN, prefs->flags, r);
197
198         if (!verify[0])
199                 strcpy(verify, "no");
200         r = read_verify(verify);
201         if (r < 0) {
202                 XSEGLOG2(&lc, E, "Invalid syntax: --verify %s\n", verify);
203                 goto arg_fail;
204         }
205         SET_FLAG(VERIFY, prefs->flags, r);
206
207         //Default iodepth value is 1
208         if (iodepth < 0)
209                 prefs->iodepth = 1;
210         else
211                 prefs->iodepth = iodepth;
212
213         /**************************\
214          * Check timer parameters *
215         \**************************/
216
217         //Most of the times, not all timers need to be used.
218         //We can choose which timers will be used by adjusting the "insanity"
219         //level of the benchmark i.e. the obscurity of code paths (get request,
220         //submit request) that will be timed.
221         if (!insanity[0])
222                 strcpy(insanity, "sane");
223
224         r = read_insanity(insanity);
225         if (r < 0) {
226                 XSEGLOG2(&lc, E, "Invalid syntax: --insanity %s\n", insanity);
227                 goto arg_fail;
228         }
229         SET_FLAG(INSANITY, prefs->flags, r);
230
231         /*****************************\
232          * Check I/O size parameters *
233         \*****************************/
234
235         //Block size (bs): Defaults to 4K.
236         //It must be a number followed by one of these characters:
237         //                                              [k|K|m|M|g|G]
238         //If not, it will be considered as size in bytes.
239         //Must be integer multiple of segment's page size (typically 4k).
240         if (!block_size[0])
241                 strcpy(block_size,"4k");
242
243         prefs->bs = str2num(block_size);
244         if (!prefs->bs) {
245                 XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
246                 goto arg_fail;
247         } else if (prefs->bs % xseg_page_size) {
248                 XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
249                 goto arg_fail;
250         }
251
252         //Total objects (to) or total I/O size (ts).
253         //Must have the same format as "block size"
254         //They are mutually exclusive
255         if (total_objects[0] && total_size[0]) {
256                 XSEGLOG2(&lc, E, "Total objects and total size are mutually exclusive\n");
257                 goto arg_fail;
258         } else if (total_objects[0]) {
259                 prefs->to = str2num(total_objects);
260                 if (!prefs->to) {
261                         XSEGLOG2(&lc, E, "Invalid syntax: -to %s\n", total_objects);
262                         goto arg_fail;
263                 }
264                 //In this case, the maximum number of requests is the total number of
265                 //objects we will handle
266                 prefs->status->max = prefs->to;
267         } else if (total_size[0]) {
268                 if (prefs->op != X_READ && prefs->op != X_WRITE) {
269                         XSEGLOG2(&lc, E,
270                                         "Total objects must be supplied (required by -op %s)\n", op);
271                         goto arg_fail;
272                 }
273                 prefs->ts = str2num(total_size);
274                 if (!prefs->ts) {
275                         XSEGLOG2(&lc, E, "Invalid syntax: -ts %s\n", total_size);
276                         goto arg_fail;
277                 } else if (prefs->ts % prefs->bs) {
278                         XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
279                         goto arg_fail;
280                 }
281                 //In this case, the maximum number of requests is the number of blocks
282                 //we need to cover the total I/O size
283                 prefs->status->max = prefs->ts / prefs->bs;
284         } else {
285                 XSEGLOG2(&lc, E, "Total objects or total size must be supplied\n");
286                 goto arg_fail;
287         }
288
289         //Object size (os): Defaults to 4M.
290         //Must have the same format as "block size"
291         //Must be integer multiple of "block size"
292         if (!object_size[0])
293                 strcpy(object_size,"4M");
294
295         prefs->os = str2num(object_size);
296         if (!prefs->os) {
297                 XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
298                 goto arg_fail;
299         } else if (prefs->os % prefs->bs) {
300                 XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
301                 goto arg_fail;
302         }
303
304
305         /*************************\
306          * Check port parameters *
307         \*************************/
308
309         if (dst_port < 0){
310                 XSEGLOG2(&lc, E, "Target port must be supplied\n");
311                 goto arg_fail;
312         }
313
314         prefs->src_port = peer->portno_start; //TODO: allow user to change this
315         prefs->dst_port = (xport) dst_port;
316
317         /*********************************\
318          * Create timers for all metrics *
319         \*********************************/
320
321         if (init_timer(&prefs->total_tm, INSANITY_SANE))
322                 goto tm_fail;
323         if (init_timer(&prefs->sub_tm, INSANITY_MANIC))
324                 goto tm_fail;
325         if (init_timer(&prefs->get_tm, INSANITY_PARANOID))
326                 goto tm_fail;
327         if (init_timer(&prefs->rec_tm, INSANITY_ECCENTRIC))
328                 goto tm_fail;
329
330         if (prefs->rec_tm->insanity <= GET_FLAG(INSANITY, prefs->flags)) {
331                 for (i = 0; i < peer->nr_ops; i++) {
332                         ts = malloc(sizeof(struct timespec));
333                         if (!ts) {
334                                 XSEGLOG2(&lc, E, "Timespec allocation failed\n");
335                                 goto tm_fail;
336                         }
337                         peer->peer_reqs[i].priv = ts;
338                 }
339         }
340
341         /*************************************\
342          * Initialize the LFSR and global_id *
343         \*************************************/
344 reseed:
345         //We proceed to initialise the global_id, and seed variables.
346         if (seed == -1) {
347                 clock_gettime(CLOCK_BENCH, &timer_seed);
348                 seed = timer_seed.tv_nsec;
349         } else {
350                 set_by_hand = 1;
351         }
352         create_id(seed);
353
354         if (prefs->status->max == 1)
355                 SET_FLAG(PATTERN, prefs->flags, PATTERN_SEQ);
356
357         if (GET_FLAG(PATTERN, prefs->flags) == PATTERN_RAND) {
358                 prefs->lfsr = malloc(sizeof(struct bench_lfsr));
359                 if (!prefs->lfsr) {
360                         perror("malloc");
361                         goto lfsr_fail;
362                 }
363
364                 r = lfsr_init(prefs->lfsr, prefs->status->max, seed, seed & 0xF);
365                 if (r && set_by_hand) {
366                         XSEGLOG2(&lc, E, "LFSR could not be initialized.\n");
367                         goto lfsr_fail;
368                 } else if (r) {
369                         seed = -1;
370                         goto reseed;
371                 }
372         }
373
374         /*********************************\
375          * Miscellaneous initializations *
376         \*********************************/
377
378         /* The request cap must be enforced only after the LFSR is initialized */
379         if (request_cap[0]) {
380                 rc = str2num(request_cap);
381                 if (!rc) {
382                         XSEGLOG2(&lc, E, "Invalid syntax: -rc %s\n", request_cap);
383                         goto arg_fail;
384                 } else if (rc > prefs->status->max) {
385                         XSEGLOG2(&lc, E, "Request cap exceeds current request total.\n");
386                         goto arg_fail;
387                 }
388                 prefs->status->max = rc;
389         }
390
391         /* Benchmarking progress printing is on by default */
392         if (!progress[0])
393                 strcpy(progress, "yes");
394         r = read_progress(progress);
395         if (r < 0) {
396                 XSEGLOG2(&lc, E, "Invalid syntax: --progress %s\n", progress);
397                 goto arg_fail;
398         }
399         SET_FLAG(PROGRESS, prefs->flags, r);
400
401         prefs->peer = peer;
402         peer->peerd_loop = bench_peerd_loop;
403         peer->priv = (void *) prefs;
404         XSEGLOG2(&lc, I, "Global ID is %s\n", global_id);
405         return 0;
406
407 arg_fail:
408         custom_peer_usage();
409 lfsr_fail:
410         free(prefs->lfsr);
411 tm_fail:
412         free(prefs->total_tm);
413         free(prefs->sub_tm);
414         free(prefs->get_tm);
415         free(prefs->rec_tm);
416 priv_fail:
417         for (; i >= 0; i--) {
418                 free(peer->peer_reqs[i].priv);
419         }
420 status_fail:
421         free(prefs->status);
422 prefs_fail:
423         free(prefs);
424         return -1;
425 }
426
427
428 static int send_request(struct peerd *peer, struct bench *prefs)
429 {
430         struct xseg_request *req;
431         struct xseg *xseg = peer->xseg;
432         struct peer_req *pr;
433         xport srcport = prefs->src_port;
434         xport dstport = prefs->dst_port;
435         xport p;
436
437         int r;
438         uint64_t new;
439         uint64_t size = prefs->bs;
440         struct timespec *ts;
441
442         //srcport and dstport must already be provided by the user.
443         //returns struct xseg_request with basic initializations
444         XSEGLOG2(&lc, D, "Get new request\n");
445         timer_start(prefs, prefs->get_tm);
446         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
447         if (!req) {
448                 XSEGLOG2(&lc, W, "Cannot get request\n");
449                 return -1;
450         }
451         timer_stop(prefs, prefs->get_tm, NULL);
452
453         //Allocate enough space for the data and the target's name
454         XSEGLOG2(&lc, D, "Prepare new request\n");
455         r = xseg_prep_request(xseg, req, TARGETLEN, size);
456         if (r < 0) {
457                 XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
458                                 TARGETLEN, (unsigned long long)size);
459                 goto put_xseg_request;
460         }
461
462         //Determine what the next target/chunk will be, based on I/O pattern
463         new = determine_next(prefs);
464         req->op = prefs->op;
465         XSEGLOG2(&lc, I, "Our new request is %lu\n", new);
466         //Create a target of this format: "bench-<global_id>-<obj_no>"
467         create_target(prefs, req, new);
468
469         if (prefs->op == X_WRITE || prefs->op == X_READ) {
470                 req->size = size;
471                 //Calculate the chunk's offset inside the object
472                 req->offset = calculate_offset(prefs, new);
473                 XSEGLOG2(&lc, D, "Offset of request %lu is %lu\n", new, req->offset);
474
475                 if (prefs->op == X_WRITE)
476                         create_chunk(prefs, req, new);
477         }
478
479         XSEGLOG2(&lc, D, "Allocate peer request\n");
480         pr = alloc_peer_req(peer);
481         if (!pr) {
482                 XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
483                                 peer->nr_ops - xq_count(&peer->free_reqs));
484                 goto put_xseg_request;
485         }
486         pr->peer = peer;
487         pr->portno = srcport;
488         pr->req = req;
489
490         //XSEGLOG2(&lc, D, "Set request data\n");
491         r = xseg_set_req_data(xseg, req, pr);
492         if (r < 0) {
493                 XSEGLOG2(&lc, W, "Cannot set request data\n");
494                 goto put_peer_request;
495         }
496
497         /*
498          * Start measuring receive time.
499          * When we receive a request, we need to have its submission time to
500          * measure elapsed time. Thus, we copy its submission time to pr->priv.
501          * QUESTION: Is this the fastest way?
502          */
503         timer_start(prefs, prefs->rec_tm);
504         if (prefs->rec_tm->insanity <= GET_FLAG(INSANITY, prefs->flags)) {
505                 ts = (struct timespec *)pr->priv;
506                 ts->tv_sec = prefs->rec_tm->start_time.tv_sec;
507                 ts->tv_nsec = prefs->rec_tm->start_time.tv_nsec;
508         }
509
510         //Submit the request from the source port to the target port
511         XSEGLOG2(&lc, D, "Submit request %lu\n", new);
512         timer_start(prefs, prefs->sub_tm);
513         p = xseg_submit(xseg, req, srcport, X_ALLOC);
514         if (p == NoPort) {
515                 XSEGLOG2(&lc, W, "Cannot submit request\n");
516                 goto put_peer_request;
517         }
518         prefs->status->submitted++;
519         timer_stop(prefs, prefs->sub_tm, NULL);
520
521         //Send SIGIO to the process that has bound this port to inform that
522         //IO is possible
523         r = xseg_signal(xseg, p);
524         //if (r < 0)
525         //      XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
526
527         return 0;
528
529 put_peer_request:
530         free_peer_req(peer, pr);
531 put_xseg_request:
532         if (xseg_put_request(xseg, req, srcport))
533                 XSEGLOG2(&lc, W, "Cannot put request\n");
534         return -1;
535 }
536
537 /*
538  * This function substitutes the default generic_peerd_loop of peer.c.
539  * It's plugged to struct peerd at custom peer's initialisation
540  */
541 int bench_peerd_loop(void *arg)
542 {
543 #ifdef MT
544         struct thread *t = (struct thread *) arg;
545         struct peerd *peer = t->peer;
546         char *id = t->arg;
547 #else
548         struct peerd *peer = (struct peerd *) arg;
549         char id[4] = {'P','e','e','r'};
550 #endif
551         struct xseg *xseg = peer->xseg;
552         struct bench *prefs = peer->priv;
553         xport portno_start = peer->portno_start;
554         xport portno_end = peer->portno_end;
555         pid_t pid = syscall(SYS_gettid);
556         uint64_t threshold=1000/(1 + portno_end - portno_start);
557         uint64_t cached_prog_quantum = 0;
558         uint64_t prog_quantum = 0;
559         int r;
560         uint64_t loops;
561
562         if (GET_FLAG(PROGRESS, prefs->flags) == PROGRESS_YES) {
563                 prog_quantum = calculate_prog_quantum(prefs);
564                 cached_prog_quantum = prog_quantum;
565                 print_stats(prefs);
566         }
567
568         XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
569         xseg_init_local_signal(xseg, peer->portno_start);
570
571         timer_start(prefs, prefs->total_tm);
572 send_request:
573         while (!(isTerminate() && all_peer_reqs_free(peer))) {
574                 while (CAN_SEND_REQUEST(prefs)) {
575                         xseg_cancel_wait(xseg, peer->portno_start);
576                         XSEGLOG2(&lc, D, "...because %lu < %lu && %lu < %lu\n",
577                                         prefs->status->submitted - prefs->status->received,
578                                         prefs->iodepth, prefs->status->received,
579                                         prefs->status->max);
580                         XSEGLOG2(&lc, D, "Start sending new request\n");
581                         r = send_request(peer, prefs);
582                         if (r < 0)
583                                 break;
584                 }
585                 //Heart of peerd_loop. This loop is common for everyone.
586                 for (loops = threshold; loops > 0; loops--) {
587                         if (loops == 1)
588                                 xseg_prepare_wait(xseg, peer->portno_start);
589
590                         if (UNLIKELY(CAN_PRINT_PROGRESS(prefs, prog_quantum))) {
591                                 prog_quantum += cached_prog_quantum;
592                                 print_progress(prefs);
593                         }
594
595                         if (check_ports(peer)) {
596                                 //If an old request has just been acked, the most sensible
597                                 //thing to do is to immediately send a new one
598                                 if (prefs->status->received < prefs->status->max)
599                                         goto send_request;
600                                 else
601                                         return 0;
602                         }
603                 }
604                 //struct xseg_port *port = xseg_get_port(xseg, portno_start);
605                 //struct xq *q;
606                 //q = XPTR_TAKE(port->request_queue, xseg->segment);
607                 //XSEGLOG2(&lc, I, "%s goes to sleep with %u requests pending\n",
608                 //              id, xq_count(q));
609                 XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
610                 xseg_wait_signal(xseg, 10000000UL);
611                 xseg_cancel_wait(xseg, peer->portno_start);
612                 XSEGLOG2(&lc, I, "%s woke up\n", id);
613         }
614
615         XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
616                         xq_count(&peer->free_reqs), peer->nr_ops);
617         return 0;
618 }
619
620 void custom_peer_finalize(struct peerd *peer)
621 {
622         struct bench *prefs = peer->priv;
623         //TODO: Measure mean time, standard variation
624
625         if (!prefs->total_tm->completed)
626                 timer_stop(prefs, prefs->total_tm, NULL);
627
628         if (GET_FLAG(PROGRESS, prefs->flags) == PROGRESS_YES)
629                 print_progress(prefs);
630         else
631                 print_stats(prefs);
632
633         print_remaining(prefs);
634         print_res(prefs);
635         return;
636 }
637
638 /*
639  * handle_received: +1 to our received requests.
640  * Do some sanity checks and then check if request is failed.
641  * If not try to verify the request if asked.
642  */
643 static void handle_received(struct peerd *peer, struct peer_req *pr)
644 {
645         //FIXME: handle null pointer
646         struct bench *prefs = peer->priv;
647         struct timer *rec = prefs->rec_tm;
648
649         prefs->status->received++;
650         if (!pr->req) {
651                 //This is a serious error, so we must stop
652                 XSEGLOG2(&lc, E, "Received peer request with no xseg request");
653                 terminated++;
654                 return;
655         }
656
657         if ((GET_FLAG(INSANITY, prefs->flags) < rec->insanity) && !pr->priv) {
658                 XSEGLOG2(&lc, W, "Cannot find submission time of request");
659                 return;
660         }
661
662         timer_stop(prefs, rec, (struct timespec *)pr->priv);
663
664         if (!(pr->req->state & XS_SERVED))
665                 prefs->status->failed++;
666         else if (CAN_VERIFY(prefs) && read_chunk(prefs, pr->req))
667                 prefs->status->corrupted++;
668
669         if (xseg_put_request(peer->xseg, pr->req, pr->portno))
670                 XSEGLOG2(&lc, W, "Cannot put xseg request\n");
671
672         free_peer_req(peer, pr);
673 }
674
675 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
676                 enum dispatch_reason reason)
677 {
678         switch (reason) {
679                 case dispatch_accept:
680                         //This is wrong, benchmarking peer should not accept requests,
681                         //only receive them.
682                         XSEGLOG2(&lc, W, "Bench peer should not accept requests\n");
683                         complete(peer, pr);
684                         break;
685                 case dispatch_receive:
686                         handle_received(peer, pr);
687                         break;
688                 default:
689                         fail(peer, pr);
690         }
691         return 0;
692 }