Statistics
| Branch: | Tag: | Revision:

root / xseg / peers / user / bench-xseg.c @ e252784d

History | View | Annotate | Download (16.8 kB)

1
/*
2
 * Copyright 2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *   2. Redistributions in binary form must reproduce the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer in the documentation and/or other materials
14
 *      provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * The views and conclusions contained in the software and
30
 * documentation are those of the authors and should not be
31
 * interpreted as representing official policies, either expressed
32
 * or implied, of GRNET S.A.
33
 */
34

    
35
#define _GNU_SOURCE
36
#include <stdio.h>
37
#include <stdlib.h>
38
#include <unistd.h>
39
#include <sys/syscall.h>
40
#include <sys/types.h>
41
#include <pthread.h>
42
#include <xseg/xseg.h>
43
#include <peer.h>
44
#include <time.h>
45
#include <sys/util.h>
46
#include <signal.h>
47
#include <bench-xseg.h>
48
#include <limits.h>
49

    
50
char global_id[IDLEN];
51
uint64_t global_seed;
52

    
53
/*
54
 * This function checks two things:
55
 * a) If in-flight requests are less than given iodepth
56
 * b) If we have submitted al of the requests
57
 */
58
#define CAN_SEND_REQUEST(prefs)        \
59
        prefs->sub_tm->completed - prefs->rec_tm->completed < prefs->iodepth &&        \
60
        prefs->sub_tm->completed < prefs->max_requests        \
