Statistics
| Branch: | Revision:

root / xseg / peers / user / bench-xseg.c @ 58b941b9

History | View | Annotate | Download (16.5 kB)

1
/*
2
 * Copyright 2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *   2. Redistributions in binary form must reproduce the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer in the documentation and/or other materials
14
 *      provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * The views and conclusions contained in the software and
30
 * documentation are those of the authors and should not be
31
 * interpreted as representing official policies, either expressed
32
 * or implied, of GRNET S.A.
33
 */
34

    
35
#define _GNU_SOURCE
36
#include <stdio.h>
37
#include <stdlib.h>
38
#include <unistd.h>
39
#include <sys/syscall.h>
40
#include <sys/types.h>
41
#include <pthread.h>
42
#include <xseg/xseg.h>
43
#include <peer.h>
44
#include <time.h>
45
#include <sys/util.h>
46
#include <signal.h>
47
#include <bench-xseg.h>
48
#include <limits.h>
49

    
50
char global_id[IDLEN];
51
uint64_t global_seed;
52

    
53
/*
54
 * This function checks two things:
55
 * a) If in-flight requests are less than given iodepth
56
 * b) If we have submitted al of the requests
57
 */
58
#define CAN_SEND_REQUEST(prefs)        \
59
        prefs->sub_tm->completed - prefs->rec_tm->completed < prefs->iodepth &&        \
60
        prefs->sub_tm->completed < prefs->max_requests        \
