Statistics
| Branch: | Tag: | Revision:

root / xseg / peers / user / bench-xseg.c @ b74307a4

History | View | Annotate | Download (19.7 kB)

1
/*
2
 * Copyright 2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *   2. Redistributions in binary form must reproduce the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer in the documentation and/or other materials
14
 *      provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * The views and conclusions contained in the software and
30
 * documentation are those of the authors and should not be
31
 * interpreted as representing official policies, either expressed
32
 * or implied, of GRNET S.A.
33
 */
34

    
35
#define _GNU_SOURCE
36
#include <stdio.h>
37
#include <stdlib.h>
38
#include <unistd.h>
39
#include <sys/syscall.h>
40
#include <sys/types.h>
41
#include <pthread.h>
42
#include <xseg/xseg.h>
43
#include <peer.h>
44
#include <time.h>
45
#include <sys/util.h>
46
#include <signal.h>
47
#include <bench-xseg.h>
48
#include <bench-lfsr.h>
49
#include <limits.h>
50

    
51
char global_id[IDLEN];
52
/*
53
 * This macro checks two things:
54
 * a) If in-flight requests are less than given iodepth
55
 * b) If we have submitted all of the requests
56
 */
57
#define CAN_SEND_REQUEST(__p)                                                                                        \
58
        ((__p->status->submitted - __p->status->received < __p->iodepth) && \
59
        (__p->status->submitted < __p->status->max) &&                                                \
60
        !isTerminate())
61

    
62
#define CAN_VERIFY(__p)                                                                                                        \
63
        ((GET_FLAG(VERIFY, __p->flags) != VERIFY_NO) && __p->op == X_READ)
64

    
65
#define CAN_PRINT_PROGRESS(__p, __q)                                                                        \
66
        ((GET_FLAG(PROGRESS, __p->flags) == PROGRESS_YES) &&                                \
67
        (__p->status->received == __q))