61

    
62
void custom_peer_usage()
63
{
64
        fprintf(stderr, "Custom peer options: \n"
65
                        "  --------------------------------------------\n"
66
                        "    -op       | None    | XSEG operation [read|write|info|delete]\n"
67
                        "    --pattern | None    | I/O pattern [seq|rand]\n"
68
                        "    -to       | None    | Total objects (not for read/write)\n"
69
                        "    -ts       | None    | Total I/O size\n"
70
                        "    -os       | 4M      | Object size\n"
71
                        "    -bs       | 4k      | Block size\n"
72
                        "    -dp       | None    | Destination port\n"
73
                        "    --iodepth | 1       | Number of in-flight I/O requests\n"
74
                        "    --verify  | no      | Verify written requests [no|meta|hash]\n"
75
                        "    --seed    | None    | Inititalize LFSR and target names\n"
76
                        "    --insanity| sane    | Adjust insanity level of benchmark:\n"
77
                        "              |         |     [sane|eccentric|manic|paranoid]\n"
78
                        "\n");
79
}
80

    
81
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
82
{
83
        struct bench *prefs;
84
        char total_objects[MAX_ARG_LEN + 1];
85
        char total_size[MAX_ARG_LEN + 1];
86
        char object_size[MAX_ARG_LEN + 1];
87
        char block_size[MAX_ARG_LEN + 1];
88
        char op[MAX_ARG_LEN + 1];
89
        char pattern[MAX_ARG_LEN + 1];
90
        char insanity[MAX_ARG_LEN + 1];
91
        struct xseg *xseg = peer->xseg;
92
        unsigned int xseg_page_size = 1 << xseg->config.page_shift;
93
        long iodepth = -1;
94
        long dst_port = -1;
95
        int r;
96

    
97
        op[0] = 0;
98
        pattern[0] = 0;
99
        total_objects[0] = 0;
100
        total_size[0] = 0;
101
        block_size[0] = 0;
102
        object_size[0] = 0;
103
        insanity[0] = 0;
104

    
105
#ifdef MT
106
        for (i = 0; i < nr_threads; i++) {
107
                prefs = peer->thread[i]->priv;
108
                prefs = malloc(sizeof(struct bench));
109
                if (!prefs) {
110
                        perror("malloc");
111
                        return -1;
112
                }
113
        }
114
#endif
115
        prefs = malloc(sizeof(struct bench));
116
        if (!prefs) {
117
                perror("malloc");
118
                return -1;
119
        }
120
        prefs->flags = 0;
121

    
122
        //Begin reading the benchmark-specific arguments
123
        BEGIN_READ_ARGS(argc, argv);
124
        READ_ARG_STRING("-op", op, MAX_ARG_LEN);
125
        READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
126
        READ_ARG_STRING("-to", total_objects, MAX_ARG_LEN);
127
        READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
128
        READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
129
        READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
130
        READ_ARG_ULONG("--iodepth", iodepth);
131
        READ_ARG_ULONG("-dp", dst_port);
132
        READ_ARG_STRING("--insanity", insanity, MAX_ARG_LEN);
133
        END_READ_ARGS();
134

    
135
        /*****************************\
136
         * Check I/O type parameters *
137
        \*****************************/
138

    
139
        //We support 4 xseg operations: X_READ, X_WRITE, X_DELETE, X_INFO
140
        //The I/O pattern of thesee operations can be either synchronous (sync) or
141
        //random (rand)
142
        if (!op[0]) {
143
                XSEGLOG2(&lc, E, "xseg operation needs to be supplied\n");
144
                goto arg_fail;
145
        }
146
        r = read_op(op);
147
        if (r < 0) {
148
                XSEGLOG2(&lc, E, "Invalid syntax: -op %s\n", op);
149
                goto arg_fail;
150
        }
151
        prefs->op = r;
152

    
153
        if (!pattern[0]) {
154
                XSEGLOG2(&lc, E, "I/O pattern needs to be supplied\n");
155
                goto arg_fail;
156
        }
157
        r = read_pattern(pattern);
158
        if (r < 0) {
159
                XSEGLOG2(&lc, E, "Invalid syntax: --pattern %s\n", pattern);
160
                goto arg_fail;
161
        }
162
        prefs->flags |= (uint8_t)r;
163

    
164
        //Defailt iodepth value is 1
165
        if (iodepth < -1)
166
                prefs->iodepth = 1;
167
        else
168
                prefs->iodepth = iodepth;
169

    
170
        /**************************\
171
         * Check timer parameters *
172
        \**************************/
173

    
174
        //Most of the times, not all timers need to be used.
175
        //We can choose which timers will be used by adjusting the "insanity"
176
        //level of the benchmark i.e. the obscurity of code paths (get request,
177
        //submit request) that will be timed.
178
        if (!insanity[0])
179
                strcpy(insanity, "sane");
180

    
181
        prefs->insanity = read_insanity(insanity);
182
        if (prefs->insanity < 0) {
183
                XSEGLOG2(&lc, E, "Invalid syntax: --insanity %s\n", insanity);
184
                goto arg_fail;
185
        }
186

    
187
        /*
188
         * If we have a request other than read/write, we don't need to check
189
         * about size parameters, but only how many objects we want to affect
190
         */
191
        if (prefs->op != X_READ && prefs->op != X_WRITE) {
192

    
193
                /***************************\
194
                 * Check object parameters *
195
                \***************************/
196

    
197
                if (!total_objects[0]) {
198
                        XSEGLOG2(&lc, E,
199
                                        "Total number of objects needs to be supplied\n");
200
                        goto arg_fail;
201
                }
202
                prefs->to = str2num(total_objects);
203
                if (!prefs->to) {
204
                        XSEGLOG2(&lc, E, "Invalid syntax: -to %s\n", total_objects);
205
                        goto arg_fail;
206
                }
207

    
208
                //In this case, the maximum number of requests is the total number of
209
                //objects we will handle
210
                prefs->max_requests = prefs->to;
211
        } else {
212

    
213
                /*************************\
214
                 * Check size parameters *
215
                \*************************/
216

    
217
                //Block size (bs): Defaults to 4K.
218
                //It must be a number followed by one of these characters:
219
                //                                                [k|K|m|M|g|G]
220
                //If not, it will be considered as size in bytes.
221
                //Must be integer multiple of segment's page size (typically 4k).
222
                if (!block_size[0])
223
                        strcpy(block_size,"4k");
224

    
225
                prefs->bs = str2num(block_size);
226
                if (!prefs->bs) {
227
                        XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
228
                        goto arg_fail;
229
                } else if (prefs->bs % xseg_page_size) {
230
                        XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
231
                        goto arg_fail;
232
                }
233

    
234
                //Total I/O size (ts): Must be supplied by user.
235
                //Must have the same format as "total size"
236
                //Must be integer multiple of "block size"
237
                if (!total_size[0]) {
238
                        XSEGLOG2(&lc, E, "Total I/O size needs to be supplied\n");
239
                        goto arg_fail;
240
                }
241

    
242
                prefs->ts = str2num(total_size);
243
                if (!prefs->ts) {
244
                        XSEGLOG2(&lc, E, "Invalid syntax: -ts %s\n", total_size);
245
                        goto arg_fail;
246
                } else if (prefs->ts % prefs->bs) {
247
                        XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
248
                        goto arg_fail;
249
                } else if (prefs->ts > xseg->segment_size) {
250
                        XSEGLOG2(&lc, E,
251
                                        "Total I/O size exceeds segment size\n", total_size);
252
                        goto arg_fail;
253
                }
254

    
255
                //Object size (os): Defaults to 4M.
256
                //Must have the same format as "total size"
257
                //Must be integer multiple of "block size"
258
                if (!object_size[0])
259
                        strcpy(object_size,"4M");
260

    
261
                prefs->os = str2num(object_size);
262
                if (!prefs->os) {
263
                        XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
264
                        goto arg_fail;
265
                } else if (prefs->os % prefs->bs) {
266
                        XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
267
                        goto arg_fail;
268
                }
269

    
270
                //In this case, the maximum number of requests is the number of blocks
271
                //we need to cover the total I/O size
272
                prefs->max_requests = prefs->ts / prefs->bs;
273
        }
274

    
275
        /*************************\
276
         * Check port parameters *
277
        \*************************/
278

    
279
        if (dst_port < 0){
280
                XSEGLOG2(&lc, E, "Destination port needs to be supplied\n");
281
                goto arg_fail;
282
        }
283

    
284
        prefs->src_port = peer->portno_start; //TODO: allow user to change this
285
        prefs->dst_port = (xport) dst_port;
286

    
287
        /*********************************\
288
         * Create timers for all metrics *
289
        \*********************************/
290

    
291
        if (init_timer(&prefs->total_tm, TM_SANE))
292
                goto tm_fail;
293
        if (init_timer(&prefs->sub_tm, TM_MANIC))
294
                goto tm_fail;
295
        if (init_timer(&prefs->get_tm, TM_PARANOID))
296
                goto tm_fail;
297
        if (init_timer(&prefs->rec_tm, TM_ECCENTRIC))
298
                goto tm_fail;
299

    
300
        /********************************\
301
         * Customize struct peerd/prefs *
302
        \********************************/
303

    
304
        prefs->peer = peer;
305

    
306
        //The following function initializes the global_id, global_seed extern
307
        //variables.
308
        create_id();
309

    
310
        if ((prefs->flags & (1 <<PATTERN_FLAG)) == IO_RAND) {
311
                prefs->lfsr = malloc(sizeof(struct lfsr));
312
                if (!prefs->lfsr) {
313
                        perror("malloc");
314
                        goto lfsr_fail;
315
                }
316
                //FIXME: Give a name to max requests, not just prefs->ts / prefs->bs
317
                //FIXME: handle better the seed passing than just giving UINT64_MAX
318
                if (lfsr_init(prefs->lfsr, prefs->max_requests, UINT64_MAX)) {
319
                        XSEGLOG2(&lc, E, "LFSR could not be initialized\n");
320
                        goto lfsr_fail;
321
                }
322
        }
323

    
324
        peer->peerd_loop = custom_peerd_loop;
325
        peer->priv = (void *) prefs;
326
        return 0;
327

    
328
arg_fail:
329
        custom_peer_usage();
330
lfsr_fail:
331
        free(prefs->lfsr);
332
tm_fail:
333
        free(prefs->total_tm);
334
        free(prefs->sub_tm);
335
        free(prefs->get_tm);
336
        free(prefs->rec_tm);
337
        free(prefs);
338
        return -1;
339
}
340

    
341

    
342
static int send_request(struct peerd *peer, struct bench *prefs)
343
{
344
        struct xseg_request *req;
345
        struct xseg *xseg = peer->xseg;
346
        struct peer_req *pr;
347
        xport srcport = prefs->src_port;
348
        xport dstport = prefs->dst_port;
349
        xport p;
350

    
351
        int r;
352
        uint64_t new;
353
        uint64_t size = prefs->bs;
354

    
355
        //srcport and dstport must already be provided by the user.
356
        //returns struct xseg_request with basic initializations
357
        //XSEGLOG2(&lc, D, "Get new request\n");
358
        timer_start(prefs, prefs->get_tm);
359
        req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
360
        if (!req) {
361
                XSEGLOG2(&lc, W, "Cannot get request\n");
362
                return -1;
363
        }
364
        timer_stop(prefs, prefs->get_tm, NULL);
365

    
366
        //Allocate enough space for the data and the target's name
367
        //XSEGLOG2(&lc, D, "Prepare new request\n");
368
        r = xseg_prep_request(xseg, req, TARGETLEN, size);
369
        if (r < 0) {
370
                XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
371
                                TARGETLEN, (unsigned long long)size);
372
                goto put_xseg_request;
373
        }
