bench: Ping target peer before starting timers
[archipelago] / xseg / peers / user / bench-xseg.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <unistd.h>
39 #include <sys/syscall.h>
40 #include <sys/types.h>
41 #include <pthread.h>
42 #include <xseg/xseg.h>
43 #include <peer.h>
44 #include <time.h>
45 #include <sys/util.h>
46 #include <signal.h>
47 #include <bench-xseg.h>
48 #include <bench-lfsr.h>
49 #include <limits.h>
50
51 char global_id[IDLEN + 1];
52 /*
53  * This macro checks two things:
54  * a) If in-flight requests are less than given iodepth
55  * b) If we have submitted all of the requests
56  * c) If we are not in ping mode
57  * d) If we have been asked to terminate
58  */
59 #define CAN_SEND_REQUEST(__p)                                           \
60         ((__p->status->submitted - __p->status->received < __p->iodepth) && \
61         (__p->status->submitted < __p->status->max) &&                  \
62         (GET_FLAG(PING, __p->flags) == PING_MODE_OFF) &&                \
63          !isTerminate())
64
65 #define CAN_VERIFY(__p)                                                 \
66         ((GET_FLAG(VERIFY, __p->flags) != VERIFY_NO) && __p->op == X_READ)
67
68 #define CAN_PRINT_PROGRESS(__p, __q)                                    \
69         ((GET_FLAG(PROGRESS, __p->flags) == PROGRESS_YES) &&            \
70         (__p->status->received == __q))
71
72 void custom_peer_usage()
73 {
74         fprintf(stderr, "Custom peer options: \n"
75                         "  --------------------------------------------\n"
76                         "    -op       | None    | XSEG operation [read|write|info|delete]\n"
77                         "    --pattern | None    | I/O pattern [seq|rand]\n"
78                         "    --verify  | no      | Verify written requests [no|meta|full]\n"
79                         "    -rc       | None    | Request cap\n"
80                         "    -to       | None    | Total objects\n"
81                         "    -ts       | None    | Total I/O size\n"
82                         "    -os       | 4M      | Object size\n"
83                         "    -bs       | 4k      | Block size\n"
84                         "    -tp       | None    | Target port\n"
85                         "    --iodepth | 1       | Number of in-flight I/O requests\n"
86                         "    --seed    | None    | Initialize LFSR and target names\n"
87                         "    --insanity| sane    | Adjust insanity level of benchmark:\n"
88                         "              |         |     [sane|eccentric|manic|paranoid]\n"
89                         "    --progress| yes     | Show progress of requests\n"
90                         "    --ping    | yes     | Ping target before starting benchmark\n"
91                         "\n"
92                         "Additional information:\n"
93                         "  --------------------------------------------\n"
94                         "  The -to and -ts options are mutually exclusive\n"
95                         "\n");
96 }
97
98 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
99 {
100         struct bench *prefs;
101         char request_cap[MAX_ARG_LEN + 1];
102         char total_objects[MAX_ARG_LEN + 1];
103         char total_size[MAX_ARG_LEN + 1];
104         char object_size[MAX_ARG_LEN + 1];
105         char block_size[MAX_ARG_LEN + 1];
106         char op[MAX_ARG_LEN + 1];
107         char pattern[MAX_ARG_LEN + 1];
108         char insanity[MAX_ARG_LEN + 1];
109         char verify[MAX_ARG_LEN + 1];
110         char progress[MAX_ARG_LEN + 1];
111         char ping[MAX_ARG_LEN + 1];
112         struct xseg *xseg = peer->xseg;
113         unsigned int xseg_page_size = 1 << xseg->config.page_shift;
114         long iodepth = -1;
115         long dst_port = -1;
116         unsigned long seed = -1;
117         uint64_t rc;
118         struct timespec timer_seed;
119         struct timespec *ts;
120         int set_by_hand = 0;
121         int i, r;
122
123         op[0] = 0;
124         pattern[0] = 0;
125         total_objects[0] = 0;
126         total_size[0] = 0;
127         block_size[0] = 0;
128         object_size[0] = 0;
129         insanity[0] = 0;
130         verify[0] = 0;
131         request_cap[0] = 0;
132         progress[0] = 0;
133         ping[0] = 0;
134
135         prefs = malloc(sizeof(struct bench));
136         if (!prefs) {
137                 perror("malloc");
138                 goto prefs_fail;
139         }
140         memset(prefs, 0, sizeof(struct bench));
141
142         prefs->status = malloc(sizeof(struct req_status));
143         if (!prefs->status) {
144                 perror("malloc");
145                 goto status_fail;
146         }
147         memset(prefs->status, 0, sizeof(struct req_status));
148
149         for (i = 0; i < peer->nr_ops; i++) {
150                 ts = malloc(sizeof(struct timespec));
151                 if (!ts) {
152                         perror("malloc");
153                         goto priv_fail;
154                 }
155                 peer->peer_reqs[i].priv = ts;
156         }
157
158         //Begin reading the benchmark-specific arguments
159         BEGIN_READ_ARGS(argc, argv);
160         READ_ARG_STRING("-rc", request_cap, MAX_ARG_LEN);
161         READ_ARG_STRING("-op", op, MAX_ARG_LEN);
162         READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
163         READ_ARG_STRING("-to", total_objects, MAX_ARG_LEN);
164         READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
165         READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
166         READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
167         READ_ARG_ULONG("--iodepth", iodepth);
168         READ_ARG_ULONG("-tp", dst_port);
169         READ_ARG_ULONG("--seed", seed);
170         READ_ARG_STRING("--insanity", insanity, MAX_ARG_LEN);
171         READ_ARG_STRING("--verify", verify, MAX_ARG_LEN);
172         READ_ARG_STRING("--progress", progress, MAX_ARG_LEN);
173         READ_ARG_STRING("--ping", ping, MAX_ARG_LEN);
174         END_READ_ARGS();
175
176         /*****************************\
177          * Check I/O type parameters *
178         \*****************************/
179
180         //We support 4 xseg operations: X_READ, X_WRITE, X_DELETE, X_INFO
181         //The I/O pattern of these operations can be either sequential (seq) or
182         //random (rand)
183         if (!op[0]) {
184                 XSEGLOG2(&lc, E, "xseg operation needs to be supplied\n");
185                 goto arg_fail;
186         }
187         r = read_op(op);
188         if (r < 0) {
189                 XSEGLOG2(&lc, E, "Invalid syntax: -op %s\n", op);
190                 goto arg_fail;
191         }
192         prefs->op = r;
193
194         if (!pattern[0]) {
195                 XSEGLOG2(&lc, E, "I/O pattern needs to be supplied\n");
196                 goto arg_fail;
197         }
198         r = read_pattern(pattern);
199         if (r < 0) {
200                 XSEGLOG2(&lc, E, "Invalid syntax: --pattern %s\n", pattern);
201                 goto arg_fail;
202         }
203         SET_FLAG(PATTERN, prefs->flags, r);
204
205         if (!verify[0])
206                 strcpy(verify, "no");
207         r = read_verify(verify);
208         if (r < 0) {
209                 XSEGLOG2(&lc, E, "Invalid syntax: --verify %s\n", verify);
210                 goto arg_fail;
211         }
212         SET_FLAG(VERIFY, prefs->flags, r);
213
214         //Default iodepth value is 1
215         if (iodepth < 0)
216                 prefs->iodepth = 1;
217         else
218                 prefs->iodepth = iodepth;
219
220         /**************************\
221          * Check timer parameters *
222         \**************************/
223
224         //Most of the times, not all timers need to be used.
225         //We can choose which timers will be used by adjusting the "insanity"
226         //level of the benchmark i.e. the obscurity of code paths (get request,
227         //submit request) that will be timed.
228         if (!insanity[0])
229                 strcpy(insanity, "sane");
230
231         r = read_insanity(insanity);
232         if (r < 0) {
233                 XSEGLOG2(&lc, E, "Invalid syntax: --insanity %s\n", insanity);
234                 goto arg_fail;
235         }
236         SET_FLAG(INSANITY, prefs->flags, r);
237
238         /*****************************\
239          * Check I/O size parameters *
240         \*****************************/
241
242         //Block size (bs): Defaults to 4K.
243         //It must be a number followed by one of these characters:
244         //                                              [k|K|m|M|g|G]
245         //If not, it will be considered as size in bytes.
246         //Must be integer multiple of segment's page size (typically 4k).
247         if (!block_size[0])
248                 strcpy(block_size,"4k");
249
250         prefs->bs = str2num(block_size);
251         if (!prefs->bs) {
252                 XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
253                 goto arg_fail;
254         } else if (prefs->bs % xseg_page_size) {
255                 XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
256                 goto arg_fail;
257         }
258
259         //Total objects (to) or total I/O size (ts).
260         //Must have the same format as "block size"
261         //They are mutually exclusive
262         if (total_objects[0] && total_size[0]) {
263                 XSEGLOG2(&lc, E, "Total objects and total size are mutually exclusive\n");
264                 goto arg_fail;
265         } else if (total_objects[0]) {
266                 prefs->to = str2num(total_objects);
267                 if (!prefs->to) {
268                         XSEGLOG2(&lc, E, "Invalid syntax: -to %s\n", total_objects);
269                         goto arg_fail;
270                 }
271                 //In this case, the maximum number of requests is the total number of
272                 //objects we will handle
273                 prefs->status->max = prefs->to;
274         } else if (total_size[0]) {
275                 if (prefs->op != X_READ && prefs->op != X_WRITE) {
276                         XSEGLOG2(&lc, E,
277                                         "Total objects must be supplied (required by -op %s)\n", op);
278                         goto arg_fail;
279                 }
280                 prefs->ts = str2num(total_size);
281                 if (!prefs->ts) {
282                         XSEGLOG2(&lc, E, "Invalid syntax: -ts %s\n", total_size);
283                         goto arg_fail;
284                 } else if (prefs->ts % prefs->bs) {
285                         XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
286                         goto arg_fail;
287                 }
288                 //In this case, the maximum number of requests is the number of blocks
289                 //we need to cover the total I/O size
290                 prefs->status->max = prefs->ts / prefs->bs;
291         } else {
292                 XSEGLOG2(&lc, E, "Total objects or total size must be supplied\n");
293                 goto arg_fail;
294         }
295
296         //Object size (os): Defaults to 4M.
297         //Must have the same format as "block size"
298         //Must be integer multiple of "block size"
299         if (!object_size[0])
300                 strcpy(object_size,"4M");
301
302         prefs->os = str2num(object_size);
303         if (!prefs->os) {
304                 XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
305                 goto arg_fail;
306         } else if (prefs->os % prefs->bs) {
307                 XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
308                 goto arg_fail;
309         }
310
311
312         /*************************\
313          * Check port parameters *
314         \*************************/
315
316         if (dst_port < 0){
317                 XSEGLOG2(&lc, E, "Target port must be supplied\n");
318                 goto arg_fail;
319         }
320
321         prefs->src_port = peer->portno_start; //TODO: allow user to change this
322         prefs->dst_port = (xport) dst_port;
323
324         /*********************************\
325          * Create timers for all metrics *
326         \*********************************/
327
328         if (init_timer(&prefs->total_tm, INSANITY_SANE))
329                 goto tm_fail;
330         if (init_timer(&prefs->sub_tm, INSANITY_MANIC))
331                 goto tm_fail;
332         if (init_timer(&prefs->get_tm, INSANITY_PARANOID))
333                 goto tm_fail;
334         if (init_timer(&prefs->rec_tm, INSANITY_ECCENTRIC))
335                 goto tm_fail;
336
337         /*************************************\
338          * Initialize the LFSR and global_id *
339         \*************************************/
340 reseed:
341         //We proceed to initialise the global_id, and seed variables.
342         if (seed == -1) {
343                 clock_gettime(CLOCK_BENCH, &timer_seed);
344                 seed = timer_seed.tv_nsec;
345         } else {
346                 set_by_hand = 1;
347         }
348         create_id(seed);
349
350         if (prefs->status->max == 1)
351                 SET_FLAG(PATTERN, prefs->flags, PATTERN_SEQ);
352
353         if (GET_FLAG(PATTERN, prefs->flags) == PATTERN_RAND) {
354                 prefs->lfsr = malloc(sizeof(struct bench_lfsr));
355                 if (!prefs->lfsr) {
356                         perror("malloc");
357                         goto lfsr_fail;
358                 }
359
360                 r = lfsr_init(prefs->lfsr, prefs->status->max, seed, seed & 0xF);
361                 if (r && set_by_hand) {
362                         XSEGLOG2(&lc, E, "LFSR could not be initialized.\n");
363                         goto lfsr_fail;
364                 } else if (r) {
365                         seed = -1;
366                         goto reseed;
367                 }
368         }
369
370         /*********************************\
371          * Miscellaneous initializations *
372         \*********************************/
373
374         /* The request cap must be enforced only after the LFSR is initialized */
375         if (request_cap[0]) {
376                 rc = str2num(request_cap);
377                 if (!rc) {
378                         XSEGLOG2(&lc, E, "Invalid syntax: -rc %s\n", request_cap);
379                         goto arg_fail;
380                 } else if (rc > prefs->status->max) {
381                         XSEGLOG2(&lc, E, "Request cap exceeds current request total.\n");
382                         goto arg_fail;
383                 }
384                 prefs->status->max = rc;
385         }
386
387         /* Benchmarking progress printing is on by default */
388         if (!progress[0])
389                 strcpy(progress, "yes");
390         r = read_progress(progress);
391         if (r < 0) {
392                 XSEGLOG2(&lc, E, "Invalid syntax: --progress %s\n", progress);
393                 goto arg_fail;
394         }
395         SET_FLAG(PROGRESS, prefs->flags, r);
396
397         /* Pinging the target peer is on by default */
398         if (!ping[0])
399                 strcpy(ping, "yes");
400         r = read_ping(ping);
401         if (r < 0) {
402                 XSEGLOG2(&lc, E, "Invalid syntax: --ping %s\n", ping);
403                 goto arg_fail;
404         }
405         SET_FLAG(PING, prefs->flags, r);
406
407         prefs->peer = peer;
408         peer->peerd_loop = bench_peerd_loop;
409         peer->priv = (void *) prefs;
410         XSEGLOG2(&lc, I, "Global ID is %s\n", global_id);
411         return 0;
412
413 arg_fail:
414         custom_peer_usage();
415 lfsr_fail:
416         free(prefs->lfsr);
417 tm_fail:
418         free(prefs->total_tm);
419         free(prefs->sub_tm);
420         free(prefs->get_tm);
421         free(prefs->rec_tm);
422 priv_fail:
423         for (; i >= 0; i--) {
424                 free(peer->peer_reqs[i].priv);
425         }
426 status_fail:
427         free(prefs->status);
428 prefs_fail:
429         free(prefs);
430         return -1;
431 }
432
433
434 static int send_request(struct peerd *peer, struct bench *prefs)
435 {
436         struct xseg_request *req;
437         struct xseg *xseg = peer->xseg;
438         struct peer_req *pr;
439         xport srcport = prefs->src_port;
440         xport dstport = prefs->dst_port;
441         xport p;
442
443         int r;
444         uint64_t new;
445         uint64_t size = prefs->bs;
446         struct timespec *ts;
447
448         //srcport and dstport must already be provided by the user.
449         //returns struct xseg_request with basic initializations
450         XSEGLOG2(&lc, D, "Get new request\n");
451         timer_start(prefs, prefs->get_tm);
452         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
453         if (!req) {
454                 XSEGLOG2(&lc, W, "Cannot get request\n");
455                 return -1;
456         }
457         timer_stop(prefs, prefs->get_tm, NULL);
458
459         //Allocate enough space for the data and the target's name
460         XSEGLOG2(&lc, D, "Prepare new request\n");
461         r = xseg_prep_request(xseg, req, TARGETLEN, size);
462         if (r < 0) {
463                 XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
464                                 TARGETLEN, (unsigned long long)size);
465                 goto put_xseg_request;
466         }
467
468         //Determine what the next target/chunk will be, based on I/O pattern
469         new = determine_next(prefs);
470         req->op = prefs->op;
471         XSEGLOG2(&lc, I, "Our new request is %lu\n", new);
472         //Create a target of this format: "bench-<global_id>-<obj_no>"
473         create_target(prefs, req, new);
474
475         if (prefs->op == X_WRITE || prefs->op == X_READ) {
476                 req->size = size;
477                 //Calculate the chunk's offset inside the object
478                 req->offset = calculate_offset(prefs, new);
479                 XSEGLOG2(&lc, D, "Offset of request %lu is %lu\n", new, req->offset);
480
481                 if (prefs->op == X_WRITE)
482                         create_chunk(prefs, req, new);
483         }
484
485         XSEGLOG2(&lc, D, "Allocate peer request\n");
486         pr = alloc_peer_req(peer);
487         if (!pr) {
488                 XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
489                                 peer->nr_ops - xq_count(&peer->free_reqs));
490                 goto put_xseg_request;
491         }
492         pr->peer = peer;
493         pr->portno = srcport;
494         pr->req = req;
495
496         //XSEGLOG2(&lc, D, "Set request data\n");
497         r = xseg_set_req_data(xseg, req, pr);
498         if (r < 0) {
499                 XSEGLOG2(&lc, W, "Cannot set request data\n");
500                 goto put_peer_request;
501         }
502
503         /*
504          * Start measuring receive time.
505          * When we receive a request, we need to have its submission time to
506          * measure elapsed time. Thus, we copy its submission time to pr->priv.
507          * QUESTION: Is this the fastest way?
508          */
509         timer_start(prefs, prefs->rec_tm);
510         if (prefs->rec_tm->insanity <= GET_FLAG(INSANITY, prefs->flags)) {
511                 ts = (struct timespec *)pr->priv;
512                 ts->tv_sec = prefs->rec_tm->start_time.tv_sec;
513                 ts->tv_nsec = prefs->rec_tm->start_time.tv_nsec;
514         }
515
516         //Submit the request from the source port to the target port
517         XSEGLOG2(&lc, D, "Submit request %lu\n", new);
518         timer_start(prefs, prefs->sub_tm);
519         p = xseg_submit(xseg, req, srcport, X_ALLOC);
520         if (p == NoPort) {
521                 XSEGLOG2(&lc, W, "Cannot submit request\n");
522                 goto put_peer_request;
523         }
524         prefs->status->submitted++;
525         timer_stop(prefs, prefs->sub_tm, NULL);
526
527         //Send SIGIO to the process that has bound this port to inform that
528         //IO is possible
529         r = xseg_signal(xseg, p);
530         //if (r < 0)
531         //      XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
532
533         return 0;
534
535 put_peer_request:
536         free_peer_req(peer, pr);
537 put_xseg_request:
538         if (xseg_put_request(xseg, req, srcport))
539                 XSEGLOG2(&lc, W, "Cannot put request\n");
540         return -1;
541 }
542
543 static int send_ping_request(struct peerd *peer, struct bench *prefs)
544 {
545         struct xseg_request *req;
546         struct xseg *xseg = peer->xseg;
547         struct peer_req *pr;
548         xport srcport = prefs->src_port;
549         xport dstport = prefs->dst_port;
550         xport p;
551         int r;
552
553         XSEGLOG2(&lc, I, "Sending ping request...");
554         //srcport and dstport must already be provided by the user.
555         //returns struct xseg_request with basic initializations
556         XSEGLOG2(&lc, D, "Get new request\n");
557         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
558         if (!req) {
559                 XSEGLOG2(&lc, W, "Cannot get request\n");
560                 return -1;
561         }
562         req->op = X_PING;
563
564         XSEGLOG2(&lc, D, "Allocate peer request\n");
565         pr = alloc_peer_req(peer);
566         if (!pr) {
567                 XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
568                                 peer->nr_ops - xq_count(&peer->free_reqs));
569                 goto put_xseg_request;
570         }
571         pr->peer = peer;
572         pr->portno = srcport;
573         pr->req = req;
574
575         r = xseg_set_req_data(xseg, req, pr);
576         if (r < 0) {
577                 XSEGLOG2(&lc, W, "Cannot set request data\n");
578                 goto put_peer_request;
579         }
580
581         //Submit the request from the source port to the target port
582         XSEGLOG2(&lc, D, "Submit ping request");
583         p = xseg_submit(xseg, req, srcport, X_ALLOC);
584         if (p == NoPort) {
585                 XSEGLOG2(&lc, W, "Cannot submit request\n");
586                 goto put_peer_request;
587         }
588         timer_stop(prefs, prefs->sub_tm, NULL);
589
590         //Send SIGIO to the process that has bound this port to inform that
591         //IO is possible
592         r = xseg_signal(xseg, p);
593         //if (r < 0)
594         //      XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
595
596         return 0;
597
598 put_peer_request:
599         free_peer_req(peer, pr);
600 put_xseg_request:
601         if (xseg_put_request(xseg, req, srcport))
602                 XSEGLOG2(&lc, W, "Cannot put request\n");
603         return -1;
604 }
605
606 /*
607  * This function substitutes the default generic_peerd_loop of peer.c.
608  * It's plugged to struct peerd at custom peer's initialisation
609  */
610 int bench_peerd_loop(void *arg)
611 {
612 #ifdef MT
613         struct thread *t = (struct thread *) arg;
614         struct peerd *peer = t->peer;
615         char *id = t->arg;
616 #else
617         struct peerd *peer = (struct peerd *) arg;
618         char id[4] = {'P','e','e','r'};
619 #endif
620         struct xseg *xseg = peer->xseg;
621         struct bench *prefs = peer->priv;
622         xport portno_start = peer->portno_start;
623         xport portno_end = peer->portno_end;
624         pid_t pid = syscall(SYS_gettid);
625         uint64_t threshold=1000/(1 + portno_end - portno_start);
626         uint64_t cached_prog_quantum = 0;
627         uint64_t prog_quantum = 0;
628         int r;
629         uint64_t loops;
630
631         if (GET_FLAG(PROGRESS, prefs->flags) == PROGRESS_YES) {
632                 prog_quantum = calculate_prog_quantum(prefs);
633                 cached_prog_quantum = prog_quantum;
634                 print_stats(prefs);
635         }
636
637         XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
638         xseg_init_local_signal(xseg, peer->portno_start);
639
640         /* If no ping is going to be sent, we can begin the benchmark now. */
641         if (GET_FLAG(PING, prefs->flags) == PING_MODE_OFF)
642                 timer_start(prefs, prefs->total_tm);
643         else
644                 send_ping_request(peer, prefs);
645
646 send_request:
647         while (!(isTerminate() && all_peer_reqs_free(peer))) {
648                 while (CAN_SEND_REQUEST(prefs)) {
649                         xseg_cancel_wait(xseg, peer->portno_start);
650                         XSEGLOG2(&lc, D, "...because %lu < %lu && %lu < %lu\n",
651                                         prefs->status->submitted - prefs->status->received,
652                                         prefs->iodepth, prefs->status->received,
653                                         prefs->status->max);
654                         XSEGLOG2(&lc, D, "Start sending new request\n");
655                         r = send_request(peer, prefs);
656                         if (r < 0)
657                                 break;
658                 }
659                 //Heart of peerd_loop. This loop is common for everyone.
660                 for (loops = threshold; loops > 0; loops--) {
661                         if (loops == 1)
662                                 xseg_prepare_wait(xseg, peer->portno_start);
663
664                         if (UNLIKELY(CAN_PRINT_PROGRESS(prefs, prog_quantum))) {
665                                 prog_quantum += cached_prog_quantum;
666                                 print_progress(prefs);
667                         }
668
669                         if (check_ports(peer)) {
670                                 //If an old request has just been acked, the most sensible
671                                 //thing to do is to immediately send a new one
672                                 if (prefs->status->received < prefs->status->max)
673                                         goto send_request;
674                                 else
675                                         return 0;
676                         }
677                 }
678                 //struct xseg_port *port = xseg_get_port(xseg, portno_start);
679                 //struct xq *q;
680                 //q = XPTR_TAKE(port->request_queue, xseg->segment);
681                 //XSEGLOG2(&lc, I, "%s goes to sleep with %u requests pending\n",
682                 //              id, xq_count(q));
683                 XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
684                 xseg_wait_signal(xseg, 10000000UL);
685                 xseg_cancel_wait(xseg, peer->portno_start);
686                 XSEGLOG2(&lc, I, "%s woke up\n", id);
687         }
688
689         XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
690                         xq_count(&peer->free_reqs), peer->nr_ops);
691         return 0;
692 }
693
694 void custom_peer_finalize(struct peerd *peer)
695 {
696         struct bench *prefs = peer->priv;
697         //TODO: Measure mean time, standard variation
698
699         if (!prefs->total_tm->completed)
700                 timer_stop(prefs, prefs->total_tm, NULL);
701
702         if (GET_FLAG(PROGRESS, prefs->flags) == PROGRESS_YES)
703                 print_progress(prefs);
704         else
705                 print_stats(prefs);
706
707         print_remaining(prefs);
708         print_res(prefs);
709         return;
710 }
711
712 /*
713  * handle_received: +1 to our received requests.
714  * Do some sanity checks and then check if request is failed.
715  * If not try to verify the request if asked.
716  */
717 static void handle_received(struct peerd *peer, struct peer_req *pr)
718 {
719         //FIXME: handle null pointer
720         struct bench *prefs = peer->priv;
721         struct timer *rec = prefs->rec_tm;
722         int start_timer = 0;
723
724         if (!pr->req) {
725                 //This is a serious error, so we must stop
726                 XSEGLOG2(&lc, E, "Received peer request with no xseg request");
727                 terminated++;
728                 return;
729         }
730
731         /*
732          * If we were in ping mode, we can now switch off and start the
733          * benchmark.
734          */
735         if (GET_FLAG(PING, prefs->flags) == PING_MODE_ON) {
736                 XSEGLOG2(&lc, I, "Ping received. Benchmark can start now.");
737                 SET_FLAG(PING, prefs->flags, PING_MODE_OFF);
738                 start_timer = 1;
739                 goto out;
740         }
741
742         prefs->status->received++;
743
744         if ((GET_FLAG(INSANITY, prefs->flags) < rec->insanity) && !pr->priv) {
745                 XSEGLOG2(&lc, W, "Cannot find submission time of request");
746                 return;
747         }
748
749         timer_stop(prefs, rec, (struct timespec *)pr->priv);
750
751         if (!(pr->req->state & XS_SERVED))
752                 prefs->status->failed++;
753         else if (CAN_VERIFY(prefs) && read_chunk(prefs, pr->req))
754                 prefs->status->corrupted++;
755
756 out:
757         if (xseg_put_request(peer->xseg, pr->req, pr->portno))
758                 XSEGLOG2(&lc, W, "Cannot put xseg request\n");
759
760         free_peer_req(peer, pr);
761
762         if (start_timer)
763                 timer_start(prefs, prefs->total_tm);
764 }
765
766 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
767                 enum dispatch_reason reason)
768 {
769         switch (reason) {
770                 case dispatch_accept:
771                         //This is wrong, benchmarking peer should not accept requests,
772                         //only receive them.
773                         XSEGLOG2(&lc, W, "Bench peer should not accept requests\n");
774                         complete(peer, pr);
775                         break;
776                 case dispatch_receive:
777                         handle_received(peer, pr);
778                         break;
779                 default:
780                         fail(peer, pr);
781         }
782         return 0;
783 }