332bd3f57560aaccd90328dd9b40f45896b414fd
[archipelago] / xseg / peers / user / bench-xseg.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <unistd.h>
39 #include <sys/syscall.h>
40 #include <sys/types.h>
41 #include <pthread.h>
42 #include <xseg/xseg.h>
43 #include <peer.h>
44 #include <time.h>
45 #include <sys/util.h>
46 #include <signal.h>
47 #include <bench-xseg.h>
48 #include <bench-lfsr.h>
49 #include <limits.h>
50
51 char global_id[IDLEN];
52 /*
53  * This macro checks two things:
54  * a) If in-flight requests are less than given iodepth
55  * b) If we have submitted all of the requests
56  */
57 #define CAN_SEND_REQUEST(prefs)                                                                                           \
58         ((prefs->status->submitted - prefs->status->received < prefs->iodepth) && \
59         (prefs->status->submitted < prefs->status->max))
60
61 #define CAN_VERIFY(prefs)                                                                                                         \
62         ((GET_FLAG(VERIFY, prefs->flags) != VERIFY_NO) && prefs->op == X_READ)
63
64 void custom_peer_usage()
65 {
66         fprintf(stderr, "Custom peer options: \n"
67                         "  --------------------------------------------\n"
68                         "    -op       | None    | XSEG operation [read|write|info|delete]\n"
69                         "    --pattern | None    | I/O pattern [seq|rand]\n"
70                         "    --verify  | no      | Verify written requests [no|meta|full]\n"
71                         "    -rc       | None    | Request cap\n"
72                         "    -to       | None    | Total objects\n"
73                         "    -ts       | None    | Total I/O size\n"
74                         "    -os       | 4M      | Object size\n"
75                         "    -bs       | 4k      | Block size\n"
76                         "    -tp       | None    | Target port\n"
77                         "    --iodepth | 1       | Number of in-flight I/O requests\n"
78                         "    --seed    | None    | Initialize LFSR and target names\n"
79                         "    --insanity| sane    | Adjust insanity level of benchmark:\n"
80                         "              |         |     [sane|eccentric|manic|paranoid]\n"
81                         "\n"
82                         "Additional information:\n"
83                         "  --------------------------------------------\n"
84                         "  The -to and -ts options are mutually exclusive\n"
85                         "\n");
86 }
87
88 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
89 {
90         struct bench *prefs;
91         char request_cap[MAX_ARG_LEN + 1];
92         char total_objects[MAX_ARG_LEN + 1];
93         char total_size[MAX_ARG_LEN + 1];
94         char object_size[MAX_ARG_LEN + 1];
95         char block_size[MAX_ARG_LEN + 1];
96         char op[MAX_ARG_LEN + 1];
97         char pattern[MAX_ARG_LEN + 1];
98         char insanity[MAX_ARG_LEN + 1];
99         char verify[MAX_ARG_LEN + 1];
100         struct xseg *xseg = peer->xseg;
101         unsigned int xseg_page_size = 1 << xseg->config.page_shift;
102         long iodepth = -1;
103         long dst_port = -1;
104         unsigned long seed = -1;
105         uint64_t rc;
106         struct timespec timer_seed;
107         int set_by_hand = 0;
108         int r;
109
110         op[0] = 0;
111         pattern[0] = 0;
112         total_objects[0] = 0;
113         total_size[0] = 0;
114         block_size[0] = 0;
115         object_size[0] = 0;
116         insanity[0] = 0;
117         verify[0] = 0;
118         request_cap[0] = 0;
119
120 #ifdef MT
121         for (i = 0; i < nr_threads; i++) {
122                 prefs = peer->thread[i]->priv;
123                 prefs = malloc(sizeof(struct bench));
124                 if (!prefs) {
125                         perror("malloc");
126                         return -1;
127                 }
128         }
129 #endif
130         prefs = malloc(sizeof(struct bench));
131         if (!prefs) {
132                 perror("malloc");
133                 return -1;
134         }
135         prefs->flags = 0;
136
137         prefs->status = malloc(sizeof(struct req_status));
138         if (!prefs->status) {
139                 perror("malloc");
140                 return -1;
141         }
142         memset(prefs->status, 0, sizeof(struct req_status));
143
144         //Begin reading the benchmark-specific arguments
145         BEGIN_READ_ARGS(argc, argv);
146         READ_ARG_STRING("-rc", request_cap, MAX_ARG_LEN);
147         READ_ARG_STRING("-op", op, MAX_ARG_LEN);
148         READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
149         READ_ARG_STRING("-to", total_objects, MAX_ARG_LEN);
150         READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
151         READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
152         READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
153         READ_ARG_ULONG("--iodepth", iodepth);
154         READ_ARG_ULONG("-tp", dst_port);
155         READ_ARG_ULONG("--seed", seed);
156         READ_ARG_STRING("--insanity", insanity, MAX_ARG_LEN);
157         READ_ARG_STRING("--verify", verify, MAX_ARG_LEN);
158         END_READ_ARGS();
159
160         /*****************************\
161          * Check I/O type parameters *
162         \*****************************/
163
164         //We support 4 xseg operations: X_READ, X_WRITE, X_DELETE, X_INFO
165         //The I/O pattern of these operations can be either sequential (seq) or
166         //random (rand)
167         if (!op[0]) {
168                 XSEGLOG2(&lc, E, "xseg operation needs to be supplied\n");
169                 goto arg_fail;
170         }
171         r = read_op(op);
172         if (r < 0) {
173                 XSEGLOG2(&lc, E, "Invalid syntax: -op %s\n", op);
174                 goto arg_fail;
175         }
176         prefs->op = r;
177
178         if (!pattern[0]) {
179                 XSEGLOG2(&lc, E, "I/O pattern needs to be supplied\n");
180                 goto arg_fail;
181         }
182         r = read_pattern(pattern);
183         if (r < 0) {
184                 XSEGLOG2(&lc, E, "Invalid syntax: --pattern %s\n", pattern);
185                 goto arg_fail;
186         }
187         SET_FLAG(PATTERN, prefs->flags, r);
188
189         if (!verify[0])
190                 strcpy(verify, "no");
191         r = read_verify(verify);
192         if (r < 0) {
193                 XSEGLOG2(&lc, E, "Invalid syntax: --verify %s\n", verify);
194                 goto arg_fail;
195         }
196         SET_FLAG(VERIFY, prefs->flags, r);
197
198         //Default iodepth value is 1
199         if (iodepth < 0)
200                 prefs->iodepth = 1;
201         else
202                 prefs->iodepth = iodepth;
203
204         /**************************\
205          * Check timer parameters *
206         \**************************/
207
208         //Most of the times, not all timers need to be used.
209         //We can choose which timers will be used by adjusting the "insanity"
210         //level of the benchmark i.e. the obscurity of code paths (get request,
211         //submit request) that will be timed.
212         if (!insanity[0])
213                 strcpy(insanity, "sane");
214
215         r = read_insanity(insanity);
216         if (r < 0) {
217                 XSEGLOG2(&lc, E, "Invalid syntax: --insanity %s\n", insanity);
218                 goto arg_fail;
219         }
220         SET_FLAG(INSANITY, prefs->flags, r);
221
222         /*****************************\
223          * Check I/O size parameters *
224         \*****************************/
225
226         //Block size (bs): Defaults to 4K.
227         //It must be a number followed by one of these characters:
228         //                                              [k|K|m|M|g|G]
229         //If not, it will be considered as size in bytes.
230         //Must be integer multiple of segment's page size (typically 4k).
231         if (!block_size[0])
232                 strcpy(block_size,"4k");
233
234         prefs->bs = str2num(block_size);
235         if (!prefs->bs) {
236                 XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
237                 goto arg_fail;
238         } else if (prefs->bs % xseg_page_size) {
239                 XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
240                 goto arg_fail;
241         }
242
243         //Total objects (to) or total I/O size (ts).
244         //Must have the same format as "block size"
245         //They are mutually exclusive
246         if (total_objects[0] && total_size[0]) {
247                 XSEGLOG2(&lc, E, "Total objects and total size are mutually exclusive\n");
248                 goto arg_fail;
249         } else if (total_objects[0]) {
250                 prefs->to = str2num(total_objects);
251                 if (!prefs->to) {
252                         XSEGLOG2(&lc, E, "Invalid syntax: -to %s\n", total_objects);
253                         goto arg_fail;
254                 }
255                 //In this case, the maximum number of requests is the total number of
256                 //objects we will handle
257                 prefs->status->max = prefs->to;
258         } else if (total_size[0]) {
259                 if (prefs->op != X_READ && prefs->op != X_WRITE) {
260                         XSEGLOG2(&lc, E,
261                                         "Total objects must be supplied (required by op %s)\n", op);
262                         goto arg_fail;
263                 }
264                 prefs->ts = str2num(total_size);
265                 if (!prefs->ts) {
266                         XSEGLOG2(&lc, E, "Invalid syntax: -ts %s\n", total_size);
267                         goto arg_fail;
268                 } else if (prefs->ts % prefs->bs) {
269                         XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
270                         goto arg_fail;
271                 }
272                 //In this case, the maximum number of requests is the number of blocks
273                 //we need to cover the total I/O size
274                 prefs->status->max = prefs->ts / prefs->bs;
275         } else {
276                 XSEGLOG2(&lc, E, "Total objects or total size must be supplied\n");
277                 goto arg_fail;
278         }
279
280         //Object size (os): Defaults to 4M.
281         //Must have the same format as "block size"
282         //Must be integer multiple of "block size"
283         if (!object_size[0])
284                 strcpy(object_size,"4M");
285
286         prefs->os = str2num(object_size);
287         if (!prefs->os) {
288                 XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
289                 goto arg_fail;
290         } else if (prefs->os % prefs->bs) {
291                 XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
292                 goto arg_fail;
293         }
294
295
296         /*************************\
297          * Check port parameters *
298         \*************************/
299
300         if (dst_port < 0){
301                 XSEGLOG2(&lc, E, "Target port needs to be supplied\n");
302                 goto arg_fail;
303         }
304
305         prefs->src_port = peer->portno_start; //TODO: allow user to change this
306         prefs->dst_port = (xport) dst_port;
307
308         /*********************************\
309          * Create timers for all metrics *
310         \*********************************/
311
312         if (init_timer(&prefs->total_tm, INSANITY_SANE))
313                 goto tm_fail;
314         if (init_timer(&prefs->sub_tm, INSANITY_MANIC))
315                 goto tm_fail;
316         if (init_timer(&prefs->get_tm, INSANITY_PARANOID))
317                 goto tm_fail;
318         if (init_timer(&prefs->rec_tm, INSANITY_ECCENTRIC))
319                 goto tm_fail;
320
321         /*************************************\
322          * Initialize the LFSR and global_id *
323         \*************************************/
324 reseed:
325         //We proceed to initialise the global_id, and seed variables.
326         if (seed == -1) {
327                 clock_gettime(CLOCK_MONOTONIC_RAW, &timer_seed);
328                 seed = timer_seed.tv_nsec;
329         } else {
330                 set_by_hand = 1;
331         }
332         create_id(seed);
333
334         if (GET_FLAG(PATTERN, prefs->flags) == PATTERN_RAND) {
335                 prefs->lfsr = malloc(sizeof(struct bench_lfsr));
336                 if (!prefs->lfsr) {
337                         perror("malloc");
338                         goto lfsr_fail;
339                 }
340
341                 r = lfsr_init(prefs->lfsr, prefs->status->max, seed, seed & 0xF);
342                 if (r && set_by_hand) {
343                         XSEGLOG2(&lc, E, "LFSR could not be initialized.\n");
344                         goto lfsr_fail;
345                 } else if (r) {
346                         seed = -1;
347                         goto reseed;
348                 }
349         }
350
351         /****************************\
352          * Finalize initializations *
353         \****************************/
354
355         /* The request cap must be enforced only after the LFSR is initialized */
356         if (request_cap[0]) {
357                 rc = str2num(request_cap);
358                 if (!rc) {
359                         XSEGLOG2(&lc, E, "Invalid syntax: -rc %s\n", request_cap);
360                         goto arg_fail;
361                 } else if (rc > prefs->status->max) {
362                         XSEGLOG2(&lc, E, "Request cap exceeds current request total.\n");
363                         goto arg_fail;
364                 }
365                 prefs->status->max = rc;
366         }
367
368         prefs->peer = peer;
369         peer->peerd_loop = bench_peerd_loop;
370         peer->priv = (void *) prefs;
371         XSEGLOG2(&lc, I, "Global ID is %s\n", global_id);
372         return 0;
373
374 arg_fail:
375         custom_peer_usage();
376 lfsr_fail:
377         free(prefs->lfsr);
378 tm_fail:
379         free(prefs->total_tm);
380         free(prefs->sub_tm);
381         free(prefs->get_tm);
382         free(prefs->rec_tm);
383         free(prefs);
384         return -1;
385 }
386
387
388 static int send_request(struct peerd *peer, struct bench *prefs)
389 {
390         struct xseg_request *req;
391         struct xseg *xseg = peer->xseg;
392         struct peer_req *pr;
393         xport srcport = prefs->src_port;
394         xport dstport = prefs->dst_port;
395         xport p;
396
397         int r;
398         uint64_t new;
399         uint64_t size = prefs->bs;
400
401         //srcport and dstport must already be provided by the user.
402         //returns struct xseg_request with basic initializations
403         XSEGLOG2(&lc, D, "Get new request\n");
404         timer_start(prefs, prefs->get_tm);
405         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
406         if (!req) {
407                 XSEGLOG2(&lc, W, "Cannot get request\n");
408                 return -1;
409         }
410         timer_stop(prefs, prefs->get_tm, NULL);
411
412         //Allocate enough space for the data and the target's name
413         XSEGLOG2(&lc, D, "Prepare new request\n");
414         r = xseg_prep_request(xseg, req, TARGETLEN, size);
415         if (r < 0) {
416                 XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
417                                 TARGETLEN, (unsigned long long)size);
418                 goto put_xseg_request;
419         }
420
421         //Determine what the next target/chunk will be, based on I/O pattern
422         new = determine_next(prefs);
423         req->op = prefs->op;
424         XSEGLOG2(&lc, I, "Our new request is %lu\n", new);
425         //Create a target of this format: "bench-<global_id>-<obj_no>"
426         create_target(prefs, req, new);
427
428         if (prefs->op == X_WRITE || prefs->op == X_READ) {
429                 req->size = size;
430                 //Calculate the chunk's offset inside the object
431                 req->offset = calculate_offset(prefs, new);
432                 XSEGLOG2(&lc, D, "Offset of request %lu is %lu\n", new, req->offset);
433
434                 if (prefs->op == X_WRITE)
435                         create_chunk(prefs, req, new);
436         }
437
438
439         //Measure this?
440         XSEGLOG2(&lc, D, "Allocate peer request\n");
441         pr = alloc_peer_req(peer);
442         if (!pr) {
443                 XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
444                                 peer->nr_ops - xq_count(&peer->free_reqs));
445                 goto put_xseg_request;
446         }
447         pr->peer = peer;
448         pr->portno = srcport;
449         pr->req = req;
450         pr->priv = malloc(sizeof(struct timespec));
451         if (!pr->priv) {
452                 perror("malloc");
453                 goto put_peer_request;
454         }
455
456         //XSEGLOG2(&lc, D, "Set request data\n");
457         r = xseg_set_req_data(xseg, req, pr);
458         if (r < 0) {
459                 XSEGLOG2(&lc, W, "Cannot set request data\n");
460                 goto put_peer_request;
461         }
462
463         /*
464          * Start measuring receive time.
465          * When we receive a request, we need to have its submission time to
466          * measure elapsed time. Thus, we memcpy its submission time to pr->priv.
467          * QUESTION: Is this the fastest way?
468          */
469         timer_start(prefs, prefs->rec_tm);
470         if (prefs->rec_tm->insanity <= GET_FLAG(INSANITY, prefs->flags))
471                 memcpy(pr->priv, &prefs->rec_tm->start_time, sizeof(struct timespec));
472
473         //Submit the request from the source port to the target port
474         XSEGLOG2(&lc, D, "Submit request %lu\n", new);
475         timer_start(prefs, prefs->sub_tm);
476         p = xseg_submit(xseg, req, srcport, X_ALLOC);
477         if (p == NoPort) {
478                 XSEGLOG2(&lc, W, "Cannot submit request\n");
479                 goto put_peer_request;
480         }
481         prefs->status->submitted++;
482         timer_stop(prefs, prefs->sub_tm, NULL);
483
484         //Send SIGIO to the process that has bound this port to inform that
485         //IO is possible
486         r = xseg_signal(xseg, p);
487         //if (r < 0)
488         //      XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
489
490         return 0;
491
492 put_peer_request:
493         free(pr->priv);
494         free_peer_req(peer, pr);
495 put_xseg_request:
496         if (xseg_put_request(xseg, req, srcport))
497                 XSEGLOG2(&lc, W, "Cannot put request\n");
498         return -1;
499 }
500
501 /*
502  * This function substitutes the default generic_peerd_loop of peer.c.
503  * It's plugged to struct peerd at custom peer's initialisation
504  */
505 int bench_peerd_loop(void *arg)
506 {
507 #ifdef MT
508         struct thread *t = (struct thread *) arg;
509         struct peerd *peer = t->peer;
510         char *id = t->arg;
511 #else
512         struct peerd *peer = (struct peerd *) arg;
513         char id[4] = {'P','e','e','r'};
514 #endif
515         struct xseg *xseg = peer->xseg;
516         struct bench *prefs = peer->priv;
517         xport portno_start = peer->portno_start;
518         xport portno_end = peer->portno_end;
519         uint64_t threshold=1000/(1 + portno_end - portno_start);
520         pid_t pid = syscall(SYS_gettid);
521         int r;
522         uint64_t loops;
523
524         XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
525         xseg_init_local_signal(xseg, peer->portno_start);
526
527         timer_start(prefs, prefs->total_tm);
528 send_request:
529         while (!isTerminate()) {
530 #ifdef MT
531                 if (t->func) {
532                         XSEGLOG2(&lc, D, "%s executes function\n", id);
533                         xseg_cancel_wait(xseg, peer->portno_start);
534                         t->func(t->arg);
535                         t->func = NULL;
536                         t->arg = NULL;
537                         continue;
538                 }
539 #endif
540                 while (CAN_SEND_REQUEST(prefs)) {
541                         xseg_cancel_wait(xseg, peer->portno_start);
542                         XSEGLOG2(&lc, D, "...because %lu < %lu && %lu < %lu\n",
543                                         prefs->status->submitted - prefs->status->received,
544                                         prefs->iodepth, prefs->status->received,
545                                         prefs->status->max);
546                         XSEGLOG2(&lc, D, "Start sending new request\n");
547                         r = send_request(peer, prefs);
548                         if (r < 0)
549                                 break;
550                 }
551                 //Heart of peerd_loop. This loop is common for everyone.
552                 for (loops = threshold; loops > 0; loops--) {
553                         if (loops == 1)
554                                 xseg_prepare_wait(xseg, peer->portno_start);
555
556                         if (check_ports(peer)) {
557                                 //If an old request has just been acked, the most sensible
558                                 //thing to do is to immediately send a new one
559                                 if (prefs->status->received < prefs->status->max)
560                                         goto send_request;
561                                 else
562                                         return 0;
563                         }
564                 }
565                 //struct xseg_port *port = xseg_get_port(xseg, portno_start);
566                 //struct xq *q;
567                 //q = XPTR_TAKE(port->request_queue, xseg->segment);
568                 //XSEGLOG2(&lc, I, "%s goes to sleep with %u requests pending\n",
569                 //              id, xq_count(q));
570                 XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
571 #ifdef ST_THREADS
572                 if (ta){
573                         st_sleep(0);
574                         continue;
575                 }
576 #endif
577                 xseg_wait_signal(xseg, 10000000UL);
578                 xseg_cancel_wait(xseg, peer->portno_start);
579                 XSEGLOG2(&lc, I, "%s woke up\n", id);
580         }
581
582         XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
583                         xq_count(&peer->free_reqs), peer->nr_ops);
584         return 0;
585 }
586
587 void custom_peer_finalize(struct peerd *peer)
588 {
589         struct bench *prefs = peer->priv;
590         //TODO: Measure mean time, standard variation
591
592         if (!prefs->total_tm->completed)
593                 timer_stop(prefs, prefs->total_tm, NULL);
594
595         print_stats(prefs);
596         print_res(prefs, prefs->total_tm, "Total Requests");
597         return;
598 }
599
600 /*
601  * handle_received: +1 to our received requests.
602  * Do some sanity checks and then check if request is failed.
603  * If not try to verify the request if asked.
604  */
605 static void handle_received(struct peerd *peer, struct peer_req *pr)
606 {
607         //FIXME: handle null pointer
608         struct bench *prefs = peer->priv;
609         struct timer *rec = prefs->rec_tm;
610
611         prefs->status->received++;
612         if (!pr->req) {
613                 //This is a serious error, so we must stop
614                 XSEGLOG2(&lc, E, "Received peer request with no xseg request");
615                 terminated++;
616                 return;
617         }
618
619         if ((GET_FLAG(INSANITY, prefs->flags) < rec->insanity) && !pr->priv) {
620                 XSEGLOG2(&lc, W, "Cannot find submission time of request");
621                 return;
622         }
623
624         timer_stop(prefs, rec, pr->priv);
625
626         if (!(pr->req->state & XS_SERVED))
627                 prefs->status->failed++;
628         else if (CAN_VERIFY(prefs) && read_chunk(prefs, pr->req))
629                 prefs->status->corrupted++;
630
631         if (xseg_put_request(peer->xseg, pr->req, pr->portno))
632                 XSEGLOG2(&lc, W, "Cannot put xseg request\n");
633
634         //QUESTION, can't we just keep the malloced memory for future use?
635         free(pr->priv);
636         free_peer_req(peer, pr);
637 }
638
639 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
640                 enum dispatch_reason reason)
641 {
642         switch (reason) {
643                 case dispatch_accept:
644                         //This is wrong, benchmarking peer should not accept requests,
645                         //only receive them.
646                         XSEGLOG2(&lc, W, "Bench peer should not accept requests\n");
647                         complete(peer, pr);
648                         break;
649                 case dispatch_receive:
650                         handle_received(peer, pr);
651                         break;
652                 default:
653                         fail(peer, pr);
654         }
655         return 0;
656 }