bench: Fix target to not include null termination.
[archipelago] / xseg / peers / user / bench-xseg.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <unistd.h>
39 #include <sys/syscall.h>
40 #include <sys/types.h>
41 #include <pthread.h>
42 #include <xseg/xseg.h>
43 #include <peer.h>
44 #include <time.h>
45 #include <sys/util.h>
46 #include <signal.h>
47 #include <bench-xseg.h>
48 #include <bench-lfsr.h>
49 #include <limits.h>
50
51 char global_id[IDLEN + 1];
52 /*
53  * This macro checks two things:
54  * a) If in-flight requests are less than given iodepth
55  * b) If we have submitted all of the requests
56  */
57 #define CAN_SEND_REQUEST(__p)                                                                                   \
58         ((__p->status->submitted - __p->status->received < __p->iodepth) && \
59         (__p->status->submitted < __p->status->max) &&                                          \
60         !isTerminate())
61
62 #define CAN_VERIFY(__p)                                                                                                 \
63         ((GET_FLAG(VERIFY, __p->flags) != VERIFY_NO) && __p->op == X_READ)
64
65 #define CAN_PRINT_PROGRESS(__p, __q)                                                                    \
66         ((GET_FLAG(PROGRESS, __p->flags) == PROGRESS_YES) &&                            \
67         (__p->status->received == __q))
68
69 void custom_peer_usage()
70 {
71         fprintf(stderr, "Custom peer options: \n"
72                         "  --------------------------------------------\n"
73                         "    -op       | None    | XSEG operation [read|write|info|delete]\n"
74                         "    --pattern | None    | I/O pattern [seq|rand]\n"
75                         "    --verify  | no      | Verify written requests [no|meta|full]\n"
76                         "    -rc       | None    | Request cap\n"
77                         "    -to       | None    | Total objects\n"
78                         "    -ts       | None    | Total I/O size\n"
79                         "    -os       | 4M      | Object size\n"
80                         "    -bs       | 4k      | Block size\n"
81                         "    -tp       | None    | Target port\n"
82                         "    --iodepth | 1       | Number of in-flight I/O requests\n"
83                         "    --seed    | None    | Initialize LFSR and target names\n"
84                         "    --insanity| sane    | Adjust insanity level of benchmark:\n"
85                         "              |         |     [sane|eccentric|manic|paranoid]\n"
86                         "    --progress| yes     | Show progress of requests\n"
87                         "\n"
88                         "Additional information:\n"
89                         "  --------------------------------------------\n"
90                         "  The -to and -ts options are mutually exclusive\n"
91                         "\n");
92 }
93
94 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
95 {
96         struct bench *prefs;
97         char request_cap[MAX_ARG_LEN + 1];
98         char total_objects[MAX_ARG_LEN + 1];
99         char total_size[MAX_ARG_LEN + 1];
100         char object_size[MAX_ARG_LEN + 1];
101         char block_size[MAX_ARG_LEN + 1];
102         char op[MAX_ARG_LEN + 1];
103         char pattern[MAX_ARG_LEN + 1];
104         char insanity[MAX_ARG_LEN + 1];
105         char verify[MAX_ARG_LEN + 1];
106         char progress[MAX_ARG_LEN + 1];
107         struct xseg *xseg = peer->xseg;
108         unsigned int xseg_page_size = 1 << xseg->config.page_shift;
109         long iodepth = -1;
110         long dst_port = -1;
111         unsigned long seed = -1;
112         uint64_t rc;
113         struct timespec timer_seed;
114         int set_by_hand = 0;
115         int r;
116
117         op[0] = 0;
118         pattern[0] = 0;
119         total_objects[0] = 0;
120         total_size[0] = 0;
121         block_size[0] = 0;
122         object_size[0] = 0;
123         insanity[0] = 0;
124         verify[0] = 0;
125         request_cap[0] = 0;
126         progress[0] = 0;
127
128 #ifdef MT
129         for (i = 0; i < nr_threads; i++) {
130                 prefs = peer->thread[i]->priv;
131                 prefs = malloc(sizeof(struct bench));
132                 if (!prefs) {
133                         perror("malloc");
134                         return -1;
135                 }
136         }
137 #endif
138         prefs = malloc(sizeof(struct bench));
139         if (!prefs) {
140                 perror("malloc");
141                 return -1;
142         }
143         memset(prefs, 0, sizeof(struct bench));
144
145         prefs->status = malloc(sizeof(struct req_status));
146         if (!prefs->status) {
147                 perror("malloc");
148                 return -1;
149         }
150
151         memset(prefs->status, 0, sizeof(struct req_status));
152
153         //Begin reading the benchmark-specific arguments
154         BEGIN_READ_ARGS(argc, argv);
155         READ_ARG_STRING("-rc", request_cap, MAX_ARG_LEN);
156         READ_ARG_STRING("-op", op, MAX_ARG_LEN);
157         READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
158         READ_ARG_STRING("-to", total_objects, MAX_ARG_LEN);
159         READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
160         READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
161         READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
162         READ_ARG_ULONG("--iodepth", iodepth);
163         READ_ARG_ULONG("-tp", dst_port);
164         READ_ARG_ULONG("--seed", seed);
165         READ_ARG_STRING("--insanity", insanity, MAX_ARG_LEN);
166         READ_ARG_STRING("--verify", verify, MAX_ARG_LEN);
167         READ_ARG_STRING("--progress", progress, MAX_ARG_LEN);
168         END_READ_ARGS();
169
170         /*****************************\
171          * Check I/O type parameters *
172         \*****************************/
173
174         //We support 4 xseg operations: X_READ, X_WRITE, X_DELETE, X_INFO
175         //The I/O pattern of these operations can be either sequential (seq) or
176         //random (rand)
177         if (!op[0]) {
178                 XSEGLOG2(&lc, E, "xseg operation needs to be supplied\n");
179                 goto arg_fail;
180         }
181         r = read_op(op);
182         if (r < 0) {
183                 XSEGLOG2(&lc, E, "Invalid syntax: -op %s\n", op);
184                 goto arg_fail;
185         }
186         prefs->op = r;
187
188         if (!pattern[0]) {
189                 XSEGLOG2(&lc, E, "I/O pattern needs to be supplied\n");
190                 goto arg_fail;
191         }
192         r = read_pattern(pattern);
193         if (r < 0) {
194                 XSEGLOG2(&lc, E, "Invalid syntax: --pattern %s\n", pattern);
195                 goto arg_fail;
196         }
197         SET_FLAG(PATTERN, prefs->flags, r);
198
199         if (!verify[0])
200                 strcpy(verify, "no");
201         r = read_verify(verify);
202         if (r < 0) {
203                 XSEGLOG2(&lc, E, "Invalid syntax: --verify %s\n", verify);
204                 goto arg_fail;
205         }
206         SET_FLAG(VERIFY, prefs->flags, r);
207
208         //Default iodepth value is 1
209         if (iodepth < 0)
210                 prefs->iodepth = 1;
211         else
212                 prefs->iodepth = iodepth;
213
214         /**************************\
215          * Check timer parameters *
216         \**************************/
217
218         //Most of the times, not all timers need to be used.
219         //We can choose which timers will be used by adjusting the "insanity"
220         //level of the benchmark i.e. the obscurity of code paths (get request,
221         //submit request) that will be timed.
222         if (!insanity[0])
223                 strcpy(insanity, "sane");
224
225         r = read_insanity(insanity);
226         if (r < 0) {
227                 XSEGLOG2(&lc, E, "Invalid syntax: --insanity %s\n", insanity);
228                 goto arg_fail;
229         }
230         SET_FLAG(INSANITY, prefs->flags, r);
231
232         /*****************************\
233          * Check I/O size parameters *
234         \*****************************/
235
236         //Block size (bs): Defaults to 4K.
237         //It must be a number followed by one of these characters:
238         //                                              [k|K|m|M|g|G]
239         //If not, it will be considered as size in bytes.
240         //Must be integer multiple of segment's page size (typically 4k).
241         if (!block_size[0])
242                 strcpy(block_size,"4k");
243
244         prefs->bs = str2num(block_size);
245         if (!prefs->bs) {
246                 XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
247                 goto arg_fail;
248         } else if (prefs->bs % xseg_page_size) {
249                 XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
250                 goto arg_fail;
251         }
252
253         //Total objects (to) or total I/O size (ts).
254         //Must have the same format as "block size"
255         //They are mutually exclusive
256         if (total_objects[0] && total_size[0]) {
257                 XSEGLOG2(&lc, E, "Total objects and total size are mutually exclusive\n");
258                 goto arg_fail;
259         } else if (total_objects[0]) {
260                 prefs->to = str2num(total_objects);
261                 if (!prefs->to) {
262                         XSEGLOG2(&lc, E, "Invalid syntax: -to %s\n", total_objects);
263                         goto arg_fail;
264                 }
265                 //In this case, the maximum number of requests is the total number of
266                 //objects we will handle
267                 prefs->status->max = prefs->to;
268         } else if (total_size[0]) {
269                 if (prefs->op != X_READ && prefs->op != X_WRITE) {
270                         XSEGLOG2(&lc, E,
271                                         "Total objects must be supplied (required by -op %s)\n", op);
272                         goto arg_fail;
273                 }
274                 prefs->ts = str2num(total_size);
275                 if (!prefs->ts) {
276                         XSEGLOG2(&lc, E, "Invalid syntax: -ts %s\n", total_size);
277                         goto arg_fail;
278                 } else if (prefs->ts % prefs->bs) {
279                         XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
280                         goto arg_fail;
281                 }
282                 //In this case, the maximum number of requests is the number of blocks
283                 //we need to cover the total I/O size
284                 prefs->status->max = prefs->ts / prefs->bs;
285         } else {
286                 XSEGLOG2(&lc, E, "Total objects or total size must be supplied\n");
287                 goto arg_fail;
288         }
289
290         //Object size (os): Defaults to 4M.
291         //Must have the same format as "block size"
292         //Must be integer multiple of "block size"
293         if (!object_size[0])
294                 strcpy(object_size,"4M");
295
296         prefs->os = str2num(object_size);
297         if (!prefs->os) {
298                 XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
299                 goto arg_fail;
300         } else if (prefs->os % prefs->bs) {
301                 XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
302                 goto arg_fail;
303         }
304
305
306         /*************************\
307          * Check port parameters *
308         \*************************/
309
310         if (dst_port < 0){
311                 XSEGLOG2(&lc, E, "Target port must be supplied\n");
312                 goto arg_fail;
313         }
314
315         prefs->src_port = peer->portno_start; //TODO: allow user to change this
316         prefs->dst_port = (xport) dst_port;
317
318         /*********************************\
319          * Create timers for all metrics *
320         \*********************************/
321
322         if (init_timer(&prefs->total_tm, INSANITY_SANE))
323                 goto tm_fail;
324         if (init_timer(&prefs->sub_tm, INSANITY_MANIC))
325                 goto tm_fail;
326         if (init_timer(&prefs->get_tm, INSANITY_PARANOID))
327                 goto tm_fail;
328         if (init_timer(&prefs->rec_tm, INSANITY_ECCENTRIC))
329                 goto tm_fail;
330
331         /*************************************\
332          * Initialize the LFSR and global_id *
333         \*************************************/
334 reseed:
335         //We proceed to initialise the global_id, and seed variables.
336         if (seed == -1) {
337                 clock_gettime(CLOCK_BENCH, &timer_seed);
338                 seed = timer_seed.tv_nsec;
339         } else {
340                 set_by_hand = 1;
341         }
342         create_id(seed);
343
344         if (GET_FLAG(PATTERN, prefs->flags) == PATTERN_RAND) {
345                 prefs->lfsr = malloc(sizeof(struct bench_lfsr));
346                 if (!prefs->lfsr) {
347                         perror("malloc");
348                         goto lfsr_fail;
349                 }
350
351                 r = lfsr_init(prefs->lfsr, prefs->status->max, seed, seed & 0xF);
352                 if (r && set_by_hand) {
353                         XSEGLOG2(&lc, E, "LFSR could not be initialized.\n");
354                         goto lfsr_fail;
355                 } else if (r) {
356                         seed = -1;
357                         goto reseed;
358                 }
359         }
360
361         /*********************************\
362          * Miscellaneous initializations *
363         \*********************************/
364
365         /* The request cap must be enforced only after the LFSR is initialized */
366         if (request_cap[0]) {
367                 rc = str2num(request_cap);
368                 if (!rc) {
369                         XSEGLOG2(&lc, E, "Invalid syntax: -rc %s\n", request_cap);
370                         goto arg_fail;
371                 } else if (rc > prefs->status->max) {
372                         XSEGLOG2(&lc, E, "Request cap exceeds current request total.\n");
373                         goto arg_fail;
374                 }
375                 prefs->status->max = rc;
376         }
377
378         /* Benchmarking progress printing is on by default */
379         if (!progress[0])
380                 strcpy(progress, "yes");
381         r = read_progress(progress);
382         if (r < 0) {
383                 XSEGLOG2(&lc, E, "Invalid syntax: --progress %s\n", progress);
384                 goto arg_fail;
385         }
386         SET_FLAG(PROGRESS, prefs->flags, r);
387
388         prefs->peer = peer;
389         peer->peerd_loop = bench_peerd_loop;
390         peer->priv = (void *) prefs;
391         XSEGLOG2(&lc, I, "Global ID is %s\n", global_id);
392         return 0;
393
394 arg_fail:
395         custom_peer_usage();
396 lfsr_fail:
397         free(prefs->lfsr);
398 tm_fail:
399         free(prefs->total_tm);
400         free(prefs->sub_tm);
401         free(prefs->get_tm);
402         free(prefs->rec_tm);
403         free(prefs);
404         return -1;
405 }
406
407
408 static int send_request(struct peerd *peer, struct bench *prefs)
409 {
410         struct xseg_request *req;
411         struct xseg *xseg = peer->xseg;
412         struct peer_req *pr;
413         xport srcport = prefs->src_port;
414         xport dstport = prefs->dst_port;
415         xport p;
416
417         int r;
418         uint64_t new;
419         uint64_t size = prefs->bs;
420
421         //srcport and dstport must already be provided by the user.
422         //returns struct xseg_request with basic initializations
423         XSEGLOG2(&lc, D, "Get new request\n");
424         timer_start(prefs, prefs->get_tm);
425         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
426         if (!req) {
427                 XSEGLOG2(&lc, W, "Cannot get request\n");
428                 return -1;
429         }
430         timer_stop(prefs, prefs->get_tm, NULL);
431
432         //Allocate enough space for the data and the target's name
433         XSEGLOG2(&lc, D, "Prepare new request\n");
434         r = xseg_prep_request(xseg, req, TARGETLEN, size);
435         if (r < 0) {
436                 XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
437                                 TARGETLEN, (unsigned long long)size);
438                 goto put_xseg_request;
439         }
440
441         //Determine what the next target/chunk will be, based on I/O pattern
442         new = determine_next(prefs);
443         req->op = prefs->op;
444         XSEGLOG2(&lc, I, "Our new request is %lu\n", new);
445         //Create a target of this format: "bench-<global_id>-<obj_no>"
446         create_target(prefs, req, new);
447
448         if (prefs->op == X_WRITE || prefs->op == X_READ) {
449                 req->size = size;
450                 //Calculate the chunk's offset inside the object
451                 req->offset = calculate_offset(prefs, new);
452                 XSEGLOG2(&lc, D, "Offset of request %lu is %lu\n", new, req->offset);
453
454                 if (prefs->op == X_WRITE)
455                         create_chunk(prefs, req, new);
456         }
457
458
459         //Measure this?
460         XSEGLOG2(&lc, D, "Allocate peer request\n");
461         pr = alloc_peer_req(peer);
462         if (!pr) {
463                 XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
464                                 peer->nr_ops - xq_count(&peer->free_reqs));
465                 goto put_xseg_request;
466         }
467         pr->peer = peer;
468         pr->portno = srcport;
469         pr->req = req;
470         pr->priv = malloc(sizeof(struct timespec));
471         if (!pr->priv) {
472                 perror("malloc");
473                 goto put_peer_request;
474         }
475
476         //XSEGLOG2(&lc, D, "Set request data\n");
477         r = xseg_set_req_data(xseg, req, pr);
478         if (r < 0) {
479                 XSEGLOG2(&lc, W, "Cannot set request data\n");
480                 goto put_peer_request;
481         }
482
483         /*
484          * Start measuring receive time.
485          * When we receive a request, we need to have its submission time to
486          * measure elapsed time. Thus, we memcpy its submission time to pr->priv.
487          * QUESTION: Is this the fastest way?
488          */
489         timer_start(prefs, prefs->rec_tm);
490         if (prefs->rec_tm->insanity <= GET_FLAG(INSANITY, prefs->flags))
491                 memcpy(pr->priv, &prefs->rec_tm->start_time, sizeof(struct timespec));
492
493         //Submit the request from the source port to the target port
494         XSEGLOG2(&lc, D, "Submit request %lu\n", new);
495         timer_start(prefs, prefs->sub_tm);
496         p = xseg_submit(xseg, req, srcport, X_ALLOC);
497         if (p == NoPort) {
498                 XSEGLOG2(&lc, W, "Cannot submit request\n");
499                 goto put_peer_request;
500         }
501         prefs->status->submitted++;
502         timer_stop(prefs, prefs->sub_tm, NULL);
503
504         //Send SIGIO to the process that has bound this port to inform that
505         //IO is possible
506         r = xseg_signal(xseg, p);
507         //if (r < 0)
508         //      XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
509
510         return 0;
511
512 put_peer_request:
513         free(pr->priv);
514         free_peer_req(peer, pr);
515 put_xseg_request:
516         if (xseg_put_request(xseg, req, srcport))
517                 XSEGLOG2(&lc, W, "Cannot put request\n");
518         return -1;
519 }
520
521 /*
522  * This function substitutes the default generic_peerd_loop of peer.c.
523  * It's plugged to struct peerd at custom peer's initialisation
524  */
525 int bench_peerd_loop(void *arg)
526 {
527 #ifdef MT
528         struct thread *t = (struct thread *) arg;
529         struct peerd *peer = t->peer;
530         char *id = t->arg;
531 #else
532         struct peerd *peer = (struct peerd *) arg;
533         char id[4] = {'P','e','e','r'};
534 #endif
535         struct xseg *xseg = peer->xseg;
536         struct bench *prefs = peer->priv;
537         xport portno_start = peer->portno_start;
538         xport portno_end = peer->portno_end;
539         pid_t pid = syscall(SYS_gettid);
540         uint64_t threshold=1000/(1 + portno_end - portno_start);
541         uint64_t cached_prog_quantum = 0;
542         uint64_t prog_quantum = 0;
543         int r;
544         uint64_t loops;
545
546         if (GET_FLAG(PROGRESS, prefs->flags) == PROGRESS_YES) {
547                 prog_quantum = calculate_prog_quantum(prefs);
548                 cached_prog_quantum = prog_quantum;
549                 print_stats(prefs);
550         }
551
552         XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
553         xseg_init_local_signal(xseg, peer->portno_start);
554
555         timer_start(prefs, prefs->total_tm);
556 send_request:
557         while (!(isTerminate() && all_peer_reqs_free(peer))) {
558                 while (CAN_SEND_REQUEST(prefs)) {
559                         xseg_cancel_wait(xseg, peer->portno_start);
560                         XSEGLOG2(&lc, D, "...because %lu < %lu && %lu < %lu\n",
561                                         prefs->status->submitted - prefs->status->received,
562                                         prefs->iodepth, prefs->status->received,
563                                         prefs->status->max);
564                         XSEGLOG2(&lc, D, "Start sending new request\n");
565                         r = send_request(peer, prefs);
566                         if (r < 0)
567                                 break;
568                 }
569                 //Heart of peerd_loop. This loop is common for everyone.
570                 for (loops = threshold; loops > 0; loops--) {
571                         if (loops == 1)
572                                 xseg_prepare_wait(xseg, peer->portno_start);
573
574                         if (UNLIKELY(CAN_PRINT_PROGRESS(prefs, prog_quantum))) {
575                                 prog_quantum += cached_prog_quantum;
576                                 print_progress(prefs);
577                         }
578
579                         if (check_ports(peer)) {
580                                 //If an old request has just been acked, the most sensible
581                                 //thing to do is to immediately send a new one
582                                 if (prefs->status->received < prefs->status->max)
583                                         goto send_request;
584                                 else
585                                         return 0;
586                         }
587                 }
588                 //struct xseg_port *port = xseg_get_port(xseg, portno_start);
589                 //struct xq *q;
590                 //q = XPTR_TAKE(port->request_queue, xseg->segment);
591                 //XSEGLOG2(&lc, I, "%s goes to sleep with %u requests pending\n",
592                 //              id, xq_count(q));
593                 XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
594                 xseg_wait_signal(xseg, 10000000UL);
595                 xseg_cancel_wait(xseg, peer->portno_start);
596                 XSEGLOG2(&lc, I, "%s woke up\n", id);
597         }
598
599         XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
600                         xq_count(&peer->free_reqs), peer->nr_ops);
601         return 0;
602 }
603
604 void custom_peer_finalize(struct peerd *peer)
605 {
606         struct bench *prefs = peer->priv;
607         //TODO: Measure mean time, standard variation
608
609         if (!prefs->total_tm->completed)
610                 timer_stop(prefs, prefs->total_tm, NULL);
611
612         if (GET_FLAG(PROGRESS, prefs->flags) == PROGRESS_YES)
613                 print_progress(prefs);
614         else
615                 print_stats(prefs);
616
617         print_remaining(prefs);
618         print_res(prefs, prefs->total_tm, "Total Requests");
619         return;
620 }
621
622 /*
623  * handle_received: +1 to our received requests.
624  * Do some sanity checks and then check if request is failed.
625  * If not try to verify the request if asked.
626  */
627 static void handle_received(struct peerd *peer, struct peer_req *pr)
628 {
629         //FIXME: handle null pointer
630         struct bench *prefs = peer->priv;
631         struct timer *rec = prefs->rec_tm;
632
633         prefs->status->received++;
634         if (!pr->req) {
635                 //This is a serious error, so we must stop
636                 XSEGLOG2(&lc, E, "Received peer request with no xseg request");
637                 terminated++;
638                 return;
639         }
640
641         if ((GET_FLAG(INSANITY, prefs->flags) < rec->insanity) && !pr->priv) {
642                 XSEGLOG2(&lc, W, "Cannot find submission time of request");
643                 return;
644         }
645
646         timer_stop(prefs, rec, pr->priv);
647
648         if (!(pr->req->state & XS_SERVED))
649                 prefs->status->failed++;
650         else if (CAN_VERIFY(prefs) && read_chunk(prefs, pr->req))
651                 prefs->status->corrupted++;
652
653         if (xseg_put_request(peer->xseg, pr->req, pr->portno))
654                 XSEGLOG2(&lc, W, "Cannot put xseg request\n");
655
656         //QUESTION, can't we just keep the malloced memory for future use?
657         free(pr->priv);
658         free_peer_req(peer, pr);
659 }
660
661 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
662                 enum dispatch_reason reason)
663 {
664         switch (reason) {
665                 case dispatch_accept:
666                         //This is wrong, benchmarking peer should not accept requests,
667                         //only receive them.
668                         XSEGLOG2(&lc, W, "Bench peer should not accept requests\n");
669                         complete(peer, pr);
670                         break;
671                 case dispatch_receive:
672                         handle_received(peer, pr);
673                         break;
674                 default:
675                         fail(peer, pr);
676         }
677         return 0;
678 }