61

    
62
void custom_peer_usage()
63
{
64
        fprintf(stderr, "Custom peer options: \n"
65
                        "  --------------------------------------------\n"
66
                        "    -op       | None    | XSEG operation [read|write|info|delete]\n"
67
                        "    --pattern | None    | I/O pattern [seq|rand]\n"
68
                        "    -to       | None    | Total objects (not for read/write)\n"
69
                        "    -ts       | None    | Total I/O size\n"
70
                        "    -os       | 4M      | Object size\n"
71
                        "    -bs       | 4k      | Block size\n"
72
                        "    -dp       | None    | Destination port\n"
73
                        "    --iodepth | 1       | Number of in-flight I/O requests\n"
74
                        "    --verify  | no      | Verify written requests [no|meta|hash]\n"
75
                        "    --seed    | None    | Inititalize LFSR and target names\n"
76
                        "    --insanity| sane    | Adjust insanity level of benchmark:\n"
77
                        "              |         |     [sane|eccentric|manic|paranoid]\n"
78
                        "\n");
79
}
80

    
81
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
82
{
83
        struct bench *prefs;
84
        char total_objects[MAX_ARG_LEN + 1];
85
        char total_size[MAX_ARG_LEN + 1];
86
        char object_size[MAX_ARG_LEN + 1];
87
        char block_size[MAX_ARG_LEN + 1];
88
        char op[MAX_ARG_LEN + 1];
89
        char pattern[MAX_ARG_LEN + 1];
90
        char insanity[MAX_ARG_LEN + 1];
91
        struct xseg *xseg = peer->xseg;
92
        unsigned int xseg_page_size = 1 << xseg->config.page_shift;
93
        long dst_port = -1;
94
        int r;
95

    
96
        op[0] = 0;
97
        pattern[0] = 0;
98
        total_objects[0] = 0;
99
        total_size[0] = 0;
100
        block_size[0] = 0;
101
        object_size[0] = 0;
102
        insanity[0] = 0;
103

    
104
#ifdef MT
105
        for (i = 0; i < nr_threads; i++) {
106
                prefs = peer->thread[i]->priv;
107
                prefs = malloc(sizeof(struct bench));
108
                if (!prefs) {
109
                        perror("malloc");
110
                        return -1;
111
                }
112
        }
113
#endif
114
        prefs = malloc(sizeof(struct bench));
115
        if (!prefs) {
116
                perror("malloc");
117
                return -1;
118
        }
119
        prefs->flags = 0;
120

    
121
        //Begin reading the benchmark-specific arguments
122
        BEGIN_READ_ARGS(argc, argv);
123
        READ_ARG_STRING("-op", op, MAX_ARG_LEN);
124
        READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
125
        READ_ARG_STRING("-to", total_objects, MAX_ARG_LEN);
126
        READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
127
        READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
128
        READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
129
        READ_ARG_ULONG("--iodepth", prefs->iodepth);
130
        READ_ARG_ULONG("-dp", dst_port);
131
        READ_ARG_STRING("--insanity", block_size, MAX_ARG_LEN);
132
        END_READ_ARGS();
133

    
134
        /*****************************\
135
         * Check I/O type parameters *
136
        \*****************************/
137

    
138
        //We support 4 xseg operations: X_READ, X_WRITE, X_DELETE, X_INFO
139
        //The I/O pattern of thesee operations can be either synchronous (sync) or
140
        //random (rand)
141
        if (!op[0]) {
142
                XSEGLOG2(&lc, E, "xseg operation needs to be supplied\n");
143
                goto arg_fail;
144
        }
145
        r = read_op(op);
146
        if (r < 0) {
147
                XSEGLOG2(&lc, E, "Invalid syntax: -op %s\n", op);
148
                goto arg_fail;
149
        }
150
        prefs->op = r;
151

    
152
        if (!pattern[0]) {
153
                XSEGLOG2(&lc, E, "I/O pattern needs to be supplied\n");
154
                goto arg_fail;
155
        }
156
        r = read_pattern(pattern);
157
        if (r < 0) {
158
                XSEGLOG2(&lc, E, "Invalid syntax: --pattern %s\n", pattern);
159
                goto arg_fail;
160
        }
161
        prefs->flags |= (uint8_t)r;
162

    
163
        /**************************\
164
         * Check timer parameters *
165
        \**************************/
166

    
167
        //Most of the times, not all timers need to be used.
168
        //We can choose which timers will be used by adjusting the "insanity"
169
        //level of the benchmark i.e. the obscurity of code paths (get request,
170
        //submit request) that will be timed.
171
        if (!insanity[0])
172
                strcpy(insanity, "sane");
173

    
174
        prefs->insanity = read_insanity(insanity);
175
        if (prefs->insanity < 0) {
176
                XSEGLOG2(&lc, E, "Invalid syntax: --insanity %s\n", insanity);
177
                goto arg_fail;
178
        }
179

    
180
        /*
181
         * If we have a request other than read/write, we don't need to check
182
         * about size parameters, but only how many objects we want to affect
183
         */
184
        if (prefs->op != X_READ && prefs->op != X_WRITE) {
185

    
186
                /***************************\
187
                 * Check object parameters *
188
                \***************************/
189

    
190
                if (!total_objects[0]) {
191
                        XSEGLOG2(&lc, E,
192
                                        "Total number of objects needs to be supplied\n");
193
                        goto arg_fail;
194
                }
195
                prefs->to = str2num(total_objects);
196
                if (!prefs->to) {
197
                        XSEGLOG2(&lc, E, "Invalid syntax: -to %s\n", total_objects);
198
                        goto arg_fail;
199
                }
200

    
201
                //In this case, the maximum number of requests is the total number of
202
                //objects we will handle
203
                prefs->max_requests = prefs->to;
204
        } else {
205

    
206
                /*************************\
207
                 * Check size parameters *
208
                \*************************/
209

    
210
                //Block size (bs): Defaults to 4K.
211
                //It must be a number followed by one of these characters:
212
                //                                                [k|K|m|M|g|G]
213
                //If not, it will be considered as size in bytes.
214
                //Must be integer multiple of segment's page size (typically 4k).
215
                if (!block_size[0])
216
                        strcpy(block_size,"4k");
217

    
218
                if (!prefs->iodepth)
219
                        prefs->iodepth = 1;
220

    
221
                prefs->bs = str2num(block_size);
222
                if (!prefs->bs) {
223
                        XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
224
                        goto arg_fail;
225
                } else if (prefs->bs % xseg_page_size) {
226
                        XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
227
                        goto arg_fail;
228
                }
229

    
230
                //Total I/O size (ts): Must be supplied by user.
231
                //Must have the same format as "total size"
232
                //Must be integer multiple of "block size"
233
                if (!total_size[0]) {
234
                        XSEGLOG2(&lc, E, "Total I/O size needs to be supplied\n");
235
                        goto arg_fail;
236
                }
237

    
238
                prefs->ts = str2num(total_size);
239
                if (!prefs->ts) {
240
                        XSEGLOG2(&lc, E, "Invalid syntax: -ts %s\n", total_size);
241
                        goto arg_fail;
242
                } else if (prefs->ts % prefs->bs) {
243
                        XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
244
                        goto arg_fail;
245
                } else if (prefs->ts > xseg->segment_size) {
246
                        XSEGLOG2(&lc, E,
247
                                        "Total I/O size exceeds segment size\n", total_size);
248
                        goto arg_fail;
249
                }
250

    
251
                //Object size (os): Defaults to 4M.
252
                //Must have the same format as "total size"
253
                //Must be integer multiple of "block size"
254
                if (!object_size[0])
255
                        strcpy(object_size,"4M");
256

    
257
                prefs->os = str2num(object_size);
258
                if (!prefs->os) {
259
                        XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
260
                        goto arg_fail;
261
                } else if (prefs->os % prefs->bs) {
262
                        XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
263
                        goto arg_fail;
264
                }
265

    
266
                //In this case, the maximum number of requests is the number of blocks
267
                //we need to cover the total I/O size
268
                prefs->max_requests = prefs->ts / prefs->bs;
269
        }
270

    
271
        /*************************
272
         * Check port parameters *
273
         *************************/
274

    
275
        if (dst_port < 0){
276
                XSEGLOG2(&lc, E, "Destination port needs to be supplied\n");
277
                goto arg_fail;
278
        }
279

    
280
        prefs->src_port = peer->portno_start; //TODO: allow user to change this
281
        prefs->dst_port = (xport) dst_port;
282

    
283
        /*********************************
284
         * Create timers for all metrics *
285
         *********************************/
286

    
287
        if (init_timer(&prefs->total_tm, TM_SANE))
288
                goto tm_fail;
289
        if (init_timer(&prefs->sub_tm, TM_MANIC))
290
                goto tm_fail;
291
        if (init_timer(&prefs->get_tm, TM_PARANOID))
292
                goto tm_fail;
293
        if (init_timer(&prefs->rec_tm, TM_ECCENTRIC))
294
                goto tm_fail;
295

    
296
        /********************************\
297
         * Customize struct peerd/prefs *
298
         \********************************/
299

    
300
        prefs->peer = peer;
301

    
302
        //The following function initializes the global_id, global_seed extern
303
        //variables.
304
        create_id();
305

    
306
        if ((prefs->flags & (1 <<PATTERN_FLAG)) == IO_RAND) {
307
                prefs->lfsr = malloc(sizeof(struct lfsr));
308
                if (!prefs->lfsr) {
309
                        perror("malloc");
310
                        goto lfsr_fail;
311
                }
312
                //FIXME: Give a name to max requests, not just prefs->ts / prefs->bs
313
                //FIXME: handle better the seed passing than just giving UINT64_MAX
314
                if (lfsr_init(prefs->lfsr, prefs->max_requests, UINT64_MAX)) {
315
                        XSEGLOG2(&lc, E, "LFSR could not be initialized\n");
316
                        goto lfsr_fail;
317
                }
318
        }
319

    
320
        peer->peerd_loop = custom_peerd_loop;
321
        peer->priv = (void *) prefs;
322
        return 0;
323

    
324
arg_fail:
325
        custom_peer_usage();
326
lfsr_fail:
327
        free(prefs->lfsr);
328
tm_fail:
329
        free(prefs->total_tm);
330
        free(prefs->sub_tm);
331
        free(prefs->get_tm);
332
        free(prefs->rec_tm);
333
        free(prefs);
334
        return -1;
335
}
336

    
337

    
338
static int send_request(struct peerd *peer, struct bench *prefs)
339
{
340
        struct xseg_request *req;
341
        struct xseg *xseg = peer->xseg;
342
        struct peer_req *pr;
343
        xport srcport = prefs->src_port;
344
        xport dstport = prefs->dst_port;
345
        xport p;
346

    
347
        int r;
348
        uint64_t new;
349
        uint64_t size = prefs->bs;
350

    
351
        //srcport and dstport must already be provided by the user.
352
        //returns struct xseg_request with basic initializations
353
        //XSEGLOG2(&lc, D, "Get new request\n");
354
        timer_start(prefs, prefs->get_tm);
355
        req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
356
        if (!req) {
357
                XSEGLOG2(&lc, W, "Cannot get request\n");
358
                return -1;
359
        }
360
        timer_stop(prefs, prefs->get_tm, NULL);
361

    
362
        //Allocate enough space for the data and the target's name
363
        //XSEGLOG2(&lc, D, "Prepare new request\n");
364
        r = xseg_prep_request(xseg, req, TARGETLEN, size);
365
        if (r < 0) {
366
                XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
367
                                TARGETLEN, (unsigned long long)size);
368
                goto put_xseg_request;
369
        }
