Statistics
| Branch: | Revision:

root / xseg / peers / user / bench-xseg.c @ e099873d

History | View | Annotate | Download (16.8 kB)

1
/*
2
 * Copyright 2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *   2. Redistributions in binary form must reproduce the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer in the documentation and/or other materials
14
 *      provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * The views and conclusions contained in the software and
30
 * documentation are those of the authors and should not be
31
 * interpreted as representing official policies, either expressed
32
 * or implied, of GRNET S.A.
33
 */
34

    
35
#define _GNU_SOURCE
36
#include <stdio.h>
37
#include <stdlib.h>
38
#include <unistd.h>
39
#include <sys/syscall.h>
40
#include <sys/types.h>
41
#include <pthread.h>
42
#include <xseg/xseg.h>
43
#include <peer.h>
44
#include <time.h>
45
#include <sys/util.h>
46
#include <signal.h>
47
#include <bench-xseg.h>
48
#include <limits.h>
49

    
50
char global_id[IDLEN];
51
uint64_t global_seed;
52

    
53
/*
54
 * This function checks two things:
55
 * a) If in-flight requests are less than given iodepth
56
 * b) If we have submitted all of the requests
57
 */
58
#define CAN_SEND_REQUEST(prefs)        \
59
        prefs->sub_tm->completed - prefs->rec_tm->completed < prefs->iodepth &&        \
60
        prefs->sub_tm->completed < prefs->max_requests        \
