Statistics
| Branch: | Tag: | Revision:

root / xseg / peers / user / bench-xseg.c @ 48a66f3b

History | View | Annotate | Download (10.9 kB)

1
/*
2
 * Copyright 2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *   2. Redistributions in binary form must reproduce the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer in the documentation and/or other materials
14
 *      provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * The views and conclusions contained in the software and
30
 * documentation are those of the authors and should not be
31
 * interpreted as representing official policies, either expressed
32
 * or implied, of GRNET S.A.
33
 */
34

    
35
#define _GNU_SOURCE
36
#include <stdio.h>
37
#include <stdlib.h>
38
#include <unistd.h>
39
#include <sys/syscall.h>
40
#include <sys/types.h>
41
#include <pthread.h>
42
#include <xseg/xseg.h>
43
#include <peer.h>
44
#include <time.h>
45
#include <sys/util.h>
46
#include <signal.h>
47
#include <bench-xseg.h>
48

    
49
struct timespec delay = {0, 4000000};
50

    
51
void custom_peer_usage()
52
{
53
        fprintf(stderr, "Custom peer options: \n"
54
                "  --------------------------------------------\n"
55
                "    -ts       | None    | Total I/O size\n"
56
                "    -os       | 4M      | Object size\n"
57
                "    -bs       | 4k      | Block size\n"
58
                "    -dp       | None    | Destination port\n"
59
                "    --iodepth | 1       | Number of in-flight I/O requests\n"
60
                "\n");
61
}
62

    
63
/*
64
 * Convert string to size in bytes.
65
 * If syntax is invalid, return 0. Values such as zero and non-integer
66
 * multiples of segment's page size should not be accepted.
67
 */