68

    
69
void custom_peer_usage()
70
{
71
        fprintf(stderr, "Custom peer options: \n"
72
                        "  --------------------------------------------\n"
73
                        "    -op       | None    | XSEG operation [read|write|info|delete]\n"
74
                        "    --pattern | None    | I/O pattern [seq|rand]\n"
75
                        "    --verify  | no      | Verify written requests [no|meta|full]\n"
76
                        "    -rc       | None    | Request cap\n"
77
                        "    -to       | None    | Total objects\n"
78
                        "    -ts       | None    | Total I/O size\n"
79
                        "    -os       | 4M      | Object size\n"
80
                        "    -bs       | 4k      | Block size\n"
81
                        "    -tp       | None    | Target port\n"
82
                        "    --iodepth | 1       | Number of in-flight I/O requests\n"
83
                        "    --seed    | None    | Initialize LFSR and target names\n"
84
                        "    --insanity| sane    | Adjust insanity level of benchmark:\n"
85
                        "              |         |     [sane|eccentric|manic|paranoid]\n"
86
                        "    --progress| yes     | Show progress of requests\n"
87
                        "\n"
88
                        "Additional information:\n"
89
                        "  --------------------------------------------\n"
90
                        "  The -to and -ts options are mutually exclusive\n"
91
                        "\n");
92
}
93

    
94
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
95
{
96
        struct bench *prefs;
97
        char request_cap[MAX_ARG_LEN + 1];
98
        char total_objects[MAX_ARG_LEN + 1];
99
        char total_size[MAX_ARG_LEN + 1];
100
        char object_size[MAX_ARG_LEN + 1];
101
        char block_size[MAX_ARG_LEN + 1];
102
        char op[MAX_ARG_LEN + 1];
103
        char pattern[MAX_ARG_LEN + 1];
104
        char insanity[MAX_ARG_LEN + 1];
105
        char verify[MAX_ARG_LEN + 1];
106
        char progress[MAX_ARG_LEN + 1];
107
        struct xseg *xseg = peer->xseg;
108
        unsigned int xseg_page_size = 1 << xseg->config.page_shift;
109
        long iodepth = -1;
110
        long dst_port = -1;
111
        unsigned long seed = -1;
112
        uint64_t rc;
113
        struct timespec timer_seed;
114
        struct timespec *ts;
115
        int set_by_hand = 0;
116
        int i, r;
117

    
118
        op[0] = 0;
119
        pattern[0] = 0;
120
        total_objects[0] = 0;
121
        total_size[0] = 0;
122
        block_size[0] = 0;
123
        object_size[0] = 0;
124
        insanity[0] = 0;
125
        verify[0] = 0;
126
        request_cap[0] = 0;
127
        progress[0] = 0;
128

    
129
#ifdef MT
130
        for (i = 0; i < nr_threads; i++) {
131
                prefs = peer->thread[i]->priv;
132
                prefs = malloc(sizeof(struct bench));
133
                if (!prefs) {
134
                        perror("malloc");
135
                        return -1;
136
                }
137
        }
138
#endif
139
        prefs = malloc(sizeof(struct bench));
140
        if (!prefs) {
141
                perror("malloc");
142
                return -1;
143
        }
144
        memset(prefs, 0, sizeof(struct bench));
145

    
146
        prefs->status = malloc(sizeof(struct req_status));
147
        if (!prefs->status) {
148
                perror("malloc");
149
                return -1;
150
        }
151

    
152
        memset(prefs->status, 0, sizeof(struct req_status));
153

    
154
        //Begin reading the benchmark-specific arguments
155
        BEGIN_READ_ARGS(argc, argv);
156
        READ_ARG_STRING("-rc", request_cap, MAX_ARG_LEN);
157
        READ_ARG_STRING("-op", op, MAX_ARG_LEN);
158
        READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
159
        READ_ARG_STRING("-to", total_objects, MAX_ARG_LEN);
160
        READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
161
        READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
162
        READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
163
        READ_ARG_ULONG("--iodepth", iodepth);
164
        READ_ARG_ULONG("-tp", dst_port);
165
        READ_ARG_ULONG("--seed", seed);
166
        READ_ARG_STRING("--insanity", insanity, MAX_ARG_LEN);
167
        READ_ARG_STRING("--verify", verify, MAX_ARG_LEN);
168
        READ_ARG_STRING("--progress", progress, MAX_ARG_LEN);
169
        END_READ_ARGS();
170

    
171
        /*****************************\
172
         * Check I/O type parameters *
173
        \*****************************/
174

    
175
        //We support 4 xseg operations: X_READ, X_WRITE, X_DELETE, X_INFO
176
        //The I/O pattern of these operations can be either sequential (seq) or
177
        //random (rand)
178
        if (!op[0]) {
179
                XSEGLOG2(&lc, E, "xseg operation needs to be supplied\n");
180
                goto arg_fail;
181
        }
182
        r = read_op(op);
183
        if (r < 0) {
184
                XSEGLOG2(&lc, E, "Invalid syntax: -op %s\n", op);
185
                goto arg_fail;
186
        }
187
        prefs->op = r;
188

    
189
        if (!pattern[0]) {
190
                XSEGLOG2(&lc, E, "I/O pattern needs to be supplied\n");
191
                goto arg_fail;
192
        }
193
        r = read_pattern(pattern);
194
        if (r < 0) {
195
                XSEGLOG2(&lc, E, "Invalid syntax: --pattern %s\n", pattern);
196
                goto arg_fail;
197
        }
198
        SET_FLAG(PATTERN, prefs->flags, r);
199

    
200
        if (!verify[0])
201
                strcpy(verify, "no");
202
        r = read_verify(verify);
203
        if (r < 0) {
204
                XSEGLOG2(&lc, E, "Invalid syntax: --verify %s\n", verify);
205
                goto arg_fail;
206
        }
207
        SET_FLAG(VERIFY, prefs->flags, r);
208

    
209
        //Default iodepth value is 1
210
        if (iodepth < 0)
211
                prefs->iodepth = 1;
212
        else
213
                prefs->iodepth = iodepth;
214

    
215
        /**************************\
216
         * Check timer parameters *
217
        \**************************/
218

    
219
        //Most of the times, not all timers need to be used.
220
        //We can choose which timers will be used by adjusting the "insanity"
221
        //level of the benchmark i.e. the obscurity of code paths (get request,
222
        //submit request) that will be timed.
223
        if (!insanity[0])
224
                strcpy(insanity, "sane");
225

    
226
        r = read_insanity(insanity);
227
        if (r < 0) {
228
                XSEGLOG2(&lc, E, "Invalid syntax: --insanity %s\n", insanity);
229
                goto arg_fail;
230
        }
231
        SET_FLAG(INSANITY, prefs->flags, r);
232

    
233
        /*****************************\
234
         * Check I/O size parameters *
235
        \*****************************/
236

    
237
        //Block size (bs): Defaults to 4K.
238
        //It must be a number followed by one of these characters:
239
        //                                                [k|K|m|M|g|G]
240
        //If not, it will be considered as size in bytes.
241
        //Must be integer multiple of segment's page size (typically 4k).
242
        if (!block_size[0])
243
                strcpy(block_size,"4k");
244

    
245
        prefs->bs = str2num(block_size);
246
        if (!prefs->bs) {
247
                XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
248
                goto arg_fail;
249
        } else if (prefs->bs % xseg_page_size) {
250
                XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
251
                goto arg_fail;
252
        }
253

    
254
        //Total objects (to) or total I/O size (ts).
255
        //Must have the same format as "block size"
256
        //They are mutually exclusive
257
        if (total_objects[0] && total_size[0]) {
258
                XSEGLOG2(&lc, E, "Total objects and total size are mutually exclusive\n");
259
                goto arg_fail;
260
        } else if (total_objects[0]) {
261
                prefs->to = str2num(total_objects);
262
                if (!prefs->to) {
263
                        XSEGLOG2(&lc, E, "Invalid syntax: -to %s\n", total_objects);
264
                        goto arg_fail;
265
                }
266
                //In this case, the maximum number of requests is the total number of
267
                //objects we will handle
268
                prefs->status->max = prefs->to;
269
        } else if (total_size[0]) {
270
                if (prefs->op != X_READ && prefs->op != X_WRITE) {
271
                        XSEGLOG2(&lc, E,
272
                                        "Total objects must be supplied (required by -op %s)\n", op);
273
                        goto arg_fail;
274
                }
275
                prefs->ts = str2num(total_size);
276
                if (!prefs->ts) {
277
                        XSEGLOG2(&lc, E, "Invalid syntax: -ts %s\n", total_size);
278
                        goto arg_fail;
279
                } else if (prefs->ts % prefs->bs) {
280
                        XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
281
                        goto arg_fail;
282
                }
283
                //In this case, the maximum number of requests is the number of blocks
284
                //we need to cover the total I/O size
285
                prefs->status->max = prefs->ts / prefs->bs;
286
        } else {
287
                XSEGLOG2(&lc, E, "Total objects or total size must be supplied\n");
288
                goto arg_fail;
289
        }
290

    
291
        //Object size (os): Defaults to 4M.
292
        //Must have the same format as "block size"
293
        //Must be integer multiple of "block size"
294
        if (!object_size[0])
295
                strcpy(object_size,"4M");
296

    
297
        prefs->os = str2num(object_size);
298
        if (!prefs->os) {
299
                XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
300
                goto arg_fail;
301
        } else if (prefs->os % prefs->bs) {
302
                XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
303
                goto arg_fail;
304
        }
305

    
306

    
307
        /*************************\
308
         * Check port parameters *
309
        \*************************/
310

    
311
        if (dst_port < 0){
312
                XSEGLOG2(&lc, E, "Target port must be supplied\n");
313
                goto arg_fail;
314
        }
315

    
316
        prefs->src_port = peer->portno_start; //TODO: allow user to change this
317
        prefs->dst_port = (xport) dst_port;
318

    
319
        /*********************************\
320
         * Create timers for all metrics *
321
        \*********************************/
322

    
323
        if (init_timer(&prefs->total_tm, INSANITY_SANE))
324
                goto tm_fail;
325
        if (init_timer(&prefs->sub_tm, INSANITY_MANIC))
326
                goto tm_fail;
327
        if (init_timer(&prefs->get_tm, INSANITY_PARANOID))
328
                goto tm_fail;
329
        if (init_timer(&prefs->rec_tm, INSANITY_ECCENTRIC))
330
                goto tm_fail;
331

    
332
        if (prefs->rec_tm->insanity <= GET_FLAG(INSANITY, prefs->flags)) {
333
                for (i = 0; i < peer->nr_ops; i++) {
334
                        ts = malloc(sizeof(struct timespec));
335
                        if (!ts) {
336
                                XSEGLOG2(&lc, E, "Timespec allocation failed\n");
337
                                goto tm_fail;
338
                        }
339
                        peer->peer_reqs[i].priv = ts;
340
                }
341
        }
342

    
343
        /*************************************\
344
         * Initialize the LFSR and global_id *
345
        \*************************************/
346
reseed:
347
        //We proceed to initialise the global_id, and seed variables.
348
        if (seed == -1) {
349
                clock_gettime(CLOCK_BENCH, &timer_seed);
350
                seed = timer_seed.tv_nsec;
351
        } else {
352
                set_by_hand = 1;
353
        }
354
        create_id(seed);
355

    
356
        if (prefs->status->max == 1)
357
                SET_FLAG(PATTERN, prefs->flags, PATTERN_SEQ);
358

    
359
        if (GET_FLAG(PATTERN, prefs->flags) == PATTERN_RAND) {
360
                prefs->lfsr = malloc(sizeof(struct bench_lfsr));
361
                if (!prefs->lfsr) {
362
                        perror("malloc");
363
                        goto lfsr_fail;
364
                }
365

    
366
                r = lfsr_init(prefs->lfsr, prefs->status->max, seed, seed & 0xF);
367
                if (r && set_by_hand) {
368
                        XSEGLOG2(&lc, E, "LFSR could not be initialized.\n");
369
                        goto lfsr_fail;
370
                } else if (r) {
371
                        seed = -1;
372
                        goto reseed;
373
                }
374
        }
375

    
376
        /*********************************\
377
         * Miscellaneous initializations *
378
        \*********************************/
379

    
380
        /* The request cap must be enforced only after the LFSR is initialized */
381
        if (request_cap[0]) {
382
                rc = str2num(request_cap);
383
                if (!rc) {
384
                        XSEGLOG2(&lc, E, "Invalid syntax: -rc %s\n", request_cap);
385
                        goto arg_fail;
386
                } else if (rc > prefs->status->max) {
387
                        XSEGLOG2(&lc, E, "Request cap exceeds current request total.\n");
388
                        goto arg_fail;
389
                }
390
                prefs->status->max = rc;
391
        }
392

    
393
        /* Benchmarking progress printing is on by default */
394
        if (!progress[0])
395
                strcpy(progress, "yes");
396
        r = read_progress(progress);
397
        if (r < 0) {
398
                XSEGLOG2(&lc, E, "Invalid syntax: --progress %s\n", progress);
399
                goto arg_fail;
400
        }
401
        SET_FLAG(PROGRESS, prefs->flags, r);
402

    
403
        prefs->peer = peer;
404
        peer->peerd_loop = bench_peerd_loop;
405
        peer->priv = (void *) prefs;
406
        XSEGLOG2(&lc, I, "Global ID is %s\n", global_id);
407
        return 0;
408

    
409
arg_fail:
410
        custom_peer_usage();
411
lfsr_fail:
412
        free(prefs->lfsr);
413
tm_fail:
414
        free(prefs->total_tm);
415
        free(prefs->sub_tm);
416
        free(prefs->get_tm);
417
        free(prefs->rec_tm);
418
        free(prefs);
419
        return -1;
420
}
421

    
422

    
423
static int send_request(struct peerd *peer, struct bench *prefs)
424
{
425
        struct xseg_request *req;
426
        struct xseg *xseg = peer->xseg;
427
        struct peer_req *pr;
428
        xport srcport = prefs->src_port;
429
        xport dstport = prefs->dst_port;
430
        xport p;
431

    
432
        int r;
433
        uint64_t new;
434
        uint64_t size = prefs->bs;
435
        struct timespec *ts;
436

    
437
        //srcport and dstport must already be provided by the user.
438
        //returns struct xseg_request with basic initializations
439
        XSEGLOG2(&lc, D, "Get new request\n");
440
        timer_start(prefs, prefs->get_tm);
441
        req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
442
        if (!req) {
443
                XSEGLOG2(&lc, W, "Cannot get request\n");
444
                return -1;
445
        }
446
        timer_stop(prefs, prefs->get_tm, NULL);
447

    
448
        //Allocate enough space for the data and the target's name
449
        XSEGLOG2(&lc, D, "Prepare new request\n");
450
        r = xseg_prep_request(xseg, req, TARGETLEN, size);
451
        if (r < 0) {
452
                XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
453
                                TARGETLEN, (unsigned long long)size);
454
                goto put_xseg_request;
455
        }
