Statistics
| Branch: | Tag: | Revision:

root / xseg / peers / user / bench-xseg.c @ 0ef9c7ee

History | View | Annotate | Download (14.6 kB)

1
/*
2
 * Copyright 2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *   2. Redistributions in binary form must reproduce the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer in the documentation and/or other materials
14
 *      provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * The views and conclusions contained in the software and
30
 * documentation are those of the authors and should not be
31
 * interpreted as representing official policies, either expressed
32
 * or implied, of GRNET S.A.
33
 */
34

    
35
#define _GNU_SOURCE
36
#include <stdio.h>
37
#include <stdlib.h>
38
#include <unistd.h>
39
#include <sys/syscall.h>
40
#include <sys/types.h>
41
#include <pthread.h>
42
#include <xseg/xseg.h>
43
#include <peer.h>
44
#include <time.h>
45
#include <sys/util.h>
46
#include <signal.h>
47
#include <bench-xseg.h>
48

    
49
struct timespec delay = {0, 4000000};
50

    
51
void custom_peer_usage()
52
{
53
        fprintf(stderr, "Custom peer options: \n"
54
                "  --------------------------------------------\n"
55
                "    -op       | None    | XSEG operation [read|write|info|delete]\n"
56
                "    --pattern | None    | I/O pattern [sync|rand]\n"
57
                "    -ts       | None    | Total I/O size\n"
58
                "    -os       | 4M      | Object size\n"
59
                "    -bs       | 4k      | Block size\n"
60
                "    -dp       | None    | Destination port\n"
61
                "    --iodepth | 1       | Number of in-flight I/O requests\n"
62
                "\n");
63
}
64

    
65
/*
66
 * Convert string to size in bytes.
67
 * If syntax is invalid, return 0. Values such as zero and non-integer
68
 * multiples of segment's page size should not be accepted.
69
 */
70
static uint64_t str2num(char *str)
71
{
72
        char *unit;
73
        uint64_t num;
74

    
75
        num = strtoll(str, &unit, 10);
76
        if (strlen(unit) > 1) //Invalid syntax
77
                return 0;
78
        else if (strlen(unit) < 1) //Plain number in bytes
79
                return num;
80

    
81
        switch (*unit) {
82
                case 'g':
83
                case 'G':
84
                        num *= 1024;
85
                case 'm':
86
                case 'M':
87
                        num *= 1024;
88
                case 'k':
89
                case 'K':
90
                        num *= 1024;
91
                        break;
92
                default:
93
                        num = 0;
94
        }
95
        return num;
96
}
97

    
98
int read_op(char *op) {
99
        if (strcmp(op, "read") == 0)
100
                return X_READ;
101
        if (strcmp(op, "write") == 0)
102
                return X_WRITE;
103
        if (strcmp(op, "info") == 0)
104
                return X_INFO;
105
        if (strcmp(op, "delete") == 0)
106
                return X_DELETE;
107
        return -1;
108
}
109

    
110
int read_pattern(char *pattern) {
111
        if (strcmp(pattern, "sync") == 0)
112
                return IO_SYNC;
113
        if (strcmp(pattern, "rand") == 0)
114
                return IO_RAND;
115
        return -1;
116
}
117

    
118
static void print_res(struct tm_result res, char *type)
119
{
120
        printf("\n");
121
        printf("      %s\n", type);
122
        printf("================================\n");
123
        printf("       |-s-||-ms-|-us-|-ns-|\n");
124
        printf("Time:   %03lu. %03lu  %03lu  %03lu\n",
125
                        res.s, res.ms, res.us, res.ns);
126
}
127

    
128
/*
129
 * Seperates a timespec struct in seconds, msec, usec, nsec
130
 */