370

    
371
        //Determine what the next target/chunk will be, based on I/O pattern
372
        new = determine_next(prefs);
373
        XSEGLOG2(&lc, D, "Our new request is %lu\n", new);
374
        //Create a target of this format: "bench-<obj_no>"
375
        create_target(prefs, req, new);
376

    
377
        if(prefs->op == X_WRITE || prefs->op == X_READ) {
378
                req->size = size;
379
                //Calculate the chunk offset inside the object
380
                req->offset = (new * prefs->bs) % prefs->os;
381
                XSEGLOG2(&lc, D, "Offset of request %lu is %lu\n", new, req->offset);
382

    
383
                if(prefs->op == X_WRITE)
384
                        create_chunk(prefs, req, new);
385
        }
386

    
387
        req->op = prefs->op;
388

    
389
        //Measure this?
390
        //XSEGLOG2(&lc, D, "Allocate peer request\n");
391
        pr = alloc_peer_req(peer);
392
        if (!pr) {
393
                XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
394
                                peer->nr_ops - xq_count(&peer->free_reqs));
395
                goto put_xseg_request;
396
        }
397
        pr->peer = peer;
398
        pr->portno = srcport;
399
        pr->req = req;
400
        pr->priv = malloc(sizeof(struct timespec));
401
        if(!pr->priv) {
402
                perror("malloc");
403
                goto put_peer_request;
404
        }
