Statistics
| Branch: | Tag: | Revision:

root / xseg / peers / user / bench-xseg.c @ 4b114b66

History | View | Annotate | Download (13.9 kB)

1
/*
2
 * Copyright 2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *   2. Redistributions in binary form must reproduce the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer in the documentation and/or other materials
14
 *      provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * The views and conclusions contained in the software and
30
 * documentation are those of the authors and should not be
31
 * interpreted as representing official policies, either expressed
32
 * or implied, of GRNET S.A.
33
 */
34

    
35
#define _GNU_SOURCE
36
#include <stdio.h>
37
#include <stdlib.h>
38
#include <unistd.h>
39
#include <sys/syscall.h>
40
#include <sys/types.h>
41
#include <pthread.h>
42
#include <xseg/xseg.h>
43
#include <peer.h>
44
#include <time.h>
45
#include <sys/util.h>
46
#include <signal.h>
47
#include <bench-xseg.h>
48

    
49
struct timespec delay = {0, 4000000};
50

    
51
void custom_peer_usage()
52
{
53
        fprintf(stderr, "Custom peer options: \n"
54
                "  --------------------------------------------\n"
55
                "    -op       | None    | XSEG operation [read|write|info|delete]\n"
56
                "    --pattern | None    | I/O pattern [sync|rand]\n"
57
                "    -ts       | None    | Total I/O size\n"
58
                "    -os       | 4M      | Object size\n"
59
                "    -bs       | 4k      | Block size\n"
60
                "    -dp       | None    | Destination port\n"
61
                "    --iodepth | 1       | Number of in-flight I/O requests\n"
62
                "\n");
63
}
64

    
65
/*
66
 * Convert string to size in bytes.
67
 * If syntax is invalid, return 0. Values such as zero and non-integer
68
 * multiples of segment's page size should not be accepted.
69
 */