68
static uint64_t str2num(char *str)
69
{
70
        char *unit;
71
        uint64_t num;
72

    
73
        num = strtoll(str, &unit, 10);
74
        if (strlen(unit) > 1) //Invalid syntax
75
                return 0;
76
        else if (strlen(unit) < 1) //Plain number in bytes
77
                return num;
78

    
79
        switch (*unit) {
80
                case 'g':
81
                case 'G':
82
                        num *= 1024;
83
                case 'm':
84
                case 'M':
85
                        num *= 1024;
86
                case 'k':
87
                case 'K':
88
                        num *= 1024;
89
                        break;
90
                default:
91
                        num = 0;
92
        }
93
        return num;
94
}
95

    
96
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
97
{
98
        struct bench *prefs;
99
        char total_size[MAX_ARG_LEN + 1];
100
        char object_size[MAX_ARG_LEN + 1];
101
        char block_size[MAX_ARG_LEN + 1];
102
        struct xseg *xseg = peer->xseg;
103
        unsigned int xseg_page_size = 1 << xseg->config.page_shift;
104
        long dst_port = -1;
105

    
106
        total_size[0] = 0;
107
        block_size[0] = 0;
108
        object_size[0] = 0;
109

    
110
        prefs = malloc(sizeof(struct bench));
111
        if (!prefs) {
112
                perror("malloc");
113
                return -1;
114
        }
115

    
116
        //Begin reading the benchmark-specific arguments
117
        BEGIN_READ_ARGS(argc, argv);
118
        READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
119
        READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
120
        READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
121
        READ_ARG_ULONG("--iodepth", prefs->iodepth);
122
        READ_ARG_ULONG("-dp", dst_port);
123
        END_READ_ARGS();
124

    
125
        /*************************
126
         * Check size parameters *
127
         *************************/
128

    
129
        //Block size (bs): Defaults to 4K.
130
        //It must be a number followed by one of these characters: [k|K|m|M|g|G].
131
        //If not, it will be considered as size in bytes.
132
        //Must be integer multiple of segment's page size (typically 4k).
133
        if (!block_size[0])
134
                strcpy(block_size,"4k");
135

    
136
        if (!prefs->iodepth)
137
                prefs->iodepth = 1;
138

    
139
        prefs->bs = str2num(block_size);
140
        if (!prefs->bs) {
141
                XSEGLOG2(&lc, E, "Invalid syntax: %s\n", block_size);
142
                goto arg_fail;
143
        } else if (prefs->bs % xseg_page_size) {
144
                XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
145
                goto arg_fail;
146
        }
147

    
148
        //Total I/O size (ts): Must be supplied by user.
149
        //Must have the same format as "total size"
150
        //Must be integer multiple of "block size"
151
        if (!total_size[0]) {
152
                XSEGLOG2(&lc, E, "Total I/O size needs to be supplied\n");
153
                goto arg_fail;
154
        }
155

    
156
        prefs->ts = str2num(total_size);
157
        if (!prefs->ts) {
158
                XSEGLOG2(&lc, E, "Invalid syntax: %s\n", total_size);
159
                goto arg_fail;
160
        } else if (prefs->ts % prefs->bs) {
161
                XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
162
                goto arg_fail;
163
        } else if (prefs->ts > xseg->segment_size) {
164
                XSEGLOG2(&lc, E, "Total I/O size exceeds segment size\n", total_size);
165
                goto arg_fail;
166
        }
167

    
168
        //Object size (os): Defaults to 4M.
169
        //Must have the same format as "total size"
170
        //Must be integer multiple of "block size"
171
        if (!object_size[0])
172
                strcpy(object_size,"4M");
173

    
174
        prefs->os = str2num(object_size);
175
        if (!prefs->os) {
176
                XSEGLOG2(&lc, E, "Invalid syntax: %s\n", object_size);
177
                goto arg_fail;
178
        } else if (prefs->os % prefs->bs) {
179
                XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
180
                goto arg_fail;
181
        }
182

    
183
        /*************************
184
         * Check port parameters *
185
         *************************/
186

    
187
        if (dst_port < 0){
188
                XSEGLOG2(&lc, E, "Destination port needs to be supplied\n");
189
                goto arg_fail;
190
        }
191

    
192
        prefs->src_port = peer->portno_start; //TODO: allow user to change this
193
        prefs->dst_port = (xport) dst_port;
194

    
195
        /**************************
196
         * Customize struct peerd *
197
         **************************/
198

    
199
        prefs->total_tm = malloc(sizeof(struct timer));
200
        prefs->get_tm = malloc(sizeof(struct timer));
201
        prefs->sub_tm = malloc(sizeof(struct timer));
202
        prefs->rec_tm = malloc(sizeof(struct timer));
203
        if (!prefs->total_tm || !prefs->get_tm || !prefs->sub_tm ||
204
                        !prefs->rec_tm) {
205
                perror("malloc");
206
                return -1;
207
        }
208
        memset(prefs->total_tm, 0, sizeof(struct timer));
209
        memset(prefs->get_tm, 0, sizeof(struct timer));
210
        memset(prefs->sub_tm, 0, sizeof(struct timer));
211
        memset(prefs->rec_tm, 0, sizeof(struct timer));
212

    
213
        peer->peerd_loop = custom_peerd_loop;
214
        peer->priv = (void *) prefs;
215
        return 0;
216

    
217
arg_fail:
218
        free(prefs);
219
        custom_peer_usage();
220
        return -1;
221
}
222

    
223

    
224
int send_request(struct peerd *peer, struct bench *prefs)
225
{
226
        struct xseg_request *req;
227
        struct xseg *xseg = peer->xseg;
228
        xport srcport = prefs->src_port;
229
        xport dstport = prefs->dst_port;
230
        xport p;
231

    
232
        int r;
233
        uint32_t targetlen=10; //FIXME: handle it better
234
        uint64_t size = prefs->bs;
235

    
236
        //srcport and dstport must already be provided by the user.
237
        //returns struct xseg_request with basic initializations
238
        XSEGLOG2(&lc, I, "Get request %lu\n", prefs->get_tm->completed);
239
        timer_start(prefs->get_tm);
240
        req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
241
        if (!req) {
242
                fprintf(stderr, "No request\n");
243
                return -1;
244
        }
245
        timer_stop(prefs->get_tm);
246

    
247
        XSEGLOG2(&lc, I, "Prepare request\n");
248
        //Allocate enough space for the data and the target's name
249
        r = xseg_prep_request(xseg, req, targetlen, size);
250
        if (r < 0) {
251
                fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
252
                        (unsigned long)targetlen, (unsigned long long)size);
253
                xseg_put_request(xseg, req, srcport);
254
                return -1;
255
        }
256

    
257
#if 0
258
        //TODO: allow strcpy, memcpy
259
        //Copy target's name to the newly allocated space
260
        req_target = xseg_get_target(xseg, req);
261
        strncpy(req_target, target, targetlen);
262

263
        //Copy data buffer to the newly allocated space
264
        req_data = xseg_get_data(xseg, req);
265
        memcpy(req_data, buf, size);
266
        req->offset = offset;
267
        req->size = size;
268
        req->op = X_WRITE;
269
#endif
270

    
271
        XSEGLOG2(&lc, I, "Submit request %lu\n", prefs->sub_tm->completed);
272
        //Submit the request from the source port to the target port
273
        timer_start(prefs->sub_tm);
274
        p = xseg_submit(xseg, req, srcport, X_ALLOC);
275
        if (p == NoPort) {
276
                fprintf(stderr, "Cannot submit\n");
277
                return -1;
278
        }
