920c3e2d002bc77348e01be717722227e00b4a09
[archipelago] / xseg / peers / user / bench-xseg.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <unistd.h>
39 #include <sys/syscall.h>
40 #include <sys/types.h>
41 #include <pthread.h>
42 #include <xseg/xseg.h>
43 #include <peer.h>
44 #include <time.h>
45 #include <sys/util.h>
46 #include <signal.h>
47 #include <bench-xseg.h>
48 #include <limits.h>
49
50 char global_id[IDLEN];
51 uint64_t global_seed;
52
53 /*
54  * This function checks two things:
55  * a) If in-flight requests are less than given iodepth
56  * b) If we have submitted all of the requests
57  */
58 #define CAN_SEND_REQUEST(prefs) \
59         prefs->sub_tm->completed - prefs->rec_tm->completed < prefs->iodepth && \
60         prefs->sub_tm->completed < prefs->max_requests  \
61
62 void custom_peer_usage()
63 {
64         fprintf(stderr, "Custom peer options: \n"
65                         "  --------------------------------------------\n"
66                         "    -op       | None    | XSEG operation [read|write|info|delete]\n"
67                         "    --pattern | None    | I/O pattern [seq|rand]\n"
68                         "    -to       | None    | Total objects (not for read/write)\n"
69                         "    -ts       | None    | Total I/O size\n"
70                         "    -os       | 4M      | Object size\n"
71                         "    -bs       | 4k      | Block size\n"
72                         "    -tp       | None    | Target port\n"
73                         "    --iodepth | 1       | Number of in-flight I/O requests\n"
74                         "    --verify  | no      | Verify written requests [no|meta|hash]\n"
75                         "    --seed    | None    | Inititalize LFSR and target names\n"
76                         "    --insanity| sane    | Adjust insanity level of benchmark:\n"
77                         "              |         |     [sane|eccentric|manic|paranoid]\n"
78                         "\n");
79 }
80
81 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
82 {
83         struct bench *prefs;
84         char total_objects[MAX_ARG_LEN + 1];
85         char total_size[MAX_ARG_LEN + 1];
86         char object_size[MAX_ARG_LEN + 1];
87         char block_size[MAX_ARG_LEN + 1];
88         char op[MAX_ARG_LEN + 1];
89         char pattern[MAX_ARG_LEN + 1];
90         char insanity[MAX_ARG_LEN + 1];
91         struct xseg *xseg = peer->xseg;
92         unsigned int xseg_page_size = 1 << xseg->config.page_shift;
93         long iodepth = -1;
94         long dst_port = -1;
95         unsigned long seed = -1;
96         int r;
97
98         op[0] = 0;
99         pattern[0] = 0;
100         total_objects[0] = 0;
101         total_size[0] = 0;
102         block_size[0] = 0;
103         object_size[0] = 0;
104         insanity[0] = 0;
105
106 #ifdef MT
107         for (i = 0; i < nr_threads; i++) {
108                 prefs = peer->thread[i]->priv;
109                 prefs = malloc(sizeof(struct bench));
110                 if (!prefs) {
111                         perror("malloc");
112                         return -1;
113                 }
114         }
115 #endif
116         prefs = malloc(sizeof(struct bench));
117         if (!prefs) {
118                 perror("malloc");
119                 return -1;
120         }
121         prefs->flags = 0;
122
123         //Begin reading the benchmark-specific arguments
124         BEGIN_READ_ARGS(argc, argv);
125         READ_ARG_STRING("-op", op, MAX_ARG_LEN);
126         READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
127         READ_ARG_STRING("-to", total_objects, MAX_ARG_LEN);
128         READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
129         READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
130         READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
131         READ_ARG_ULONG("--iodepth", iodepth);
132         READ_ARG_ULONG("-tp", dst_port);
133         READ_ARG_ULONG("--seed", seed);
134         READ_ARG_STRING("--insanity", insanity, MAX_ARG_LEN);
135         END_READ_ARGS();
136
137         /*****************************\
138          * Check I/O type parameters *
139         \*****************************/
140
141         //We support 4 xseg operations: X_READ, X_WRITE, X_DELETE, X_INFO
142         //The I/O pattern of thesee operations can be either synchronous (sync) or
143         //random (rand)
144         if (!op[0]) {
145                 XSEGLOG2(&lc, E, "xseg operation needs to be supplied\n");
146                 goto arg_fail;
147         }
148         r = read_op(op);
149         if (r < 0) {
150                 XSEGLOG2(&lc, E, "Invalid syntax: -op %s\n", op);
151                 goto arg_fail;
152         }
153         prefs->op = r;
154
155         if (!pattern[0]) {
156                 XSEGLOG2(&lc, E, "I/O pattern needs to be supplied\n");
157                 goto arg_fail;
158         }
159         r = read_pattern(pattern);
160         if (r < 0) {
161                 XSEGLOG2(&lc, E, "Invalid syntax: --pattern %s\n", pattern);
162                 goto arg_fail;
163         }
164         prefs->flags |= (uint8_t)r;
165
166         //Defailt iodepth value is 1
167         if (iodepth < 0)
168                 prefs->iodepth = 1;
169         else
170                 prefs->iodepth = iodepth;
171
172         /**************************\
173          * Check timer parameters *
174         \**************************/
175
176         //Most of the times, not all timers need to be used.
177         //We can choose which timers will be used by adjusting the "insanity"
178         //level of the benchmark i.e. the obscurity of code paths (get request,
179         //submit request) that will be timed.
180         if (!insanity[0])
181                 strcpy(insanity, "sane");
182
183         prefs->insanity = read_insanity(insanity);
184         if (prefs->insanity < 0) {
185                 XSEGLOG2(&lc, E, "Invalid syntax: --insanity %s\n", insanity);
186                 goto arg_fail;
187         }
188
189         /*
190          * If we have a request other than read/write, we don't need to check
191          * about size parameters, but only how many objects we want to affect
192          */
193         if (prefs->op != X_READ && prefs->op != X_WRITE) {
194
195                 /***************************\
196                  * Check object parameters *
197                 \***************************/
198
199                 if (!total_objects[0]) {
200                         XSEGLOG2(&lc, E,
201                                         "Total number of objects needs to be supplied\n");
202                         goto arg_fail;
203                 }
204                 prefs->to = str2num(total_objects);
205                 if (!prefs->to) {
206                         XSEGLOG2(&lc, E, "Invalid syntax: -to %s\n", total_objects);
207                         goto arg_fail;
208                 }
209
210                 //In this case, the maximum number of requests is the total number of
211                 //objects we will handle
212                 prefs->max_requests = prefs->to;
213         } else {
214
215                 /*************************\
216                  * Check size parameters *
217                 \*************************/
218
219                 //Block size (bs): Defaults to 4K.
220                 //It must be a number followed by one of these characters:
221                 //                                              [k|K|m|M|g|G]
222                 //If not, it will be considered as size in bytes.
223                 //Must be integer multiple of segment's page size (typically 4k).
224                 if (!block_size[0])
225                         strcpy(block_size,"4k");
226
227                 prefs->bs = str2num(block_size);
228                 if (!prefs->bs) {
229                         XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
230                         goto arg_fail;
231                 } else if (prefs->bs % xseg_page_size) {
232                         XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
233                         goto arg_fail;
234                 }
235
236                 //Total I/O size (ts): Must be supplied by user.
237                 //Must have the same format as "total size"
238                 //Must be integer multiple of "block size"
239                 if (!total_size[0]) {
240                         XSEGLOG2(&lc, E, "Total I/O size needs to be supplied\n");
241                         goto arg_fail;
242                 }
243
244                 prefs->ts = str2num(total_size);
245                 if (!prefs->ts) {
246                         XSEGLOG2(&lc, E, "Invalid syntax: -ts %s\n", total_size);
247                         goto arg_fail;
248                 } else if (prefs->ts % prefs->bs) {
249                         XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
250                         goto arg_fail;
251                 } else if (prefs->ts > xseg->segment_size) {
252                         XSEGLOG2(&lc, E,
253                                         "Total I/O size exceeds segment size\n", total_size);
254                         goto arg_fail;
255                 }
256
257                 //Object size (os): Defaults to 4M.
258                 //Must have the same format as "total size"
259                 //Must be integer multiple of "block size"
260                 if (!object_size[0])
261                         strcpy(object_size,"4M");
262
263                 prefs->os = str2num(object_size);
264                 if (!prefs->os) {
265                         XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
266                         goto arg_fail;
267                 } else if (prefs->os % prefs->bs) {
268                         XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
269                         goto arg_fail;
270                 }
271
272                 //In this case, the maximum number of requests is the number of blocks
273                 //we need to cover the total I/O size
274                 prefs->max_requests = prefs->ts / prefs->bs;
275         }
276
277         /*************************\
278          * Check port parameters *
279         \*************************/
280
281         if (dst_port < 0){
282                 XSEGLOG2(&lc, E, "Target port needs to be supplied\n");
283                 goto arg_fail;
284         }
285
286         prefs->src_port = peer->portno_start; //TODO: allow user to change this
287         prefs->dst_port = (xport) dst_port;
288
289         /*********************************\
290          * Create timers for all metrics *
291         \*********************************/
292
293         if (init_timer(&prefs->total_tm, TM_SANE))
294                 goto tm_fail;
295         if (init_timer(&prefs->sub_tm, TM_MANIC))
296                 goto tm_fail;
297         if (init_timer(&prefs->get_tm, TM_PARANOID))
298                 goto tm_fail;
299         if (init_timer(&prefs->rec_tm, TM_ECCENTRIC))
300                 goto tm_fail;
301
302         /********************************\
303          * Customize struct peerd/prefs *
304         \********************************/
305
306         prefs->peer = peer;
307
308         //The following function initializes the global_id, global_seed extern
309         //variables.
310         create_id(seed);
311
312         if ((prefs->flags & (1 <<PATTERN_FLAG)) == IO_RAND) {
313                 prefs->lfsr = malloc(sizeof(struct lfsr));
314                 if (!prefs->lfsr) {
315                         perror("malloc");
316                         goto lfsr_fail;
317                 }
318                 //FIXME: handle better the seed passing than just giving UINT64_MAX
319                 if (lfsr_init(prefs->lfsr, prefs->max_requests, seed)) {
320                         XSEGLOG2(&lc, E, "LFSR could not be initialized\n");
321                         goto lfsr_fail;
322                 }
323         }
324
325         peer->peerd_loop = custom_peerd_loop;
326         peer->priv = (void *) prefs;
327         return 0;
328
329 arg_fail:
330         custom_peer_usage();
331 lfsr_fail:
332         free(prefs->lfsr);
333 tm_fail:
334         free(prefs->total_tm);
335         free(prefs->sub_tm);
336         free(prefs->get_tm);
337         free(prefs->rec_tm);
338         free(prefs);
339         return -1;
340 }
341
342
343 static int send_request(struct peerd *peer, struct bench *prefs)
344 {
345         struct xseg_request *req;
346         struct xseg *xseg = peer->xseg;
347         struct peer_req *pr;
348         xport srcport = prefs->src_port;
349         xport dstport = prefs->dst_port;
350         xport p;
351
352         int r;
353         uint64_t new;
354         uint64_t size = prefs->bs;
355
356         //srcport and dstport must already be provided by the user.
357         //returns struct xseg_request with basic initializations
358         //XSEGLOG2(&lc, D, "Get new request\n");
359         timer_start(prefs, prefs->get_tm);
360         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
361         if (!req) {
362                 XSEGLOG2(&lc, W, "Cannot get request\n");
363                 return -1;
364         }
365         timer_stop(prefs, prefs->get_tm, NULL);
366
367         //Allocate enough space for the data and the target's name
368         //XSEGLOG2(&lc, D, "Prepare new request\n");
369         r = xseg_prep_request(xseg, req, TARGETLEN, size);
370         if (r < 0) {
371                 XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
372                                 TARGETLEN, (unsigned long long)size);
373                 goto put_xseg_request;
374         }
375
376         //Determine what the next target/chunk will be, based on I/O pattern
377         new = determine_next(prefs);
378         XSEGLOG2(&lc, I, "Our new request is %lu\n", new);
379         //Create a target of this format: "bench-<obj_no>"
380         create_target(prefs, req, new);
381
382         if (prefs->op == X_WRITE || prefs->op == X_READ) {
383                 req->size = size;
384                 //Calculate the chunk offset inside the object
385                 req->offset = (new * prefs->bs) % prefs->os;
386                 XSEGLOG2(&lc, D, "Offset of request %lu is %lu\n", new, req->offset);
387
388                 if (prefs->op == X_WRITE)
389                         create_chunk(prefs, req, new);
390         }
391
392         req->op = prefs->op;
393
394         //Measure this?
395         //XSEGLOG2(&lc, D, "Allocate peer request\n");
396         pr = alloc_peer_req(peer);
397         if (!pr) {
398                 XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
399                                 peer->nr_ops - xq_count(&peer->free_reqs));
400                 goto put_xseg_request;
401         }
402         pr->peer = peer;
403         pr->portno = srcport;
404         pr->req = req;
405         pr->priv = malloc(sizeof(struct timespec));
406         if (!pr->priv) {
407                 perror("malloc");
408                 goto put_peer_request;
409         }
410
411         //XSEGLOG2(&lc, D, "Set request data\n");
412         r = xseg_set_req_data(xseg, req, pr);
413         if (r < 0) {
414                 XSEGLOG2(&lc, W, "Cannot set request data\n");
415                 goto put_peer_request;
416         }
417
418         /*
419          * Start measuring receive time.
420          * When we receive a request, we need to have its submission time to
421          * measure elapsed time. Thus, we memcpy its submission time to pr->priv.
422          * QUESTION: Is this the fastest way?
423          */
424         timer_start(prefs, prefs->rec_tm);
425         if (prefs->rec_tm->insanity <= prefs->insanity)
426                 memcpy(pr->priv, &prefs->rec_tm->start_time, sizeof(struct timespec));
427
428         //Submit the request from the source port to the target port
429         //XSEGLOG2(&lc, D, "Submit request %lu\n", new);
430         timer_start(prefs, prefs->sub_tm);
431         p = xseg_submit(xseg, req, srcport, X_ALLOC);
432         if (p == NoPort) {
433                 XSEGLOG2(&lc, W, "Cannot submit request\n");
434                 goto put_peer_request;
435         }
436         timer_stop(prefs, prefs->sub_tm, NULL);
437
438         //Send SIGIO to the process that has bound this port to inform that
439         //IO is possible
440         r = xseg_signal(xseg, p);
441         //if (r < 0)
442         //      XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
443
444         return 0;
445
446 put_peer_request:
447         free(pr->priv);
448         free_peer_req(peer, pr);
449 put_xseg_request:
450         if (xseg_put_request(xseg, req, srcport))
451                 XSEGLOG2(&lc, W, "Cannot put request\n");
452         return -1;
453 }
454
455 /*
456  * This function substitutes the default generic_peerd_loop of peer.c.
457  * It's plugged to struct peerd at custom peer's initialisation
458  */
459 int custom_peerd_loop(void *arg)
460 {
461 #ifdef MT
462         struct thread *t = (struct thread *) arg;
463         struct peerd *peer = t->peer;
464         char *id = t->arg;
465 #else
466         struct peerd *peer = (struct peerd *) arg;
467         char id[4] = {'P','e','e','r'};
468 #endif
469         struct xseg *xseg = peer->xseg;
470         struct bench *prefs = peer->priv;
471         xport portno_start = peer->portno_start;
472         xport portno_end = peer->portno_end;
473         uint64_t threshold=1000/(1 + portno_end - portno_start);
474         pid_t pid = syscall(SYS_gettid);
475         int r;
476         uint64_t loops;
477
478         XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
479         xseg_init_local_signal(xseg, peer->portno_start);
480
481         timer_start(prefs, prefs->total_tm);
482 send_request:
483         while (!isTerminate()) {
484 #ifdef MT
485                 if (t->func) {
486                         XSEGLOG2(&lc, D, "%s executes function\n", id);
487                         xseg_cancel_wait(xseg, peer->portno_start);
488                         t->func(t->arg);
489                         t->func = NULL;
490                         t->arg = NULL;
491                         continue;
492                 }
493 #endif
494                 while (CAN_SEND_REQUEST(prefs)) {
495                         xseg_cancel_wait(xseg, peer->portno_start);
496                         XSEGLOG2(&lc, D, "...because %lu < %lu && %lu < %lu\n",
497                                         prefs->sub_tm->completed - prefs->rec_tm->completed,
498                                         prefs->iodepth, prefs->sub_tm->completed,
499                                         prefs->max_requests);
500                         XSEGLOG2(&lc, D, "Start sending new request\n");
501                         r = send_request(peer, prefs);
502                         if (r < 0)
503                                 break;
504                 }
505                 //Heart of peerd_loop. This loop is common for everyone.
506                 for (loops = threshold; loops > 0; loops--) {
507                         if (loops == 1)
508                                 xseg_prepare_wait(xseg, peer->portno_start);
509
510                         if (check_ports(peer)) {
511                                 //If an old request has just been acked, the most sensible
512                                 //thing to do is to immediately send a new one
513                                 if (prefs->rec_tm->completed < prefs->max_requests)
514                                         goto send_request;
515                                 else
516                                         return 0;
517                         }
518                 }
519                 //struct xseg_port *port = xseg_get_port(xseg, portno_start);
520                 //struct xq *q;
521                 //q = XPTR_TAKE(port->request_queue, xseg->segment);
522                 //XSEGLOG2(&lc, I, "%s goes to sleep with %u requests pending\n",
523                 //              id, xq_count(q));
524                 XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
525 #ifdef ST_THREADS
526                 if (ta){
527                         st_sleep(0);
528                         continue;
529                 }
530 #endif
531                 xseg_wait_signal(xseg, 10000000UL);
532                 xseg_cancel_wait(xseg, peer->portno_start);
533                 XSEGLOG2(&lc, I, "%s woke up\n", id);
534         }
535
536         XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
537                         xq_count(&peer->free_reqs), peer->nr_ops);
538         return 0;
539 }
540
541 void custom_peer_finalize(struct peerd *peer)
542 {
543         struct bench *prefs = peer->priv;
544         //TODO: Measure mean time, standard variation
545
546         if (!prefs->total_tm->completed)
547                 timer_stop(prefs, prefs->total_tm, NULL);
548
549         print_stats(prefs);
550         print_res(prefs, prefs->total_tm, "Total Requests");
551         return;
552 }
553
554
555 static void handle_received(struct peerd *peer, struct peer_req *pr)
556 {
557         //FIXME: handle null pointer
558         struct bench *prefs = peer->priv;
559         struct timer *rec = prefs->rec_tm;
560
561         if (!pr->req) {
562                 //This is a serious error, so we must stop
563                 XSEGLOG2(&lc, E, "Received peer request with no xseg request");
564                 terminated++;
565                 return;
566         }
567
568         if (!pr->priv) {
569                 XSEGLOG2(&lc, W, "Cannot find submission time of request");
570                 return;
571         }
572
573         timer_stop(prefs, rec, pr->priv);
574
575         if (xseg_put_request(peer->xseg, pr->req, pr->portno))
576                 XSEGLOG2(&lc, W, "Cannot put xseg request\n");
577
578         //QUESTION, can't we just keep the malloced memory for future use?
579         free(pr->priv);
580         free_peer_req(peer, pr);
581 }
582
583 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
584                 enum dispatch_reason reason)
585 {
586         switch (reason) {
587                 case dispatch_accept:
588                         //This is wrong, benchmarking peer should not accept requests,
589                         //only receive them.
590                         XSEGLOG2(&lc, W, "Bench peer should not accept requests\n");
591                         complete(peer, pr);
592                         break;
593                 case dispatch_receive:
594                         handle_received(peer, pr);
595                         break;
596                 default:
597                         fail(peer, pr);
598         }
599         return 0;
600 }