Improve LFSR implementation
[archipelago] / xseg / peers / user / bench-xseg.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <unistd.h>
39 #include <sys/syscall.h>
40 #include <sys/types.h>
41 #include <pthread.h>
42 #include <xseg/xseg.h>
43 #include <peer.h>
44 #include <time.h>
45 #include <sys/util.h>
46 #include <signal.h>
47 #include <bench-xseg.h>
48 #include <bench-lfsr.h>
49 #include <limits.h>
50
51 char global_id[IDLEN];
52
53 /*
54  * This macro checks two things:
55  * a) If in-flight requests are less than given iodepth
56  * b) If we have submitted all of the requests
57  */
58 #define CAN_SEND_REQUEST(prefs) \
59         prefs->sub_tm->completed - prefs->rec_tm->completed < prefs->iodepth && \
60         prefs->sub_tm->completed < prefs->max_requests  \
61
62 void custom_peer_usage()
63 {
64         fprintf(stderr, "Custom peer options: \n"
65                         "  --------------------------------------------\n"
66                         "    -op       | None    | XSEG operation [read|write|info|delete]\n"
67                         "    --pattern | None    | I/O pattern [seq|rand]\n"
68                         "    -to       | None    | Total objects (not for read/write)\n"
69                         "    -ts       | None    | Total I/O size\n"
70                         "    -os       | 4M      | Object size\n"
71                         "    -bs       | 4k      | Block size\n"
72                         "    -tp       | None    | Target port\n"
73                         "    --iodepth | 1       | Number of in-flight I/O requests\n"
74                         "    --verify  | no      | Verify written requests [no|meta|hash]\n"
75                         "    --seed    | None    | Initialize LFSR and target names\n"
76                         "    --insanity| sane    | Adjust insanity level of benchmark:\n"
77                         "              |         |     [sane|eccentric|manic|paranoid]\n"
78                         "\n");
79 }
80
81 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
82 {
83         struct bench *prefs;
84         char total_objects[MAX_ARG_LEN + 1];
85         char total_size[MAX_ARG_LEN + 1];
86         char object_size[MAX_ARG_LEN + 1];
87         char block_size[MAX_ARG_LEN + 1];
88         char op[MAX_ARG_LEN + 1];
89         char pattern[MAX_ARG_LEN + 1];
90         char insanity[MAX_ARG_LEN + 1];
91         char verify[MAX_ARG_LEN + 1];
92         struct xseg *xseg = peer->xseg;
93         unsigned int xseg_page_size = 1 << xseg->config.page_shift;
94         long iodepth = -1;
95         long dst_port = -1;
96         unsigned long seed = -1;
97         struct timespec timer_seed;
98         int set_by_hand = 0;
99         int r;
100
101         op[0] = 0;
102         pattern[0] = 0;
103         total_objects[0] = 0;
104         total_size[0] = 0;
105         block_size[0] = 0;
106         object_size[0] = 0;
107         insanity[0] = 0;
108         verify[0] = 0;
109
110 #ifdef MT
111         for (i = 0; i < nr_threads; i++) {
112                 prefs = peer->thread[i]->priv;
113                 prefs = malloc(sizeof(struct bench));
114                 if (!prefs) {
115                         perror("malloc");
116                         return -1;
117                 }
118         }
119 #endif
120         prefs = malloc(sizeof(struct bench));
121         if (!prefs) {
122                 perror("malloc");
123                 return -1;
124         }
125         prefs->flags = 0;
126
127         //Begin reading the benchmark-specific arguments
128         BEGIN_READ_ARGS(argc, argv);
129         READ_ARG_STRING("-op", op, MAX_ARG_LEN);
130         READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
131         READ_ARG_STRING("-to", total_objects, MAX_ARG_LEN);
132         READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
133         READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
134         READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
135         READ_ARG_ULONG("--iodepth", iodepth);
136         READ_ARG_ULONG("-tp", dst_port);
137         READ_ARG_ULONG("--seed", seed);
138         READ_ARG_STRING("--insanity", insanity, MAX_ARG_LEN);
139         READ_ARG_STRING("--verify", verify, MAX_ARG_LEN);
140         END_READ_ARGS();
141
142         /*****************************\
143          * Check I/O type parameters *
144         \*****************************/
145
146         //We support 4 xseg operations: X_READ, X_WRITE, X_DELETE, X_INFO
147         //The I/O pattern of these operations can be either sequential (seq) or
148         //random (rand)
149         if (!op[0]) {
150                 XSEGLOG2(&lc, E, "xseg operation needs to be supplied\n");
151                 goto arg_fail;
152         }
153         r = read_op(op);
154         if (r < 0) {
155                 XSEGLOG2(&lc, E, "Invalid syntax: -op %s\n", op);
156                 goto arg_fail;
157         }
158         prefs->op = r;
159
160         if (!pattern[0]) {
161                 XSEGLOG2(&lc, E, "I/O pattern needs to be supplied\n");
162                 goto arg_fail;
163         }
164         r = read_pattern(pattern);
165         if (r < 0) {
166                 XSEGLOG2(&lc, E, "Invalid syntax: --pattern %s\n", pattern);
167                 goto arg_fail;
168         }
169         prefs->flags |= (uint8_t)r;
170
171         if (!verify[0])
172                 strcpy(verify, "no");
173         r = read_verify(verify);
174         if (r < 0) {
175                 XSEGLOG2(&lc, E, "Invalid syntax: --verify %s\n", verify);
176                 goto arg_fail;
177         }
178
179
180         //Default iodepth value is 1
181         if (iodepth < 0)
182                 prefs->iodepth = 1;
183         else
184                 prefs->iodepth = iodepth;
185
186         /**************************\
187          * Check timer parameters *
188         \**************************/
189
190         //Most of the times, not all timers need to be used.
191         //We can choose which timers will be used by adjusting the "insanity"
192         //level of the benchmark i.e. the obscurity of code paths (get request,
193         //submit request) that will be timed.
194         if (!insanity[0])
195                 strcpy(insanity, "sane");
196
197         prefs->insanity = read_insanity(insanity);
198         if (prefs->insanity < 0) {
199                 XSEGLOG2(&lc, E, "Invalid syntax: --insanity %s\n", insanity);
200                 goto arg_fail;
201         }
202
203         /*
204          * If we have a request other than read/write, we don't need to check
205          * about size parameters, but only how many objects we want to affect
206          */
207         if (prefs->op != X_READ && prefs->op != X_WRITE) {
208
209                 /***************************\
210                  * Check object parameters *
211                 \***************************/
212
213                 if (!total_objects[0]) {
214                         XSEGLOG2(&lc, E,
215                                         "Total number of objects needs to be supplied\n");
216                         goto arg_fail;
217                 }
218                 prefs->to = str2num(total_objects);
219                 if (!prefs->to) {
220                         XSEGLOG2(&lc, E, "Invalid syntax: -to %s\n", total_objects);
221                         goto arg_fail;
222                 }
223
224                 //In this case, the maximum number of requests is the total number of
225                 //objects we will handle
226                 prefs->max_requests = prefs->to;
227         } else {
228
229                 /*************************\
230                  * Check size parameters *
231                 \*************************/
232
233                 //Block size (bs): Defaults to 4K.
234                 //It must be a number followed by one of these characters:
235                 //                                              [k|K|m|M|g|G]
236                 //If not, it will be considered as size in bytes.
237                 //Must be integer multiple of segment's page size (typically 4k).
238                 if (!block_size[0])
239                         strcpy(block_size,"4k");
240
241                 prefs->bs = str2num(block_size);
242                 if (!prefs->bs) {
243                         XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
244                         goto arg_fail;
245                 } else if (prefs->bs % xseg_page_size) {
246                         XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
247                         goto arg_fail;
248                 }
249
250                 //Total I/O size (ts): Must be supplied by user.
251                 //Must have the same format as "total size"
252                 //Must be integer multiple of "block size"
253                 if (!total_size[0]) {
254                         XSEGLOG2(&lc, E, "Total I/O size needs to be supplied\n");
255                         goto arg_fail;
256                 }
257
258                 prefs->ts = str2num(total_size);
259                 if (!prefs->ts) {
260                         XSEGLOG2(&lc, E, "Invalid syntax: -ts %s\n", total_size);
261                         goto arg_fail;
262                 } else if (prefs->ts % prefs->bs) {
263                         XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
264                         goto arg_fail;
265                 } else if (prefs->ts > xseg->segment_size) {
266                         XSEGLOG2(&lc, E,
267                                         "Total I/O size exceeds segment size\n", total_size);
268                         goto arg_fail;
269                 }
270
271                 //Object size (os): Defaults to 4M.
272                 //Must have the same format as "total size"
273                 //Must be integer multiple of "block size"
274                 if (!object_size[0])
275                         strcpy(object_size,"4M");
276
277                 prefs->os = str2num(object_size);
278                 if (!prefs->os) {
279                         XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
280                         goto arg_fail;
281                 } else if (prefs->os % prefs->bs) {
282                         XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
283                         goto arg_fail;
284                 }
285
286                 //In this case, the maximum number of requests is the number of blocks
287                 //we need to cover the total I/O size
288                 prefs->max_requests = prefs->ts / prefs->bs;
289         }
290
291         /*************************\
292          * Check port parameters *
293         \*************************/
294
295         if (dst_port < 0){
296                 XSEGLOG2(&lc, E, "Target port needs to be supplied\n");
297                 goto arg_fail;
298         }
299
300         prefs->src_port = peer->portno_start; //TODO: allow user to change this
301         prefs->dst_port = (xport) dst_port;
302
303         /*********************************\
304          * Create timers for all metrics *
305         \*********************************/
306
307         if (init_timer(&prefs->total_tm, TM_SANE))
308                 goto tm_fail;
309         if (init_timer(&prefs->sub_tm, TM_MANIC))
310                 goto tm_fail;
311         if (init_timer(&prefs->get_tm, TM_PARANOID))
312                 goto tm_fail;
313         if (init_timer(&prefs->rec_tm, TM_ECCENTRIC))
314                 goto tm_fail;
315
316         /********************************\
317          * Customize struct peerd/prefs *
318         \********************************/
319
320         prefs->peer = peer;
321
322 reseed:
323         //We proceed to initialise the global_id, and seed variables.
324         if (seed == -1) {
325                 clock_gettime(CLOCK_MONOTONIC_RAW, &timer_seed);
326                 seed = timer_seed.tv_nsec;
327         } else {
328                 set_by_hand = 1;
329         }
330         create_id(seed);
331
332         if ((prefs->flags & (1 << PATTERN_FLAG)) == IO_RAND) {
333                 prefs->lfsr = malloc(sizeof(struct bench_lfsr));
334                 if (!prefs->lfsr) {
335                         perror("malloc");
336                         goto lfsr_fail;
337                 }
338
339                 r = lfsr_init(prefs->lfsr, prefs->max_requests, seed, seed & 0xF);
340                 if (r && set_by_hand) {
341                         XSEGLOG2(&lc, E, "LFSR could not be initialized\n");
342                         goto lfsr_fail;
343                 } else if (r) {
344                         seed = -1;
345                         goto reseed;
346                 }
347         }
348         XSEGLOG2(&lc, I, "Global ID is %s\n", global_id);
349
350         peer->peerd_loop = custom_peerd_loop;
351         peer->priv = (void *) prefs;
352         return 0;
353
354 arg_fail:
355         custom_peer_usage();
356 lfsr_fail:
357         free(prefs->lfsr);
358 tm_fail:
359         free(prefs->total_tm);
360         free(prefs->sub_tm);
361         free(prefs->get_tm);
362         free(prefs->rec_tm);
363         free(prefs);
364         return -1;
365 }
366
367
368 static int send_request(struct peerd *peer, struct bench *prefs)
369 {
370         struct xseg_request *req;
371         struct xseg *xseg = peer->xseg;
372         struct peer_req *pr;
373         xport srcport = prefs->src_port;
374         xport dstport = prefs->dst_port;
375         xport p;
376
377         int r;
378         uint64_t new;
379         uint64_t size = prefs->bs;
380
381         //srcport and dstport must already be provided by the user.
382         //returns struct xseg_request with basic initializations
383         XSEGLOG2(&lc, D, "Get new request\n");
384         timer_start(prefs, prefs->get_tm);
385         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
386         if (!req) {
387                 XSEGLOG2(&lc, W, "Cannot get request\n");
388                 return -1;
389         }
390         timer_stop(prefs, prefs->get_tm, NULL);
391
392         //Allocate enough space for the data and the target's name
393         XSEGLOG2(&lc, D, "Prepare new request\n");
394         r = xseg_prep_request(xseg, req, TARGETLEN, size);
395         if (r < 0) {
396                 XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
397                                 TARGETLEN, (unsigned long long)size);
398                 goto put_xseg_request;
399         }
400
401         //Determine what the next target/chunk will be, based on I/O pattern
402         new = determine_next(prefs);
403         XSEGLOG2(&lc, I, "Our new request is %lu\n", new);
404         //Create a target of this format: "bench-<global_id>-<obj_no>"
405         create_target(prefs, req, new);
406
407         if (prefs->op == X_WRITE || prefs->op == X_READ) {
408                 req->size = size;
409                 //Calculate the chunk offset inside the object
410                 req->offset = (new * prefs->bs) % prefs->os;
411                 XSEGLOG2(&lc, D, "Offset of request %lu is %lu\n", new, req->offset);
412
413                 if (prefs->op == X_WRITE)
414                         create_chunk(prefs, req, new);
415         }
416
417         req->op = prefs->op;
418
419         //Measure this?
420         XSEGLOG2(&lc, D, "Allocate peer request\n");
421         pr = alloc_peer_req(peer);
422         if (!pr) {
423                 XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
424                                 peer->nr_ops - xq_count(&peer->free_reqs));
425                 goto put_xseg_request;
426         }
427         pr->peer = peer;
428         pr->portno = srcport;
429         pr->req = req;
430         pr->priv = malloc(sizeof(struct timespec));
431         if (!pr->priv) {
432                 perror("malloc");
433                 goto put_peer_request;
434         }
435
436         //XSEGLOG2(&lc, D, "Set request data\n");
437         r = xseg_set_req_data(xseg, req, pr);
438         if (r < 0) {
439                 XSEGLOG2(&lc, W, "Cannot set request data\n");
440                 goto put_peer_request;
441         }
442
443         /*
444          * Start measuring receive time.
445          * When we receive a request, we need to have its submission time to
446          * measure elapsed time. Thus, we memcpy its submission time to pr->priv.
447          * QUESTION: Is this the fastest way?
448          */
449         timer_start(prefs, prefs->rec_tm);
450         if (prefs->rec_tm->insanity <= prefs->insanity)
451                 memcpy(pr->priv, &prefs->rec_tm->start_time, sizeof(struct timespec));
452
453         //Submit the request from the source port to the target port
454         XSEGLOG2(&lc, D, "Submit request %lu\n", new);
455         timer_start(prefs, prefs->sub_tm);
456         p = xseg_submit(xseg, req, srcport, X_ALLOC);
457         if (p == NoPort) {
458                 XSEGLOG2(&lc, W, "Cannot submit request\n");
459                 goto put_peer_request;
460         }
461         timer_stop(prefs, prefs->sub_tm, NULL);
462
463         //Send SIGIO to the process that has bound this port to inform that
464         //IO is possible
465         r = xseg_signal(xseg, p);
466         //if (r < 0)
467         //      XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
468
469         return 0;
470
471 put_peer_request:
472         free(pr->priv);
473         free_peer_req(peer, pr);
474 put_xseg_request:
475         if (xseg_put_request(xseg, req, srcport))
476                 XSEGLOG2(&lc, W, "Cannot put request\n");
477         return -1;
478 }
479
480 /*
481  * This function substitutes the default generic_peerd_loop of peer.c.
482  * It's plugged to struct peerd at custom peer's initialisation
483  */
484 int custom_peerd_loop(void *arg)
485 {
486 #ifdef MT
487         struct thread *t = (struct thread *) arg;
488         struct peerd *peer = t->peer;
489         char *id = t->arg;
490 #else
491         struct peerd *peer = (struct peerd *) arg;
492         char id[4] = {'P','e','e','r'};
493 #endif
494         struct xseg *xseg = peer->xseg;
495         struct bench *prefs = peer->priv;
496         xport portno_start = peer->portno_start;
497         xport portno_end = peer->portno_end;
498         uint64_t threshold=1000/(1 + portno_end - portno_start);
499         pid_t pid = syscall(SYS_gettid);
500         int r;
501         uint64_t loops;
502
503         XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
504         xseg_init_local_signal(xseg, peer->portno_start);
505
506         timer_start(prefs, prefs->total_tm);
507 send_request:
508         while (!isTerminate()) {
509 #ifdef MT
510                 if (t->func) {
511                         XSEGLOG2(&lc, D, "%s executes function\n", id);
512                         xseg_cancel_wait(xseg, peer->portno_start);
513                         t->func(t->arg);
514                         t->func = NULL;
515                         t->arg = NULL;
516                         continue;
517                 }
518 #endif
519                 while (CAN_SEND_REQUEST(prefs)) {
520                         xseg_cancel_wait(xseg, peer->portno_start);
521                         XSEGLOG2(&lc, D, "...because %lu < %lu && %lu < %lu\n",
522                                         prefs->sub_tm->completed - prefs->rec_tm->completed,
523                                         prefs->iodepth, prefs->sub_tm->completed,
524                                         prefs->max_requests);
525                         XSEGLOG2(&lc, D, "Start sending new request\n");
526                         r = send_request(peer, prefs);
527                         if (r < 0)
528                                 break;
529                 }
530                 //Heart of peerd_loop. This loop is common for everyone.
531                 for (loops = threshold; loops > 0; loops--) {
532                         if (loops == 1)
533                                 xseg_prepare_wait(xseg, peer->portno_start);
534
535                         if (check_ports(peer)) {
536                                 //If an old request has just been acked, the most sensible
537                                 //thing to do is to immediately send a new one
538                                 if (prefs->rec_tm->completed < prefs->max_requests)
539                                         goto send_request;
540                                 else
541                                         return 0;
542                         }
543                 }
544                 //struct xseg_port *port = xseg_get_port(xseg, portno_start);
545                 //struct xq *q;
546                 //q = XPTR_TAKE(port->request_queue, xseg->segment);
547                 //XSEGLOG2(&lc, I, "%s goes to sleep with %u requests pending\n",
548                 //              id, xq_count(q));
549                 XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
550 #ifdef ST_THREADS
551                 if (ta){
552                         st_sleep(0);
553                         continue;
554                 }
555 #endif
556                 xseg_wait_signal(xseg, 10000000UL);
557                 xseg_cancel_wait(xseg, peer->portno_start);
558                 XSEGLOG2(&lc, I, "%s woke up\n", id);
559         }
560
561         XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
562                         xq_count(&peer->free_reqs), peer->nr_ops);
563         return 0;
564 }
565
566 void custom_peer_finalize(struct peerd *peer)
567 {
568         struct bench *prefs = peer->priv;
569         //TODO: Measure mean time, standard variation
570
571         if (!prefs->total_tm->completed)
572                 timer_stop(prefs, prefs->total_tm, NULL);
573
574         print_stats(prefs);
575         print_res(prefs, prefs->total_tm, "Total Requests");
576         return;
577 }
578
579
580 static void handle_received(struct peerd *peer, struct peer_req *pr)
581 {
582         //FIXME: handle null pointer
583         struct bench *prefs = peer->priv;
584         struct timer *rec = prefs->rec_tm;
585
586         if (!pr->req) {
587                 //This is a serious error, so we must stop
588                 XSEGLOG2(&lc, E, "Received peer request with no xseg request");
589                 terminated++;
590                 return;
591         }
592
593         if (!pr->priv) {
594                 XSEGLOG2(&lc, W, "Cannot find submission time of request");
595                 return;
596         }
597
598         timer_stop(prefs, rec, pr->priv);
599
600         if (xseg_put_request(peer->xseg, pr->req, pr->portno))
601                 XSEGLOG2(&lc, W, "Cannot put xseg request\n");
602
603         //QUESTION, can't we just keep the malloced memory for future use?
604         free(pr->priv);
605         free_peer_req(peer, pr);
606 }
607
608 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
609                 enum dispatch_reason reason)
610 {
611         switch (reason) {
612                 case dispatch_accept:
613                         //This is wrong, benchmarking peer should not accept requests,
614                         //only receive them.
615                         XSEGLOG2(&lc, W, "Bench peer should not accept requests\n");
616                         complete(peer, pr);
617                         break;
618                 case dispatch_receive:
619                         handle_received(peer, pr);
620                         break;
621                 default:
622                         fail(peer, pr);
623         }
624         return 0;
625 }