131
static void separate_by_order(struct timespec src, struct tm_result *res)
132
{
133
        res->ns = src.tv_nsec % 1000;
134
        src.tv_nsec /= 1000;
135
        res->us = src.tv_nsec % 1000;
136
        res->ms = src.tv_nsec / 1000;
137
        res->s = src.tv_sec;
138
}
139

    
140
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
141
{
142
        struct bench *prefs;
143
        char total_size[MAX_ARG_LEN + 1];
144
        char object_size[MAX_ARG_LEN + 1];
145
        char block_size[MAX_ARG_LEN + 1];
146
        char op[MAX_ARG_LEN + 1];
147
        char pattern[MAX_ARG_LEN + 1];
148
        struct xseg *xseg = peer->xseg;
149
        unsigned int xseg_page_size = 1 << xseg->config.page_shift;
150
        long dst_port = -1;
151
        int r;
152

    
153
        op[0] = 0;
154
        pattern[0] = 0;
155
        total_size[0] = 0;
156
        block_size[0] = 0;
157
        object_size[0] = 0;
158

    
159
#ifdef MT
160
        for (i = 0; i < nr_threads; i++) {
161
                prefs = peer->thread[i]->priv;
162
                prefs = malloc(sizeof(struct bench));
163
                if (!prefs) {
164
                        perror("malloc");
165
                        return -1;
166
                }
167
        }
168
#endif
169
        prefs = malloc(sizeof(struct bench));
170
        if (!prefs) {
171
                perror("malloc");
172
                return -1;
173
        }
174
        prefs->flags = 0;
175

    
176
        //Begin reading the benchmark-specific arguments
177
        BEGIN_READ_ARGS(argc, argv);
178
        READ_ARG_STRING("-op", op, MAX_ARG_LEN);
179
        READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
180
        READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
181
        READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
182
        READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
183
        READ_ARG_ULONG("--iodepth", prefs->iodepth);
184
        READ_ARG_ULONG("-dp", dst_port);
185
        END_READ_ARGS();
186

    
187
        /*****************************\
188
         * Check I/O type parameters *
189
        \*****************************/
190

    
191
        if (!op[0]) {
192
                XSEGLOG2(&lc, E, "xseg operation needs to be supplied\n");
193
                goto arg_fail;
194
        }
195
        r = read_op(op);
196
        if (r < 0) {
197
                XSEGLOG2(&lc, E, "Invalid syntax: -op %s\n", op);
198
                goto arg_fail;
199
        }
200
        prefs->op = r;
201

    
202
        if (!pattern[0]) {
203
                XSEGLOG2(&lc, E, "I/O pattern needs to be supplied\n");
204
                goto arg_fail;
205
        }
206
        r = read_pattern(pattern);
207
        if (r < 0) {
208
                XSEGLOG2(&lc, E, "Invalid syntax: --pattern %s\n", pattern);
209
                goto arg_fail;
210
        }
211
        prefs->flags |= r << PATTERN_FLAG;
212

    
213
        /*************************
214
         * Check size parameters *
215
         *************************/
216

    
217
        //Block size (bs): Defaults to 4K.
218
        //It must be a number followed by one of these characters: [k|K|m|M|g|G].
219
        //If not, it will be considered as size in bytes.
220
        //Must be integer multiple of segment's page size (typically 4k).
221
        if (!block_size[0])
222
                strcpy(block_size,"4k");
223

    
224
        if (!prefs->iodepth)
225
                prefs->iodepth = 1;
226

    
227
        prefs->bs = str2num(block_size);
228
        if (!prefs->bs) {
229
                XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
230
                goto arg_fail;
231
        } else if (prefs->bs % xseg_page_size) {
232
                XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
233
                goto arg_fail;
234
        }
235

    
236
        //Total I/O size (ts): Must be supplied by user.
237
        //Must have the same format as "total size"
238
        //Must be integer multiple of "block size"
239
        if (!total_size[0]) {
240
                XSEGLOG2(&lc, E, "Total I/O size needs to be supplied\n");
241
                goto arg_fail;
242
        }
243

    
244
        prefs->ts = str2num(total_size);
245
        if (!prefs->ts) {
246
                XSEGLOG2(&lc, E, "Invalid syntax: -ts %s\n", total_size);
247
                goto arg_fail;
248
        } else if (prefs->ts % prefs->bs) {
249
                XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
250
                goto arg_fail;
251
        } else if (prefs->ts > xseg->segment_size) {
252
                XSEGLOG2(&lc, E, "Total I/O size exceeds segment size\n", total_size);
253
                goto arg_fail;
254
        }
255

    
256
        //Object size (os): Defaults to 4M.
257
        //Must have the same format as "total size"
258
        //Must be integer multiple of "block size"
259
        if (!object_size[0])
260
                strcpy(object_size,"4M");
261

    
262
        prefs->os = str2num(object_size);
263
        if (!prefs->os) {
264
                XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
265
                goto arg_fail;
266
        } else if (prefs->os % prefs->bs) {
267
                XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
268
                goto arg_fail;
269
        }
270

    
271
        /*************************
272
         * Check port parameters *
273
         *************************/
274

    
275
        if (dst_port < 0){
276
                XSEGLOG2(&lc, E, "Destination port needs to be supplied\n");
277
                goto arg_fail;
278
        }
279

    
280
        prefs->src_port = peer->portno_start; //TODO: allow user to change this
281
        prefs->dst_port = (xport) dst_port;
282

    
283
        /*********************************
284
         * Create timers for all metrics *
285
         *********************************/
286

    
287
        if (init_timer(&prefs->total_tm, TM_SANE))
288
                goto tm_fail;
289
        if (init_timer(&prefs->sub_tm, TM_MANIC))
290
                goto tm_fail;
291
        if (init_timer(&prefs->get_tm, TM_PARANOID))
292
                goto tm_fail;
293
        if (init_timer(&prefs->rec_tm, TM_ECCENTRIC))
294
                goto tm_fail;
295

    
296
        /**************************
297
         * Customize struct peerd *
298
         **************************/
299

    
300
        peer->peerd_loop = custom_peerd_loop;
301
        peer->priv = (void *) prefs;
302
        return 0;
303

    
304
arg_fail:
305
        custom_peer_usage();
306
tm_fail:
307
        free(prefs->total_tm);
308
        free(prefs->sub_tm);
309
        free(prefs->get_tm);
310
        free(prefs->rec_tm);
311
        free(prefs);
312
        return -1;
313
}
314

    
315

    
316
static int send_request(struct peerd *peer, struct bench *prefs)
317
{
318
        struct xseg_request *req;
319
        struct xseg *xseg = peer->xseg;
320
        struct peer_req *pr;
321
        xport srcport = prefs->src_port;
322
        xport dstport = prefs->dst_port;
323
        xport p;
324

    
325
        int r;
326
        uint32_t targetlen = 10; //FIXME: handle it better
327
        uint64_t size = prefs->bs;
328

    
329
        //srcport and dstport must already be provided by the user.
330
        //returns struct xseg_request with basic initializations
331
        XSEGLOG2(&lc, D, "Get request %lu\n", prefs->get_tm->completed);
332
        timer_start(prefs->get_tm);
333
        req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
334
        if (!req) {
335
                XSEGLOG2(&lc, W, "Cannot get request\n");
336
                return -1;
337
        }
338
        timer_stop(prefs->get_tm, NULL);
339

    
340
        //Allocate enough space for the data and the target's name
341
        XSEGLOG2(&lc, D, "Prepare request %lu\n", prefs->sub_tm->completed);
342
        r = xseg_prep_request(xseg, req, targetlen, size);
343
        if (r < 0) {
344
                XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
345
                        (unsigned long)targetlen, (unsigned long long)size);
346
                goto put_xseg_request;
347
        }