456

    
457
        //Determine what the next target/chunk will be, based on I/O pattern
458
        new = determine_next(prefs);
459
        req->op = prefs->op;
460
        XSEGLOG2(&lc, I, "Our new request is %lu\n", new);
461
        //Create a target of this format: "bench-<global_id>-<obj_no>"
462
        create_target(prefs, req, new);
463

    
464
        if (prefs->op == X_WRITE || prefs->op == X_READ) {
465
                req->size = size;
466
                //Calculate the chunk's offset inside the object
467
                req->offset = calculate_offset(prefs, new);
468
                XSEGLOG2(&lc, D, "Offset of request %lu is %lu\n", new, req->offset);
469

    
470
                if (prefs->op == X_WRITE)
471
                        create_chunk(prefs, req, new);
472
        }
473

    
474
        XSEGLOG2(&lc, D, "Allocate peer request\n");
475
        pr = alloc_peer_req(peer);
476
        if (!pr) {
477
                XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
478
                                peer->nr_ops - xq_count(&peer->free_reqs));
479
                goto put_xseg_request;
480
        }
481
        pr->peer = peer;
482
        pr->portno = srcport;
483
        pr->req = req;
484

    
485
        //XSEGLOG2(&lc, D, "Set request data\n");
486
        r = xseg_set_req_data(xseg, req, pr);