61

    
62
void custom_peer_usage()
63
{
64
        fprintf(stderr, "Custom peer options: \n"
65
                        "  --------------------------------------------\n"
66
                        "    -op       | None    | XSEG operation [read|write|info|delete]\n"
67
                        "    --pattern | None    | I/O pattern [seq|rand]\n"
68
                        "    -to       | None    | Total objects (not for read/write)\n"
69
                        "    -ts       | None    | Total I/O size\n"
70
                        "    -os       | 4M      | Object size\n"
71
                        "    -bs       | 4k      | Block size\n"
72
                        "    -tp       | None    | Target port\n"
73
                        "    --iodepth | 1       | Number of in-flight I/O requests\n"
74
                        "    --verify  | no      | Verify written requests [no|meta|hash]\n"
75
                        "    --seed    | None    | Inititalize LFSR and target names\n"
76
                        "    --insanity| sane    | Adjust insanity level of benchmark:\n"
77
                        "              |         |     [sane|eccentric|manic|paranoid]\n"
78
                        "\n");
79
}
80

    
81
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
82
{
83
        struct bench *prefs;
84
        char total_objects[MAX_ARG_LEN + 1];
85
        char total_size[MAX_ARG_LEN + 1];
86
        char object_size[MAX_ARG_LEN + 1];
87
        char block_size[MAX_ARG_LEN + 1];
88
        char op[MAX_ARG_LEN + 1];
89
        char pattern[MAX_ARG_LEN + 1];
90
        char insanity[MAX_ARG_LEN + 1];
91
        struct xseg *xseg = peer->xseg;
92
        unsigned int xseg_page_size = 1 << xseg->config.page_shift;
93
        long iodepth = -1;
94
        long dst_port = -1;
95
        unsigned long seed = -1;
96
        int r;
97

    
98
        op[0] = 0;
99
        pattern[0] = 0;
100
        total_objects[0] = 0;
101
        total_size[0] = 0;
102
        block_size[0] = 0;
103
        object_size[0] = 0;
104
        insanity[0] = 0;
105

    
106
#ifdef MT
107
        for (i = 0; i < nr_threads; i++) {
108
                prefs = peer->thread[i]->priv;
109
                prefs = malloc(sizeof(struct bench));
110
                if (!prefs) {
111
                        perror("malloc");
112
                        return -1;
113
                }
114
        }
115
#endif
116
        prefs = malloc(sizeof(struct bench));
117
        if (!prefs) {
118
                perror("malloc");
119
                return -1;
120
        }
121
        prefs->flags = 0;
122

    
123
        //Begin reading the benchmark-specific arguments
124
        BEGIN_READ_ARGS(argc, argv);
125
        READ_ARG_STRING("-op", op, MAX_ARG_LEN);
126
        READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
127
        READ_ARG_STRING("-to", total_objects, MAX_ARG_LEN);
128
        READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
129
        READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
130
        READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
131
        READ_ARG_ULONG("--iodepth", iodepth);
132
        READ_ARG_ULONG("-tp", dst_port);
133
        READ_ARG_ULONG("--seed", seed);
134
        READ_ARG_STRING("--insanity", insanity, MAX_ARG_LEN);
135
        END_READ_ARGS();
136

    
137
        /*****************************\
138
         * Check I/O type parameters *
139
        \*****************************/
140

    
141
        //We support 4 xseg operations: X_READ, X_WRITE, X_DELETE, X_INFO
142
        //The I/O pattern of thesee operations can be either synchronous (sync) or
143
        //random (rand)
144
        if (!op[0]) {
145
                XSEGLOG2(&lc, E, "xseg operation needs to be supplied\n");
146
                goto arg_fail;
147
        }
148
        r = read_op(op);
149
        if (r < 0) {
150
                XSEGLOG2(&lc, E, "Invalid syntax: -op %s\n", op);
151
                goto arg_fail;
152
        }
153
        prefs->op = r;
154

    
155
        if (!pattern[0]) {
156
                XSEGLOG2(&lc, E, "I/O pattern needs to be supplied\n");
157
                goto arg_fail;
158
        }
159
        r = read_pattern(pattern);
160
        if (r < 0) {
161
                XSEGLOG2(&lc, E, "Invalid syntax: --pattern %s\n", pattern);
162
                goto arg_fail;
163
        }
164
        prefs->flags |= (uint8_t)r;
165

    
166
        //Defailt iodepth value is 1
167
        if (iodepth < 0)
168
                prefs->iodepth = 1;
169
        else
170
                prefs->iodepth = iodepth;
171

    
172
        /**************************\
173
         * Check timer parameters *
174
        \**************************/
175

    
176
        //Most of the times, not all timers need to be used.
177
        //We can choose which timers will be used by adjusting the "insanity"
178
        //level of the benchmark i.e. the obscurity of code paths (get request,
179
        //submit request) that will be timed.
180
        if (!insanity[0])
181
                strcpy(insanity, "sane");
182

    
183
        prefs->insanity = read_insanity(insanity);
184
        if (prefs->insanity < 0) {
185
                XSEGLOG2(&lc, E, "Invalid syntax: --insanity %s\n", insanity);
186
                goto arg_fail;
187
        }
188

    
189
        /*
190
         * If we have a request other than read/write, we don't need to check
191
         * about size parameters, but only how many objects we want to affect
192
         */
193
        if (prefs->op != X_READ && prefs->op != X_WRITE) {
194

    
195
                /***************************\
196
                 * Check object parameters *
197
                \***************************/
198

    
199
                if (!total_objects[0]) {
200
                        XSEGLOG2(&lc, E,
201
                                        "Total number of objects needs to be supplied\n");
202
                        goto arg_fail;
203
                }
204
                prefs->to = str2num(total_objects);
205
                if (!prefs->to) {
206
                        XSEGLOG2(&lc, E, "Invalid syntax: -to %s\n", total_objects);
207
                        goto arg_fail;
208
                }
209

    
210
                //In this case, the maximum number of requests is the total number of
211
                //objects we will handle
212
                prefs->max_requests = prefs->to;
213
        } else {
214

    
215
                /*************************\
216
                 * Check size parameters *
217
                \*************************/
218

    
219
                //Block size (bs): Defaults to 4K.
220
                //It must be a number followed by one of these characters:
221
                //                                                [k|K|m|M|g|G]
222
                //If not, it will be considered as size in bytes.
223
                //Must be integer multiple of segment's page size (typically 4k).
224
                if (!block_size[0])
225
                        strcpy(block_size,"4k");
226

    
227
                prefs->bs = str2num(block_size);
228
                if (!prefs->bs) {
229
                        XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
230
                        goto arg_fail;
231
                } else if (prefs->bs % xseg_page_size) {
232
                        XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
233
                        goto arg_fail;
234
                }
235

    
236
                //Total I/O size (ts): Must be supplied by user.
237
                //Must have the same format as "total size"
238
                //Must be integer multiple of "block size"
239
                if (!total_size[0]) {
240
                        XSEGLOG2(&lc, E, "Total I/O size needs to be supplied\n");
241
                        goto arg_fail;
242
                }
243

    
244
                prefs->ts = str2num(total_size);
245
                if (!prefs->ts) {
246
                        XSEGLOG2(&lc, E, "Invalid syntax: -ts %s\n", total_size);
247
                        goto arg_fail;
248
                } else if (prefs->ts % prefs->bs) {
249
                        XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
250
                        goto arg_fail;
251
                } else if (prefs->ts > xseg->segment_size) {
252
                        XSEGLOG2(&lc, E,
253
                                        "Total I/O size exceeds segment size\n", total_size);
254
                        goto arg_fail;
255
                }
256

    
257
                //Object size (os): Defaults to 4M.
258
                //Must have the same format as "total size"
259
                //Must be integer multiple of "block size"
260
                if (!object_size[0])
261
                        strcpy(object_size,"4M");
262

    
263
                prefs->os = str2num(object_size);
264
                if (!prefs->os) {
265
                        XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
266
                        goto arg_fail;
267
                } else if (prefs->os % prefs->bs) {
268
                        XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
269
                        goto arg_fail;
270
                }
271

    
272
                //In this case, the maximum number of requests is the number of blocks
273
                //we need to cover the total I/O size
274
                prefs->max_requests = prefs->ts / prefs->bs;
275
        }
276

    
277
        /*************************\
278
         * Check port parameters *
279
        \*************************/
280

    
281
        if (dst_port < 0){
282
                XSEGLOG2(&lc, E, "Target port needs to be supplied\n");
283
                goto arg_fail;
284
        }
285

    
286
        prefs->src_port = peer->portno_start; //TODO: allow user to change this
287
        prefs->dst_port = (xport) dst_port;
288

    
289
        /*********************************\
290
         * Create timers for all metrics *
291
        \*********************************/
292

    
293
        if (init_timer(&prefs->total_tm, TM_SANE))
294
                goto tm_fail;
295
        if (init_timer(&prefs->sub_tm, TM_MANIC))
296
                goto tm_fail;
297
        if (init_timer(&prefs->get_tm, TM_PARANOID))
298
                goto tm_fail;
299
        if (init_timer(&prefs->rec_tm, TM_ECCENTRIC))
300
                goto tm_fail;
301

    
302
        /********************************\
303
         * Customize struct peerd/prefs *
304
        \********************************/
305

    
306
        prefs->peer = peer;
307

    
308
        //The following function initializes the global_id, global_seed extern
309
        //variables.
310
        create_id(seed);
311

    
312
        if ((prefs->flags & (1 <<PATTERN_FLAG)) == IO_RAND) {
313
                prefs->lfsr = malloc(sizeof(struct lfsr));
314
                if (!prefs->lfsr) {
315
                        perror("malloc");
316
                        goto lfsr_fail;
317
                }
318
                //FIXME: handle better the seed passing than just giving UINT64_MAX
319
                if (lfsr_init(prefs->lfsr, prefs->max_requests, seed)) {
320
                        XSEGLOG2(&lc, E, "LFSR could not be initialized\n");
321
                        goto lfsr_fail;
322
                }
323
        }
324

    
325
        peer->peerd_loop = custom_peerd_loop;
326
        peer->priv = (void *) prefs;
327
        return 0;
328

    
329
arg_fail:
330
        custom_peer_usage();
331
lfsr_fail:
332
        free(prefs->lfsr);
333
tm_fail:
334
        free(prefs->total_tm);
335
        free(prefs->sub_tm);
336
        free(prefs->get_tm);
337
        free(prefs->rec_tm);
338
        free(prefs);
339
        return -1;
340
}
341

    
342

    
343
static int send_request(struct peerd *peer, struct bench *prefs)
344
{
345
        struct xseg_request *req;
346
        struct xseg *xseg = peer->xseg;
347
        struct peer_req *pr;
348
        xport srcport = prefs->src_port;
349
        xport dstport = prefs->dst_port;
350
        xport p;
351

    
352
        int r;
353
        uint64_t new;
354
        uint64_t size = prefs->bs;
355

    
356
        //srcport and dstport must already be provided by the user.
357
        //returns struct xseg_request with basic initializations
358
        //XSEGLOG2(&lc, D, "Get new request\n");
359
        timer_start(prefs, prefs->get_tm);
360
        req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
361
        if (!req) {
362
                XSEGLOG2(&lc, W, "Cannot get request\n");
363
                return -1;
364
        }
365
        timer_stop(prefs, prefs->get_tm, NULL);
366

    
367
        //Allocate enough space for the data and the target's name
368
        //XSEGLOG2(&lc, D, "Prepare new request\n");
369
        r = xseg_prep_request(xseg, req, TARGETLEN, size);
370
        if (r < 0) {
371
                XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
372
                                TARGETLEN, (unsigned long long)size);
373
                goto put_xseg_request;
374
        }