374

    
375
        //Determine what the next target/chunk will be, based on I/O pattern
376
        new = determine_next(prefs);
377
        XSEGLOG2(&lc, I, "Our new request is %lu\n", new);
378
        //Create a target of this format: "bench-<obj_no>"
379
        create_target(prefs, req, new);
380

    
381
        if (prefs->op == X_WRITE || prefs->op == X_READ) {
382
                req->size = size;
383
                //Calculate the chunk offset inside the object
384
                req->offset = (new * prefs->bs) % prefs->os;
385
                XSEGLOG2(&lc, D, "Offset of request %lu is %lu\n", new, req->offset);
386

    
387
                if (prefs->op == X_WRITE)
388
                        create_chunk(prefs, req, new);
389
        }
390

    
391
        req->op = prefs->op;
392

    
393
        //Measure this?
394
        //XSEGLOG2(&lc, D, "Allocate peer request\n");
395
        pr = alloc_peer_req(peer);
396
        if (!pr) {
397
                XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
398
                                peer->nr_ops - xq_count(&peer->free_reqs));
399
                goto put_xseg_request;
400
        }
401
        pr->peer = peer;
402
        pr->portno = srcport;
403
        pr->req = req;
404
        pr->priv = malloc(sizeof(struct timespec));
405
        if (!pr->priv) {
406
                perror("malloc");
407
                goto put_peer_request;
408
        }