487
        if (r < 0) {
488
                XSEGLOG2(&lc, W, "Cannot set request data\n");
489
                goto put_peer_request;
490
        }
491

    
492
        /*
493
         * Start measuring receive time.
494
         * When we receive a request, we need to have its submission time to
495
         * measure elapsed time. Thus, we copy its submission time to pr->priv.
496
         * QUESTION: Is this the fastest way?
497
         */
498
        timer_start(prefs, prefs->rec_tm);
499
        if (prefs->rec_tm->insanity <= GET_FLAG(INSANITY, prefs->flags)) {
500
                ts = (struct timespec *)pr->priv;
501
                ts->tv_sec = prefs->rec_tm->start_time.tv_sec;
502
                ts->tv_nsec = prefs->rec_tm->start_time.tv_nsec;
503
        }
504

    
505
        //Submit the request from the source port to the target port
506
        XSEGLOG2(&lc, D, "Submit request %lu\n", new);
507
        timer_start(prefs, prefs->sub_tm);
508
        p = xseg_submit(xseg, req, srcport, X_ALLOC);
509
        if (p == NoPort) {
510
                XSEGLOG2(&lc, W, "Cannot submit request\n");
511
                goto put_peer_request;
512
        }
513
        prefs->status->submitted++;
514
        timer_stop(prefs, prefs->sub_tm, NULL);