375

    
376
        //Determine what the next target/chunk will be, based on I/O pattern
377
        new = determine_next(prefs);
378
        XSEGLOG2(&lc, I, "Our new request is %lu\n", new);
379
        //Create a target of this format: "bench-<obj_no>"
380
        create_target(prefs, req, new);
381

    
382
        if (prefs->op == X_WRITE || prefs->op == X_READ) {
383
                req->size = size;
384
                //Calculate the chunk offset inside the object
385
                req->offset = (new * prefs->bs) % prefs->os;
386
                XSEGLOG2(&lc, D, "Offset of request %lu is %lu\n", new, req->offset);
387

    
388
                if (prefs->op == X_WRITE)
389
                        create_chunk(prefs, req, new);
390
        }
391

    
392
        req->op = prefs->op;
393

    
394
        //Measure this?
395
        //XSEGLOG2(&lc, D, "Allocate peer request\n");
396
        pr = alloc_peer_req(peer);
397
        if (!pr) {
398
                XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
399
                                peer->nr_ops - xq_count(&peer->free_reqs));
400
                goto put_xseg_request;
401
        }
402
        pr->peer = peer;
403
        pr->portno = srcport;
404
        pr->req = req;
405
        pr->priv = malloc(sizeof(struct timespec));
406
        if (!pr->priv) {
407
                perror("malloc");
408
                goto put_peer_request;
409
        }