279
        timer_stop(prefs->sub_tm);
280
        timer_start(prefs->rec_tm);
281
        //Send SIGIO to the process that has binded this port to inform that
282
        //IO is possible
283
        xseg_signal(xseg, p);
284

    
285
        return 0;
286
}
287

    
288
/*
289
 * This function substitutes the default peerd_loop of peer.c.
290
 * It's plugged to struct peerd at custom peer's initialisation
291
 */
292
int custom_peerd_loop(void *arg)
293
{
294
#ifdef MT
295
        struct thread *t = (struct thread *) arg;
296
        struct peerd *peer = t->peer;
297
        char *id = t->arg;
298
#else
299
        struct peerd *peer = (struct peerd *) arg;
300
        char id[4] = {'P','e','e','r'};
301
#endif
302
        struct xseg *xseg = peer->xseg;
303
        struct bench *prefs = peer->priv;
304
        xport portno_start = peer->portno_start;
305
        xport portno_end = peer->portno_end;
306
        uint64_t threshold=1000/(1 + portno_end - portno_start);
307
        pid_t pid =syscall(SYS_gettid);
308

    
309
        XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
310
        xseg_init_local_signal(xseg, peer->portno_start);
311
        uint64_t loops;
312

    
313
        timer_start(prefs->total_tm);
314

    
315
        while (!isTerminate()
316
                        && xq_count(&peer->free_reqs) == peer->nr_ops
317
                        && prefs->rec_tm->completed != prefs->ts / prefs->bs ) {
318
#if 0
319
                //#pragma GCC push_options
320
                //#pragma GCC optimize ("O0")
321
                timer_start(prefs->total_tm);
322
                //for(int i = 0; i<1000; i++){}
323
                /*while(prefs->sub_tm->completed < 10000) {
324
                        timer_start(prefs->sub_tm);
325
                        timer_stop(prefs->sub_tm);
326
                }*/
327
                usleep(500000);
328
                timer_stop(prefs->total_tm);
329
                break;
330
                //#pragma GCC pop_options
331

332
                while (prefs->sub_tm->completed - prefs->rec_tm->completed <
333
                                prefs->iodepth){
334
                        XSEGLOG2(&lc, I, "Start sending new request\n");
335
                        send_request(peer, prefs);
336
                }
337
#endif
338
                for (loops = threshold; loops > 0; loops--) {
339
                        if (loops == 1)
340
                                xseg_prepare_wait(xseg, peer->portno_start);
341
                        if (check_ports(peer))
342
                                loops = threshold;
343
                }
344
#ifdef ST_THREADS
345
                if (ta){
346
                        st_sleep(0);
347
                        continue;
348
                }
349
#endif
350
                XSEGLOG2(&lc, I, "%s goes to sleep\n",id);
351
                xseg_wait_signal(xseg, 10000000UL);
352
                xseg_cancel_wait(xseg, peer->portno_start);
353
                XSEGLOG2(&lc, I, "%s woke up\n", pid);
354
        }
355
        return 0;
356
}
357

    
358
void custom_peer_finalize(struct peerd *peer)
359
{
360
        struct bench *prefs = peer->priv;
361
        unsigned int s, ms, us, ns;
362

    
363
        if (!prefs->total_tm->completed)
364
                timer_stop(prefs->total_tm);
365

    
366
        struct timespec tm = prefs->total_tm->sum;
367
        ns = tm.tv_nsec % 1000;
368
        tm.tv_nsec /= 1000;
369
        us = tm.tv_nsec % 1000;
370
        ms = tm.tv_nsec / 1000;
371
        s = tm.tv_sec;
372

    
373
        printf("\n");
374
        printf("          Total time spent\n");
375
        printf("================================\n");
376
        printf("      |-s-||-ms-|-us-|-ns-|\n");
377
        printf("Time:  %03u, %03u  %03u  %03u\n", s, ms, us, ns);
378
        return;
379
}
380

    
381

    
382
void handle_received(struct peerd *peer, struct peer_req *pr)
383
{
384
        //FIXME: handle null pointer
385
        struct bench *prefs = peer->priv;
386
        struct timer *rec = prefs->rec_tm;
387

    
388
        timer_stop(rec);
389
}
390

    
391
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
392
                enum dispatch_reason reason)
393
{
394
        switch (reason) {
395
                case dispatch_accept:
396
                        //This is wrong, benchmarking peer should not accept requests,
397
                        //only receive them.
398
                        complete(peer, pr);
399
                case dispatch_receive:
400
                        handle_received(peer, pr);
401
                default:
402
                        fail(peer, pr);
403
        }
404
        return 0;
405
}