515

    
516
        //Send SIGIO to the process that has bound this port to inform that
517
        //IO is possible
518
        r = xseg_signal(xseg, p);
519
        //if (r < 0)
520
        //        XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
521

    
522
        return 0;
523

    
524
put_peer_request:
525
        free(pr->priv);
526
        free_peer_req(peer, pr);
527
put_xseg_request:
528
        if (xseg_put_request(xseg, req, srcport))
529
                XSEGLOG2(&lc, W, "Cannot put request\n");
530
        return -1;
531
}
532

    
533
/*
534
 * This function substitutes the default generic_peerd_loop of peer.c.
535
 * It's plugged to struct peerd at custom peer's initialisation
536
 */
537
int bench_peerd_loop(void *arg)
538
{
539
#ifdef MT
540
        struct thread *t = (struct thread *) arg;
541
        struct peerd *peer = t->peer;
542
        char *id = t->arg;
543
#else
544
        struct peerd *peer = (struct peerd *) arg;
545
        char id[4] = {'P','e','e','r'};
546
#endif
547
        struct xseg *xseg = peer->xseg;
548
        struct bench *prefs = peer->priv;
549
        xport portno_start = peer->portno_start;
550
        xport portno_end = peer->portno_end;
551
        pid_t pid = syscall(SYS_gettid);
552
        uint64_t threshold=1000/(1 + portno_end - portno_start);
553
        uint64_t cached_prog_quantum = 0;
554
        uint64_t prog_quantum = 0;
555
        int r;
556
        uint64_t loops;
557

    
558
        if (GET_FLAG(PROGRESS, prefs->flags) == PROGRESS_YES) {
559
                prog_quantum = calculate_prog_quantum(prefs);
560
                cached_prog_quantum = prog_quantum;
561
                print_stats(prefs);
562
        }
563

    
564
        XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
565
        xseg_init_local_signal(xseg, peer->portno_start);
566

    
567
        timer_start(prefs, prefs->total_tm);
568
send_request:
569
        while (!(isTerminate() && all_peer_reqs_free(peer))) {
570
                while (CAN_SEND_REQUEST(prefs)) {
571
                        xseg_cancel_wait(xseg, peer->portno_start);
572
                        XSEGLOG2(&lc, D, "...because %lu < %lu && %lu < %lu\n",
573
                                        prefs->status->submitted - prefs->status->received,
574
                                        prefs->iodepth, prefs->status->received,
575
                                        prefs->status->max);
576
                        XSEGLOG2(&lc, D, "Start sending new request\n");
577
                        r = send_request(peer, prefs);
578
                        if (r < 0)
579
                                break;
580
                }
581
                //Heart of peerd_loop. This loop is common for everyone.
582
                for (loops = threshold; loops > 0; loops--) {
583
                        if (loops == 1)
584
                                xseg_prepare_wait(xseg, peer->portno_start);
585

    
586
                        if (UNLIKELY(CAN_PRINT_PROGRESS(prefs, prog_quantum))) {
587
                                prog_quantum += cached_prog_quantum;
588
                                print_progress(prefs);
589
                        }
590

    
591
                        if (check_ports(peer)) {
592
                                //If an old request has just been acked, the most sensible
593
                                //thing to do is to immediately send a new one
594
                                if (prefs->status->received < prefs->status->max)
595
                                        goto send_request;
596
                                else
597
                                        return 0;
598
                        }
599
                }
600
                //struct xseg_port *port = xseg_get_port(xseg, portno_start);
601
                //struct xq *q;
602
                //q = XPTR_TAKE(port->request_queue, xseg->segment);
603
                //XSEGLOG2(&lc, I, "%s goes to sleep with %u requests pending\n",
604
                //                id, xq_count(q));
605
                XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
606
                xseg_wait_signal(xseg, 10000000UL);
607
                xseg_cancel_wait(xseg, peer->portno_start);
608
                XSEGLOG2(&lc, I, "%s woke up\n", id);
609
        }
610

    
611
        XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
612
                        xq_count(&peer->free_reqs), peer->nr_ops);