405

    
406
        //XSEGLOG2(&lc, D, "Set request data\n");
407
        r = xseg_set_req_data(xseg, req, pr);
408
        if (r < 0) {
409
                XSEGLOG2(&lc, W, "Cannot set request data\n");
410
                goto put_peer_request;
411
        }
412

    
413
        /*
414
         * Start measuring receive time.
415
         * When we receive a request, we need to have its submission time to
416
         * measure elapsed time. Thus, we memcpy its submission time to pr->priv.
417
         * QUESTION: Is this the fastest way?
418
         */
419
        timer_start(prefs, prefs->rec_tm);
420
        memcpy(pr->priv, &prefs->rec_tm->start_time, sizeof(struct timespec));
421

    
422
        //Submit the request from the source port to the target port
423
        //XSEGLOG2(&lc, D, "Submit request %lu\n", new);
424
        //QUESTION: Can't we just use the submision time calculated previously?
425
        timer_start(prefs, prefs->sub_tm);
426
        p = xseg_submit(xseg, req, srcport, X_ALLOC);
427
        if (p == NoPort) {
428
                XSEGLOG2(&lc, W, "Cannot submit request\n");
429
                goto put_peer_request;
430
        }
431
        timer_stop(prefs, prefs->sub_tm, NULL);
432

    
433
        //Send SIGIO to the process that has binded this port to inform that
434
        //IO is possible
435
        xseg_signal(xseg, p);
436

    
437
        return 0;
438

    
439
put_peer_request:
440
        free(pr->priv);
441
        free_peer_req(peer, pr);
442
put_xseg_request:
443
        if (xseg_put_request(xseg, req, srcport))
444
                XSEGLOG2(&lc, W, "Cannot put request\n");
445
        return -1;
446
}
447

    
448
/*
449
 * This function substitutes the default generic_peerd_loop of peer.c.
450
 * It's plugged to struct peerd at custom peer's initialisation
451
 */