70
static uint64_t str2num(char *str)
71
{
72
        char *unit;
73
        uint64_t num;
74

    
75
        num = strtoll(str, &unit, 10);
76
        if (strlen(unit) > 1) //Invalid syntax
77
                return 0;
78
        else if (strlen(unit) < 1) //Plain number in bytes
79
                return num;
80

    
81
        switch (*unit) {
82
                case 'g':
83
                case 'G':
84
                        num *= 1024;
85
                case 'm':
86
                case 'M':
87
                        num *= 1024;
88
                case 'k':
89
                case 'K':
90
                        num *= 1024;
91
                        break;
92
                default:
93
                        num = 0;
94
        }
95
        return num;
96
}
97

    
98
int read_op(char *op) {
99
        if (strcmp(op, "read"))
100
                return X_READ;
101
        if (strcmp(op, "write"))
102
                return X_WRITE;
103
        if (strcmp(op, "info"))
104
                return X_INFO;
105
        if (strcmp(op, "delete"))
106
                return X_DELETE;
107
        return -1;
108
}
109

    
110
int read_pattern(char *pattern) {
111
        if (strcmp(pattern, "sync"))
112
                return IO_SYNC;
113
        if (strcmp(pattern, "rand"))
114
                return IO_RAND;
115
        return -1;
116
}
117

    
118
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
119
{
120
        struct bench *prefs;
121
        char total_size[MAX_ARG_LEN + 1];
122
        char object_size[MAX_ARG_LEN + 1];
123
        char block_size[MAX_ARG_LEN + 1];
124
        struct xseg *xseg = peer->xseg;
125
        unsigned int xseg_page_size = 1 << xseg->config.page_shift;
126
        long dst_port = -1;
127

    
128
        total_size[0] = 0;
129
        block_size[0] = 0;
130
        object_size[0] = 0;
131

    
132
#ifdef MT
133
        for (i = 0; i < nr_threads; i++) {
134
                prefs = peer->thread[i]->priv;
135
                prefs = malloc(sizeof(struct bench));
136
                if (!prefs) {
137
                        perror("malloc");
138
                        return -1;
139
                }
140
        }
141
#endif
142
        prefs = malloc(sizeof(struct bench));
143
        if (!prefs) {
144
                perror("malloc");
145
                return -1;
146
        }
147

    
148
        //Begin reading the benchmark-specific arguments
149
        BEGIN_READ_ARGS(argc, argv);
150
        READ_ARG_STRING("-op", op, MAX_ARG_LEN);
151
        READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
152
        READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
153
        READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
154
        READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
155
        READ_ARG_ULONG("--iodepth", prefs->iodepth);
156
        READ_ARG_ULONG("-dp", dst_port);
157
        END_READ_ARGS();
158

    
159
        /*****************************
160
         * Check I/O type parameters *
161
         *****************************/
162

    
163
        prefs->op = 
164
        /*************************
165
         * Check size parameters *
166
         *************************/
167

    
168
        //Block size (bs): Defaults to 4K.
169
        //It must be a number followed by one of these characters: [k|K|m|M|g|G].
170
        //If not, it will be considered as size in bytes.
171
        //Must be integer multiple of segment's page size (typically 4k).
172
        if (!block_size[0])
173
                strcpy(block_size,"4k");
174

    
175
        if (!prefs->iodepth)
176
                prefs->iodepth = 1;
177

    
178
        prefs->bs = str2num(block_size);
179
        if (!prefs->bs) {
180
                XSEGLOG2(&lc, E, "Invalid syntax: %s\n", block_size);
181
                goto arg_fail;
182
        } else if (prefs->bs % xseg_page_size) {
183
                XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
184
                goto arg_fail;
185
        }
186

    
187
        //Total I/O size (ts): Must be supplied by user.
188
        //Must have the same format as "total size"
189
        //Must be integer multiple of "block size"
190
        if (!total_size[0]) {
191
                XSEGLOG2(&lc, E, "Total I/O size needs to be supplied\n");
192
                goto arg_fail;
193
        }
194

    
195
        prefs->ts = str2num(total_size);
196
        if (!prefs->ts) {
197
                XSEGLOG2(&lc, E, "Invalid syntax: %s\n", total_size);
198
                goto arg_fail;
199
        } else if (prefs->ts % prefs->bs) {
200
                XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
201
                goto arg_fail;
202
        } else if (prefs->ts > xseg->segment_size) {
203
                XSEGLOG2(&lc, E, "Total I/O size exceeds segment size\n", total_size);
204
                goto arg_fail;
205
        }
206

    
207
        //Object size (os): Defaults to 4M.
208
        //Must have the same format as "total size"
209
        //Must be integer multiple of "block size"
210
        if (!object_size[0])
211
                strcpy(object_size,"4M");
212

    
213
        prefs->os = str2num(object_size);
214
        if (!prefs->os) {
215
                XSEGLOG2(&lc, E, "Invalid syntax: %s\n", object_size);
216
                goto arg_fail;
217
        } else if (prefs->os % prefs->bs) {
218
                XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
219
                goto arg_fail;
220
        }
221

    
222
        /*************************
223
         * Check port parameters *
224
         *************************/
225

    
226
        if (dst_port < 0){
227
                XSEGLOG2(&lc, E, "Destination port needs to be supplied\n");
228
                goto arg_fail;
229
        }
230

    
231
        prefs->src_port = peer->portno_start; //TODO: allow user to change this
232
        prefs->dst_port = (xport) dst_port;
233

    
234
        /*********************************
235
         * Create timers for all metrics *
236
         *********************************/
237

    
238
        if (init_timer(prefs->total_tm, TM_SANE))
239
                goto tm_fail;
240
        if (init_timer(prefs->sub_tm, TM_MANIC))
241
                goto tm_fail;
242
        if (init_timer(prefs->get_tm, TM_PARANOID))
243
                goto tm_fail;
244
        if (init_timer(prefs->rec_tm, TM_ECCENTRIC))
245
                goto tm_fail;
246

    
247
        /**************************
248
         * Customize struct peerd *
249
         **************************/
250

    
251
        peer->peerd_loop = custom_peerd_loop;
252
        peer->priv = (void *) prefs;
253
        return 0;
254

    
255
arg_fail:
256
        custom_peer_usage();
257
tm_fail:
258
        free(prefs->total_tm);
259
        free(prefs->sub_tm);
260
        free(prefs->get_tm);
261
        free(prefs->rec_tm);
262
        free(prefs);
263
        return -1;
264
}
265

    
266

    
267
static int send_request(struct peerd *peer, struct bench *prefs)
268
{
269
        struct xseg_request *req;
270
        struct xseg *xseg = peer->xseg;
271
        struct peer_req *pr;
272
        xport srcport = prefs->src_port;
273
        xport dstport = prefs->dst_port;
274
        xport p;
275

    
276
        int r;
277
        uint32_t targetlen = 10; //FIXME: handle it better
278
        uint64_t size = prefs->bs;
279

    
280
        //srcport and dstport must already be provided by the user.
281
        //returns struct xseg_request with basic initializations
282
        XSEGLOG2(&lc, D, "Get request %lu\n", prefs->get_tm->completed);
283
        timer_start(prefs->get_tm);
284
        req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
285
        if (!req) {
286
                XSEGLOG2(&lc, W, "Cannot get request\n");
287
                return -1;
288
        }
289
        timer_stop(prefs->get_tm, NULL);
290

    
291
        //Allocate enough space for the data and the target's name
292
        XSEGLOG2(&lc, D, "Prepare request %lu\n", prefs->sub_tm->completed);
293
        r = xseg_prep_request(xseg, req, targetlen, size);
294
        if (r < 0) {
295
                XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
296
                        (unsigned long)targetlen, (unsigned long long)size);
297
                goto put_xseg_request;
298
        }