613
        return 0;
614
}
615

    
616
void custom_peer_finalize(struct peerd *peer)
617
{
618
        struct bench *prefs = peer->priv;
619
        //TODO: Measure mean time, standard variation
620

    
621
        if (!prefs->total_tm->completed)
622
                timer_stop(prefs, prefs->total_tm, NULL);
623

    
624
        if (GET_FLAG(PROGRESS, prefs->flags) == PROGRESS_YES)
625
                print_progress(prefs);
626
        else
627
                print_stats(prefs);
628

    
629
        print_remaining(prefs);
630
        print_res(prefs);
631
        return;
632
}
633

    
634
/*
635
 * handle_received: +1 to our received requests.
636
 * Do some sanity checks and then check if request is failed.
637
 * If not try to verify the request if asked.
638
 */
639
static void handle_received(struct peerd *peer, struct peer_req *pr)
640
{
641
        //FIXME: handle null pointer
642
        struct bench *prefs = peer->priv;
643
        struct timer *rec = prefs->rec_tm;
644

    
645
        prefs->status->received++;
646
        if (!pr->req) {
647
                //This is a serious error, so we must stop
648
                XSEGLOG2(&lc, E, "Received peer request with no xseg request");
649
                terminated++;
650
                return;
651
        }
652

    
653
        if ((GET_FLAG(INSANITY, prefs->flags) < rec->insanity) && !pr->priv) {
654
                XSEGLOG2(&lc, W, "Cannot find submission time of request");
655
                return;
656
        }
657

    
658
        timer_stop(prefs, rec, (struct timespec *)pr->priv);
659

    
660
        if (!(pr->req->state & XS_SERVED))
661
                prefs->status->failed++;
662
        else if (CAN_VERIFY(prefs) && read_chunk(prefs, pr->req))
663
                prefs->status->corrupted++;
664

    
665
        if (xseg_put_request(peer->xseg, pr->req, pr->portno))
666
                XSEGLOG2(&lc, W, "Cannot put xseg request\n");
667

    
668
        //QUESTION, can't we just keep the malloced memory for future use?
669
        free(pr->priv);
670
        free_peer_req(peer, pr);
671
}
672

    
673
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
674
                enum dispatch_reason reason)
675
{
676
        switch (reason) {
677
                case dispatch_accept:
678
                        //This is wrong, benchmarking peer should not accept requests,
679
                        //only receive them.
680
                        XSEGLOG2(&lc, W, "Bench peer should not accept requests\n");
681
                        complete(peer, pr);
682
                        break;
683
                case dispatch_receive:
684
                        handle_received(peer, pr);
685
                        break;
686
                default:
687
                        fail(peer, pr);
688
        }
689
        return 0;
690
}