348

    
349
#if 0
350
        //TODO: allow strcpy, memcpy
351
        //Copy target's name to the newly allocated space
352
        req_target = xseg_get_target(xseg, req);
353
        strncpy(req_target, target, targetlen);
354

355
        //Copy data buffer to the newly allocated space
356
        req_data = xseg_get_data(xseg, req);
357
        memcpy(req_data, buf, size);
358
        req->offset = offset;
359
        req->size = size;
360
        req->op = X_WRITE;
361
#endif
362

    
363
        //Measure this?
364
        XSEGLOG2(&lc, D, "Allocate peer request\n");
365
        pr = alloc_peer_req(peer);
366
        if (!pr) {
367
                XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
368
                                peer->nr_ops - xq_count(&peer->free_reqs));
369
                goto put_xseg_request;
370
        }
371
        pr->peer = peer;
372
        pr->portno = srcport;
373
        pr->req = req;
374
        pr->priv = malloc(sizeof(struct timespec));
375

    
376
        XSEGLOG2(&lc, D, "Set request data\n");
377
        r = xseg_set_req_data(xseg, req, pr);
378
        if (r<0) {
379
                XSEGLOG2(&lc, W, "Cannot set request data\n");
380
                goto put_peer_request;
381
        }
382

    
383
        /*
384
         * Start measuring receive time.
385
         * When we receive a request, we need to have its submission time to
386
         * measure elapsed time. Thus, we memcpy its submission time to pr->priv.
387
         * QUESTION: Is this the fastest way?
388
         */
389
        timer_start(prefs->rec_tm);
390
        memcpy(pr->priv, &prefs->rec_tm->start_time, sizeof(struct timespec));
391

    
392
        //Submit the request from the source port to the target port
393
        XSEGLOG2(&lc, D, "Submit request %lu\n", prefs->sub_tm->completed);
394
        //QUESTION: Can't we just use the submision time calculated previously?
395
        timer_start(prefs->sub_tm);
396
        p = xseg_submit(xseg, req, srcport, X_ALLOC);
397
        if (p == NoPort) {
398
                XSEGLOG2(&lc, W, "Cannot submit request\n");
399
                goto put_peer_request;
400
        }
401
        timer_stop(prefs->sub_tm, NULL);
402

    
403
        //Send SIGIO to the process that has binded this port to inform that
404
        //IO is possible
405
        xseg_signal(xseg, p);
406

    
407
        return 0;
408

    
409
put_peer_request:
410
        free(pr->priv);
411
        free_peer_req(peer, pr);
412
put_xseg_request:
413
        if (xseg_put_request(xseg, req, srcport))
414
                XSEGLOG2(&lc, W, "Cannot put request\n");
415
        return -1;
416
}
417

    
418
/*
419
 * This function substitutes the default generic_peerd_loop of peer.c.
420
 * It's plugged to struct peerd at custom peer's initialisation
421
 */