299

    
300
#if 0
301
        //TODO: allow strcpy, memcpy
302
        //Copy target's name to the newly allocated space
303
        req_target = xseg_get_target(xseg, req);
304
        strncpy(req_target, target, targetlen);
305

306
        //Copy data buffer to the newly allocated space
307
        req_data = xseg_get_data(xseg, req);
308
        memcpy(req_data, buf, size);
309
        req->offset = offset;
310
        req->size = size;
311
        req->op = X_WRITE;
312
#endif
313

    
314
        //Measure this?
315
        XSEGLOG2(&lc, D, "Allocate peer request\n");
316
        pr = alloc_peer_req(peer);
317
        if (!pr) {
318
                XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
319
                                peer->nr_ops - xq_count(&peer->free_reqs));
320
                goto put_xseg_request;
321
        }
322
        pr->peer = peer;
323
        pr->portno = srcport;
324
        pr->req = req;
325
        pr->priv = malloc(sizeof(struct timespec));
326

    
327
        XSEGLOG2(&lc, D, "Set request data\n");
328
        r = xseg_set_req_data(xseg, req, pr);
329
        if (r<0) {
330
                XSEGLOG2(&lc, W, "Cannot set request data\n");
331
                goto put_peer_request;
332
        }
333

    
334
        /*
335
         * Start measuring receive time.
336
         * When we receive a request, we need to have its submission time to
337
         * measure elapsed time. Thus, we memcpy its submission time to pr->priv.
338
         * QUESTION: Is this the fastest way?
339
         */
340
        timer_start(prefs->rec_tm);
341
        memcpy(pr->priv, &prefs->rec_tm->start_time, sizeof(struct timespec));
342

    
343
        //Submit the request from the source port to the target port
344
        XSEGLOG2(&lc, D, "Submit request %lu\n", prefs->sub_tm->completed);
345
        //QUESTION: Can't we just use the submision time calculated previously?
346
        timer_start(prefs->sub_tm);
347
        p = xseg_submit(xseg, req, srcport, X_ALLOC);
348
        if (p == NoPort) {
349
                XSEGLOG2(&lc, W, "Cannot submit request\n");
350
                goto put_peer_request;
351
        }
352
        timer_stop(prefs->sub_tm, NULL);
353

    
354
        //Send SIGIO to the process that has binded this port to inform that
355
        //IO is possible
356
        xseg_signal(xseg, p);
357

    
358
        return 0;
359

    
360
put_peer_request:
361
        free(pr->priv);
362
        free_peer_req(peer, pr);
363
put_xseg_request:
364
        if (xseg_put_request(xseg, req, srcport))
365
                XSEGLOG2(&lc, W, "Cannot put request\n");
366
        return -1;
367
}
368

    
369
/*
370
 * This function substitutes the default generic_peerd_loop of peer.c.
371
 * It's plugged to struct peerd at custom peer's initialisation
372
 */