409

    
410
        //XSEGLOG2(&lc, D, "Set request data\n");
411
        r = xseg_set_req_data(xseg, req, pr);
412
        if (r < 0) {
413
                XSEGLOG2(&lc, W, "Cannot set request data\n");
414
                goto put_peer_request;
415
        }
416

    
417
        /*
418
         * Start measuring receive time.
419
         * When we receive a request, we need to have its submission time to
420
         * measure elapsed time. Thus, we memcpy its submission time to pr->priv.
421
         * QUESTION: Is this the fastest way?
422
         */
423
        timer_start(prefs, prefs->rec_tm);
424
        if (prefs->rec_tm->insanity <= prefs->insanity)
425
                memcpy(pr->priv, &prefs->rec_tm->start_time, sizeof(struct timespec));
426

    
427
        //Submit the request from the source port to the target port
428
        //XSEGLOG2(&lc, D, "Submit request %lu\n", new);
429
        timer_start(prefs, prefs->sub_tm);
430
        p = xseg_submit(xseg, req, srcport, X_ALLOC);
431
        if (p == NoPort) {
432
                XSEGLOG2(&lc, W, "Cannot submit request\n");
433
                goto put_peer_request;
434
        }
435
        timer_stop(prefs, prefs->sub_tm, NULL);
436

    
437
        //Send SIGIO to the process that has bound this port to inform that
438
        //IO is possible
439
        r = xseg_signal(xseg, p);
440
        if (r < 0)
441
                XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
442

    
443
        return 0;
444

    
445
put_peer_request:
446
        free(pr->priv);
447
        free_peer_req(peer, pr);
448
put_xseg_request:
449
        if (xseg_put_request(xseg, req, srcport))
450
                XSEGLOG2(&lc, W, "Cannot put request\n");
451
        return -1;
452
}
453

    
454
/*
455
 * This function substitutes the default generic_peerd_loop of peer.c.
456
 * It's plugged to struct peerd at custom peer's initialisation
457
 */