422
int custom_peerd_loop(void *arg)
423
{
424
#ifdef MT
425
        struct thread *t = (struct thread *) arg;
426
        struct peerd *peer = t->peer;
427
        char *id = t->arg;
428
#else
429
        struct peerd *peer = (struct peerd *) arg;
430
        char id[4] = {'P','e','e','r'};
431
#endif
432
        struct xseg *xseg = peer->xseg;
433
        struct bench *prefs = peer->priv;
434
        xport portno_start = peer->portno_start;
435
        xport portno_end = peer->portno_end;
436
        uint64_t threshold=1000/(1 + portno_end - portno_start);
437
        pid_t pid =syscall(SYS_gettid);
438
        int r;
439

    
440
        XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
441
        xseg_init_local_signal(xseg, peer->portno_start);
442
        uint64_t loops;
443
        unsigned long max_completed = prefs->ts / prefs->bs;
444

    
445
        timer_start(prefs->total_tm);
446
        while (!isTerminate()) {
447
#ifdef MT
448
                if (t->func) {
449
                        XSEGLOG2(&lc, D, "%s executes function\n", id);
450
                        xseg_cancel_wait(xseg, peer->portno_start);
451
                        t->func(t->arg);
452
                        t->func = NULL;
453
                        t->arg = NULL;
454
                        continue;
455
                }
456
#endif
457
send_request:
458
                while (prefs->sub_tm->completed - prefs->rec_tm->completed <
459
                                prefs->iodepth && prefs->sub_tm->completed < max_completed){
460
                        XSEGLOG2(&lc, D, "Start sending new request\n");
461
                        r = send_request(peer, prefs);
462
                        if (r<0)
463
                                break;
464
                }
465
                //Heart of peerd_loop. This loop is common for everyone.
466
                for (loops = threshold; loops > 0; loops--) {
467
                        if (check_ports(peer)) {
468
                                if (max_completed == prefs->rec_tm->completed)
469
                                        return 0;
470
                                else
471
                                        //If an old request has just been acked, the most sensible
472
                                        //thing to do is to immediately send a new one
473
                                        goto send_request;
474
                        }
475
                }
476
                xseg_prepare_wait(xseg, peer->portno_start);
477
#ifdef ST_THREADS
478
                if (ta){
479
                        st_sleep(0);
480
                        continue;
481
                }
482
#endif
483
                XSEGLOG2(&lc, I, "%s goes to sleep\n",id);
484
                xseg_wait_signal(xseg, 10000000UL);
485
                xseg_cancel_wait(xseg, peer->portno_start);
486
                XSEGLOG2(&lc, I, "%s woke up\n", id);
487
        }
488

    
489
        XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
490
                        xq_count(&peer->free_reqs), peer->nr_ops);
491
        return 0;
492
}
493

    
494
void custom_peer_finalize(struct peerd *peer)
495
{
496
        struct bench *prefs = peer->priv;
497
        //TODO: Measure mean time, standard variation
498
        struct tm_result total; //mean, std;
499

    
500
        if (!prefs->total_tm->completed)
501
                timer_stop(prefs->total_tm, NULL);
502

    
503
        separate_by_order(prefs->total_tm->sum, &total);
504
        print_res(total, "Total Time");
505
        return;
506
}
507

    
508

    
509
static void handle_received(struct peerd *peer, struct peer_req *pr)
510
{
511
        //FIXME: handle null pointer
512
        struct bench *prefs = peer->priv;
513
        struct timer *rec = prefs->rec_tm;
514

    
515
        if (!pr->req) {
516
                //This is a serious error, so we must stop
517
                XSEGLOG2(&lc, E, "Received peer request with no xseg request");
518
                terminated++;
519
                return;
520
        }
521

    
522
        if (!pr->priv) {
523
                XSEGLOG2(&lc, W, "Cannot find submission time of request");
524
                return;
525
        }
526

    
527
        timer_stop(rec, pr->priv);
528

    
529
        if (xseg_put_request(peer->xseg, pr->req, pr->portno))
530
                XSEGLOG2(&lc, W, "Cannot put xseg request\n");
531

    
532
        //QUESTION, can't we just keep the malloced memory for future use?
533
        free(pr->priv);
534
        free_peer_req(peer, pr);
535
}
536

    
537
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
538
                enum dispatch_reason reason)
539
{
540
        switch (reason) {
541
                case dispatch_accept:
542
                        //This is wrong, benchmarking peer should not accept requests,
543
                        //only receive them.
544
                        XSEGLOG2(&lc, W, "Bench peer should not accept requests\n");
545
                        complete(peer, pr);
546
                        break;
547
                case dispatch_receive:
548
                        handle_received(peer, pr);
549
                        break;
550
                default:
551
                        fail(peer, pr);
552
        }
553
        return 0;
554
}