373
int custom_peerd_loop(void *arg)
374
{
375
#ifdef MT
376
        struct thread *t = (struct thread *) arg;
377
        struct peerd *peer = t->peer;
378
        char *id = t->arg;
379
#else
380
        struct peerd *peer = (struct peerd *) arg;
381
        char id[4] = {'P','e','e','r'};
382
#endif
383
        struct xseg *xseg = peer->xseg;
384
        struct bench *prefs = peer->priv;
385
        xport portno_start = peer->portno_start;
386
        xport portno_end = peer->portno_end;
387
        uint64_t threshold=1000/(1 + portno_end - portno_start);
388
        pid_t pid =syscall(SYS_gettid);
389
        int r;
390

    
391
        XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
392
        xseg_init_local_signal(xseg, peer->portno_start);
393
        uint64_t loops;
394
        unsigned long max_completed = prefs->ts / prefs->bs;
395

    
396
        timer_start(prefs->total_tm);
397

    
398
        //while (!isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops) {
399
        while (!isTerminate()) {
400
#ifdef MT
401
                if (t->func) {
402
                        XSEGLOG2(&lc, D, "%s executes function\n", id);
403
                        xseg_cancel_wait(xseg, peer->portno_start);
404
                        t->func(t->arg);
405
                        t->func = NULL;
406
                        t->arg = NULL;
407
                        continue;
408
                }
409
#endif
410
send_request:
411
                while (prefs->sub_tm->completed - prefs->rec_tm->completed <
412
                                prefs->iodepth){
413
                        XSEGLOG2(&lc, D, "Start sending new request\n");
414
                        r = send_request(peer, prefs);
415
                        if (r<0)
416
                                break;
417
                }
418

    
419
                //Heart of peerd_loop. This loop is common for everyone.
420
                for (loops = threshold; loops > 0; loops--) {
421
                        if (check_ports(peer)) {
422
                                if (max_completed == prefs->rec_tm->completed)
423
                                        return 0;
424
                                else
425
                                        //If an old request has just been acked, the most sensible
426
                                        //thing to do is to immediately send a new one
427
                                        goto send_request;
428
                        }
429
                }
430
                xseg_prepare_wait(xseg, peer->portno_start);
431
#ifdef ST_THREADS
432
                if (ta){
433
                        st_sleep(0);
434
                        continue;
435
                }
436
#endif
437
                XSEGLOG2(&lc, I, "%s goes to sleep\n",id);
438
                xseg_wait_signal(xseg, 10000000UL);
439
                xseg_cancel_wait(xseg, peer->portno_start);
440
                XSEGLOG2(&lc, I, "%s woke up\n", id);
441
        }
442

    
443
        XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
444
                        xq_count(&peer->free_reqs), peer->nr_ops);
445
        return 0;
446
}
447

    
448
static void print_res(struct tm_result res, char *type)
449
{
450
        printf("\n");
451
        printf("      %s\n", type);
452
        printf("================================\n");
453
        printf("       |-s-||-ms-|-us-|-ns-|\n");
454
        printf("Time:  %03lu, %03lu  %03lu  %03lu\n",
455
                        res.s, res.ms, res.us, res.ns);
456
}
457

    
458
static void separate_by_order(struct timespec src, struct tm_result *res)
459
{
460
        res->ns = src.tv_nsec % 1000;
461
        src.tv_nsec /= 1000;
462
        res->us = src.tv_nsec % 1000;
463
        res->ms = src.tv_nsec / 1000;
464
        res->s = src.tv_sec;
465
}
466

    
467
void custom_peer_finalize(struct peerd *peer)
468
{
469
        struct bench *prefs = peer->priv;
470
        //TODO: Measure mean time, standard variation
471
        struct tm_result total, mean, std;
472
        unsigned int s, ms, us, ns;
473

    
474
        if (!prefs->total_tm->completed)
475
                timer_stop(prefs->total_tm, NULL);
476

    
477
        separate_by_order(prefs->total_tm->sum, &total);
478
        print_res(total, "Total Time");
479
        return;
480
}
481

    
482

    
483
static void handle_received(struct peerd *peer, struct peer_req *pr)
484
{
485
        //FIXME: handle null pointer
486
        struct bench *prefs = peer->priv;
487
        struct timer *rec = prefs->rec_tm;
488

    
489
        if (!pr->req) {
490
                //This is a serious error, so we must stop
491
                XSEGLOG2(&lc, E, "Received peer request with no xseg request");
492
                terminated++;
493
                return;
494
        }
495

    
496
        if (!pr->priv) {
497
                XSEGLOG2(&lc, W, "Cannot find submission time of request");
498
                return;
499
        }
500

    
501
        timer_stop(rec, pr->priv);
502

    
503
        if (xseg_put_request(peer->xseg, pr->req, pr->portno))
504
                XSEGLOG2(&lc, W, "Cannot put xseg request\n");
505

    
506
        //QUESTION, can't we just keep the malloced memory for future use?
507
        free(pr->priv);
508
        free_peer_req(peer, pr);
509
}
510

    
511
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
512
                enum dispatch_reason reason)
513
{
514
        switch (reason) {
515
                case dispatch_accept:
516
                        //This is wrong, benchmarking peer should not accept requests,
517
                        //only receive them.
518
                        XSEGLOG2(&lc, W, "Bench peer should not accept requests\n");
519
                        complete(peer, pr);
520
                        break;
521
                case dispatch_receive:
522
                        handle_received(peer, pr);
523
                        break;
524
                default:
525
                        fail(peer, pr);
526
        }
527
        return 0;
528
}