458
int custom_peerd_loop(void *arg)
459
{
460
#ifdef MT
461
        struct thread *t = (struct thread *) arg;
462
        struct peerd *peer = t->peer;
463
        char *id = t->arg;
464
#else
465
        struct peerd *peer = (struct peerd *) arg;
466
        char id[4] = {'P','e','e','r'};
467
#endif
468
        struct xseg *xseg = peer->xseg;
469
        struct bench *prefs = peer->priv;
470
        xport portno_start = peer->portno_start;
471
        xport portno_end = peer->portno_end;
472
        uint64_t threshold=1000/(1 + portno_end - portno_start);
473
        pid_t pid = syscall(SYS_gettid);
474
        int r;
475
        uint64_t loops;
476

    
477
        XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
478
        xseg_init_local_signal(xseg, peer->portno_start);
479

    
480
        timer_start(prefs, prefs->total_tm);
481
send_request:
482
        while (!isTerminate()) {
483
#ifdef MT
484
                if (t->func) {
485
                        XSEGLOG2(&lc, D, "%s executes function\n", id);
486
                        xseg_cancel_wait(xseg, peer->portno_start);
487
                        t->func(t->arg);
488
                        t->func = NULL;
489
                        t->arg = NULL;
490
                        continue;
491
                }
492
#endif
493
                while (CAN_SEND_REQUEST(prefs)) {
494
                        xseg_cancel_wait(xseg, peer->portno_start);
495
                        XSEGLOG2(&lc, D, "...because %lu < %lu && %lu < %lu\n",
496
                                        prefs->sub_tm->completed - prefs->rec_tm->completed,
497
                                        prefs->iodepth, prefs->sub_tm->completed,
498
                                        prefs->max_requests);
499
                        XSEGLOG2(&lc, D, "Start sending new request\n");
500
                        r = send_request(peer, prefs);
501
                        if (r < 0)
502
                                break;
503
                }
504
                //Heart of peerd_loop. This loop is common for everyone.
505
                for (loops = threshold; loops > 0; loops--) {
506
                        if (loops == 1)
507
                                xseg_prepare_wait(xseg, peer->portno_start);
508

    
509
                        if (check_ports(peer)) {
510
                                //If an old request has just been acked, the most sensible
511
                                //thing to do is to immediately send a new one
512
                                if (prefs->rec_tm->completed < prefs->max_requests)
513
                                        goto send_request;
514
                                else
515
                                        return 0;
516
                        }
517
                }
518
                //struct xseg_port *port = xseg_get_port(xseg, portno_start);
519
                //struct xq *q;
520
                //q = XPTR_TAKE(port->request_queue, xseg->segment);
521
                //XSEGLOG2(&lc, I, "%s goes to sleep with %u requests pending\n",
522
                //                id, xq_count(q));
523
                XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
524
#ifdef ST_THREADS
525
                if (ta){
526
                        st_sleep(0);
527
                        continue;
528
                }
529
#endif
530
                xseg_wait_signal(xseg, 10000000UL);
531
                xseg_cancel_wait(xseg, peer->portno_start);
532
                XSEGLOG2(&lc, I, "%s woke up\n", id);
533
        }
534

    
535
        XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
536
                        xq_count(&peer->free_reqs), peer->nr_ops);
537
        return 0;
538
}
539

    
540
void custom_peer_finalize(struct peerd *peer)
541
{
542
        struct bench *prefs = peer->priv;
543
        //TODO: Measure mean time, standard variation
544

    
545
        if (!prefs->total_tm->completed)
546
                timer_stop(prefs, prefs->total_tm, NULL);
547

    
548
        print_stats(prefs);
549
        print_res(prefs, prefs->total_tm, "Total Requests");
550
        return;
551
}
552

    
553

    
554
static void handle_received(struct peerd *peer, struct peer_req *pr)
555
{
556
        //FIXME: handle null pointer
557
        struct bench *prefs = peer->priv;
558
        struct timer *rec = prefs->rec_tm;
559

    
560
        if (!pr->req) {
561
                //This is a serious error, so we must stop
562
                XSEGLOG2(&lc, E, "Received peer request with no xseg request");
563
                terminated++;
564
                return;
565
        }
566

    
567
        if (!pr->priv) {
568
                XSEGLOG2(&lc, W, "Cannot find submission time of request");
569
                return;
570
        }
571

    
572
        timer_stop(prefs, rec, pr->priv);
573

    
574
        if (xseg_put_request(peer->xseg, pr->req, pr->portno))
575
                XSEGLOG2(&lc, W, "Cannot put xseg request\n");
576

    
577
        //QUESTION, can't we just keep the malloced memory for future use?
578
        free(pr->priv);
579
        free_peer_req(peer, pr);
580
}
581

    
582
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
583
                enum dispatch_reason reason)
584
{
585
        switch (reason) {
586
                case dispatch_accept:
587
                        //This is wrong, benchmarking peer should not accept requests,
588
                        //only receive them.
589
                        XSEGLOG2(&lc, W, "Bench peer should not accept requests\n");
590
                        complete(peer, pr);
591
                        break;
592
                case dispatch_receive:
593
                        handle_received(peer, pr);
594
                        break;
595
                default:
596
                        fail(peer, pr);
597
        }
598
        return 0;
599
}