452
int custom_peerd_loop(void *arg)
453
{
454
#ifdef MT
455
        struct thread *t = (struct thread *) arg;
456
        struct peerd *peer = t->peer;
457
        char *id = t->arg;
458
#else
459
        struct peerd *peer = (struct peerd *) arg;
460
        char id[4] = {'P','e','e','r'};
461
#endif
462
        struct xseg *xseg = peer->xseg;
463
        struct bench *prefs = peer->priv;
464
        xport portno_start = peer->portno_start;
465
        xport portno_end = peer->portno_end;
466
        uint64_t threshold=1000/(1 + portno_end - portno_start);
467
        pid_t pid =syscall(SYS_gettid);
468
        int r;
469

    
470

    
471
        XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
472
        xseg_init_local_signal(xseg, peer->portno_start);
473
        uint64_t loops;
474

    
475
        timer_start(prefs, prefs->total_tm);
476
        while (!isTerminate()) {
477
#ifdef MT
478
                if (t->func) {
479
                        XSEGLOG2(&lc, D, "%s executes function\n", id);
480
                        xseg_cancel_wait(xseg, peer->portno_start);
481
                        t->func(t->arg);
482
                        t->func = NULL;
483
                        t->arg = NULL;
484
                        continue;
485
                }
486
#endif
487
send_request:
488
                while (CAN_SEND_REQUEST(prefs)) {
489
                        XSEGLOG2(&lc, D, "Because %lu < %lu && %lu < %lu\n",
490
                                        prefs->sub_tm->completed - prefs->rec_tm->completed,
491
                                        prefs->iodepth, prefs->sub_tm->completed,
492
                                        prefs->max_requests);
493
                        xseg_cancel_wait(xseg, peer->portno_start);
494
                        XSEGLOG2(&lc, D, "Start sending new request\n");
495
                        r = send_request(peer, prefs);
496
                        if (r < 0)
497
                                break;
498
                }
499
                //Heart of peerd_loop. This loop is common for everyone.
500
                for (loops = threshold; loops > 0; loops--) {
501
                        if (check_ports(peer)) {
502
                                if (prefs->max_requests == prefs->rec_tm->completed)
503
                                        return 0;
504
                                else
505
                                        //If an old request has just been acked, the most sensible
506
                                        //thing to do is to immediately send a new one
507
                                        goto send_request;
508
                        }
509
                }
510
                XSEGLOG2(&lc, I, "%s goes to sleep\n",id);
511
                xseg_prepare_wait(xseg, peer->portno_start);
512
#ifdef ST_THREADS
513
                if (ta){
514
                        st_sleep(0);
515
                        continue;
516
                }
517
#endif
518
                xseg_wait_signal(xseg, 10000000UL);
519
                xseg_cancel_wait(xseg, peer->portno_start);
520
                XSEGLOG2(&lc, I, "%s woke up\n", id);
521
        }
522

    
523
        XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
524
                        xq_count(&peer->free_reqs), peer->nr_ops);
525
        return 0;
526
}
527

    
528
void custom_peer_finalize(struct peerd *peer)
529
{
530
        struct bench *prefs = peer->priv;
531
        //TODO: Measure mean time, standard variation
532
        struct tm_result total; //mean, std;
533

    
534
        if (!prefs->total_tm->completed)
535
                timer_stop(prefs, prefs->total_tm, NULL);
536

    
537
        separate_by_order(prefs->total_tm->sum, &total);
538
        print_res(total, "Total Time");
539
        return;
540
}
541

    
542

    
543
static void handle_received(struct peerd *peer, struct peer_req *pr)
544
{
545
        //FIXME: handle null pointer
546
        struct bench *prefs = peer->priv;
547
        struct timer *rec = prefs->rec_tm;
548

    
549
        if (!pr->req) {
550
                //This is a serious error, so we must stop
551
                XSEGLOG2(&lc, E, "Received peer request with no xseg request");
552
                terminated++;
553
                return;
554
        }
555

    
556
        if (!pr->priv) {
557
                XSEGLOG2(&lc, W, "Cannot find submission time of request");
558
                return;
559
        }
560

    
561
        timer_stop(prefs, rec, pr->priv);
562

    
563
        if (xseg_put_request(peer->xseg, pr->req, pr->portno))
564
                XSEGLOG2(&lc, W, "Cannot put xseg request\n");
565

    
566
        //QUESTION, can't we just keep the malloced memory for future use?
567
        free(pr->priv);
568
        free_peer_req(peer, pr);
569
}
570

    
571
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
572
                enum dispatch_reason reason)
573
{
574
        switch (reason) {
575
                case dispatch_accept:
576
                        //This is wrong, benchmarking peer should not accept requests,
577
                        //only receive them.
578
                        XSEGLOG2(&lc, W, "Bench peer should not accept requests\n");
579
                        complete(peer, pr);
580
                        break;
581
                case dispatch_receive:
582
                        handle_received(peer, pr);
583
                        break;
584
                default:
585
                        fail(peer, pr);
586
        }
587
        return 0;
588
}