410

    
411
        //XSEGLOG2(&lc, D, "Set request data\n");
412
        r = xseg_set_req_data(xseg, req, pr);
413
        if (r < 0) {
414
                XSEGLOG2(&lc, W, "Cannot set request data\n");
415
                goto put_peer_request;
416
        }
417

    
418
        /*
419
         * Start measuring receive time.
420
         * When we receive a request, we need to have its submission time to
421
         * measure elapsed time. Thus, we memcpy its submission time to pr->priv.
422
         * QUESTION: Is this the fastest way?
423
         */
424
        timer_start(prefs, prefs->rec_tm);
425
        if (prefs->rec_tm->insanity <= prefs->insanity)
426
                memcpy(pr->priv, &prefs->rec_tm->start_time, sizeof(struct timespec));
427

    
428
        //Submit the request from the source port to the target port
429
        //XSEGLOG2(&lc, D, "Submit request %lu\n", new);
430
        timer_start(prefs, prefs->sub_tm);
431
        p = xseg_submit(xseg, req, srcport, X_ALLOC);
432
        if (p == NoPort) {
433
                XSEGLOG2(&lc, W, "Cannot submit request\n");
434
                goto put_peer_request;
435
        }
436
        timer_stop(prefs, prefs->sub_tm, NULL);
437

    
438
        //Send SIGIO to the process that has bound this port to inform that
439
        //IO is possible
440
        r = xseg_signal(xseg, p);
441
        //if (r < 0)
442
        //        XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
443

    
444
        return 0;
445

    
446
put_peer_request:
447
        free(pr->priv);
448
        free_peer_req(peer, pr);
449
put_xseg_request:
450
        if (xseg_put_request(xseg, req, srcport))
451
                XSEGLOG2(&lc, W, "Cannot put request\n");
452
        return -1;
453
}
454

    
455
/*
456
 * This function substitutes the default generic_peerd_loop of peer.c.
457
 * It's plugged to struct peerd at custom peer's initialisation
458
 */
459
int custom_peerd_loop(void *arg)
460
{
461
#ifdef MT
462
        struct thread *t = (struct thread *) arg;
463
        struct peerd *peer = t->peer;
464
        char *id = t->arg;
465
#else
466
        struct peerd *peer = (struct peerd *) arg;
467
        char id[4] = {'P','e','e','r'};
468
#endif
469
        struct xseg *xseg = peer->xseg;
470
        struct bench *prefs = peer->priv;
471
        xport portno_start = peer->portno_start;
472
        xport portno_end = peer->portno_end;
473
        uint64_t threshold=1000/(1 + portno_end - portno_start);
474
        pid_t pid = syscall(SYS_gettid);
475
        int r;
476
        uint64_t loops;
477

    
478
        XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
479
        xseg_init_local_signal(xseg, peer->portno_start);
480

    
481
        timer_start(prefs, prefs->total_tm);
482
send_request:
483
        while (!isTerminate()) {
484
#ifdef MT
485
                if (t->func) {
486
                        XSEGLOG2(&lc, D, "%s executes function\n", id);
487
                        xseg_cancel_wait(xseg, peer->portno_start);
488
                        t->func(t->arg);
489
                        t->func = NULL;
490
                        t->arg = NULL;
491
                        continue;
492
                }
493
#endif
494
                while (CAN_SEND_REQUEST(prefs)) {
495
                        xseg_cancel_wait(xseg, peer->portno_start);
496
                        XSEGLOG2(&lc, D, "...because %lu < %lu && %lu < %lu\n",
497
                                        prefs->sub_tm->completed - prefs->rec_tm->completed,
498
                                        prefs->iodepth, prefs->sub_tm->completed,
499
                                        prefs->max_requests);
500
                        XSEGLOG2(&lc, D, "Start sending new request\n");
501
                        r = send_request(peer, prefs);
502
                        if (r < 0)
503
                                break;
504
                }
505
                //Heart of peerd_loop. This loop is common for everyone.
506
                for (loops = threshold; loops > 0; loops--) {
507
                        if (loops == 1)
508
                                xseg_prepare_wait(xseg, peer->portno_start);
509

    
510
                        if (check_ports(peer)) {
511
                                //If an old request has just been acked, the most sensible
512
                                //thing to do is to immediately send a new one
513
                                if (prefs->rec_tm->completed < prefs->max_requests)
514
                                        goto send_request;
515
                                else
516
                                        return 0;
517
                        }
518
                }
519
                //struct xseg_port *port = xseg_get_port(xseg, portno_start);
520
                //struct xq *q;
521
                //q = XPTR_TAKE(port->request_queue, xseg->segment);
522
                //XSEGLOG2(&lc, I, "%s goes to sleep with %u requests pending\n",
523
                //                id, xq_count(q));
524
                XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
525
#ifdef ST_THREADS
526
                if (ta){
527
                        st_sleep(0);
528
                        continue;
529
                }
530
#endif
531
                xseg_wait_signal(xseg, 10000000UL);
532
                xseg_cancel_wait(xseg, peer->portno_start);
533
                XSEGLOG2(&lc, I, "%s woke up\n", id);
534
        }
535

    
536
        XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
537
                        xq_count(&peer->free_reqs), peer->nr_ops);
538
        return 0;
539
}
540

    
541
void custom_peer_finalize(struct peerd *peer)
542
{
543
        struct bench *prefs = peer->priv;
544
        //TODO: Measure mean time, standard variation
545

    
546
        if (!prefs->total_tm->completed)
547
                timer_stop(prefs, prefs->total_tm, NULL);
548

    
549
        print_stats(prefs);
550
        print_res(prefs, prefs->total_tm, "Total Requests");
551
        return;
552
}
553

    
554

    
555
static void handle_received(struct peerd *peer, struct peer_req *pr)
556
{
557
        //FIXME: handle null pointer
558
        struct bench *prefs = peer->priv;
559
        struct timer *rec = prefs->rec_tm;
560

    
561
        if (!pr->req) {
562
                //This is a serious error, so we must stop
563
                XSEGLOG2(&lc, E, "Received peer request with no xseg request");
564
                terminated++;
565
                return;
566
        }
567

    
568
        if (!pr->priv) {
569
                XSEGLOG2(&lc, W, "Cannot find submission time of request");
570
                return;
571
        }
572

    
573
        timer_stop(prefs, rec, pr->priv);
574

    
575
        if (xseg_put_request(peer->xseg, pr->req, pr->portno))
576
                XSEGLOG2(&lc, W, "Cannot put xseg request\n");
577

    
578
        //QUESTION, can't we just keep the malloced memory for future use?
579
        free(pr->priv);
580
        free_peer_req(peer, pr);
581
}
582

    
583
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
584
                enum dispatch_reason reason)
585
{
586
        switch (reason) {
587
                case dispatch_accept:
588
                        //This is wrong, benchmarking peer should not accept requests,
589
                        //only receive them.
590
                        XSEGLOG2(&lc, W, "Bench peer should not accept requests\n");
591
                        complete(peer, pr);
592
                        break;
593
                case dispatch_receive:
594
                        handle_received(peer, pr);
595
                        break;
596
                default:
597
                        fail(peer, pr);
598
        }
599
        return 0;
600
}