Fix LFSR behavior
[archipelago] / xseg / peers / user / bench-xseg.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <unistd.h>
39 #include <sys/syscall.h>
40 #include <sys/types.h>
41 #include <pthread.h>
42 #include <xseg/xseg.h>
43 #include <peer.h>
44 #include <time.h>
45 #include <sys/util.h>
46 #include <signal.h>
47 #include <bench-xseg.h>
48 #include <limits.h>
49
50 char global_id[IDLEN];
51 uint64_t global_seed;
52
53 /*
54  * This function checks two things:
55  * a) If in-flight requests are less than given iodepth
56  * b) If we have submitted al of the requests
57  */
58 #define CAN_SEND_REQUEST(prefs) \
59         prefs->sub_tm->completed - prefs->rec_tm->completed < prefs->iodepth && \
60         prefs->sub_tm->completed < prefs->max_requests  \
61
62 void custom_peer_usage()
63 {
64         fprintf(stderr, "Custom peer options: \n"
65                         "  --------------------------------------------\n"
66                         "    -op       | None    | XSEG operation [read|write|info|delete]\n"
67                         "    --pattern | None    | I/O pattern [seq|rand]\n"
68                         "    -to       | None    | Total objects (not for read/write)\n"
69                         "    -ts       | None    | Total I/O size\n"
70                         "    -os       | 4M      | Object size\n"
71                         "    -bs       | 4k      | Block size\n"
72                         "    -dp       | None    | Destination port\n"
73                         "    --iodepth | 1       | Number of in-flight I/O requests\n"
74                         "    --verify  | no      | Verify written requests [no|meta|hash]\n"
75                         "    --seed    | None    | Inititalize LFSR and target names\n"
76                         "    --insanity| sane    | Adjust insanity level of benchmark:\n"
77                         "              |         |     [sane|eccentric|manic|paranoid]\n"
78                         "\n");
79 }
80
81 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
82 {
83         struct bench *prefs;
84         char total_objects[MAX_ARG_LEN + 1];
85         char total_size[MAX_ARG_LEN + 1];
86         char object_size[MAX_ARG_LEN + 1];
87         char block_size[MAX_ARG_LEN + 1];
88         char op[MAX_ARG_LEN + 1];
89         char pattern[MAX_ARG_LEN + 1];
90         char insanity[MAX_ARG_LEN + 1];
91         struct xseg *xseg = peer->xseg;
92         unsigned int xseg_page_size = 1 << xseg->config.page_shift;
93         long dst_port = -1;
94         int r;
95
96         op[0] = 0;
97         pattern[0] = 0;
98         total_objects[0] = 0;
99         total_size[0] = 0;
100         block_size[0] = 0;
101         object_size[0] = 0;
102         insanity[0] = 0;
103
104 #ifdef MT
105         for (i = 0; i < nr_threads; i++) {
106                 prefs = peer->thread[i]->priv;
107                 prefs = malloc(sizeof(struct bench));
108                 if (!prefs) {
109                         perror("malloc");
110                         return -1;
111                 }
112         }
113 #endif
114         prefs = malloc(sizeof(struct bench));
115         if (!prefs) {
116                 perror("malloc");
117                 return -1;
118         }
119         prefs->flags = 0;
120
121         //Begin reading the benchmark-specific arguments
122         BEGIN_READ_ARGS(argc, argv);
123         READ_ARG_STRING("-op", op, MAX_ARG_LEN);
124         READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
125         READ_ARG_STRING("-to", total_objects, MAX_ARG_LEN);
126         READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
127         READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
128         READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
129         READ_ARG_ULONG("--iodepth", prefs->iodepth);
130         READ_ARG_ULONG("-dp", dst_port);
131         READ_ARG_STRING("--insanity", block_size, MAX_ARG_LEN);
132         END_READ_ARGS();
133
134         /*****************************\
135          * Check I/O type parameters *
136         \*****************************/
137
138         //We support 4 xseg operations: X_READ, X_WRITE, X_DELETE, X_INFO
139         //The I/O pattern of thesee operations can be either synchronous (sync) or
140         //random (rand)
141         if (!op[0]) {
142                 XSEGLOG2(&lc, E, "xseg operation needs to be supplied\n");
143                 goto arg_fail;
144         }
145         r = read_op(op);
146         if (r < 0) {
147                 XSEGLOG2(&lc, E, "Invalid syntax: -op %s\n", op);
148                 goto arg_fail;
149         }
150         prefs->op = r;
151
152         if (!pattern[0]) {
153                 XSEGLOG2(&lc, E, "I/O pattern needs to be supplied\n");
154                 goto arg_fail;
155         }
156         r = read_pattern(pattern);
157         if (r < 0) {
158                 XSEGLOG2(&lc, E, "Invalid syntax: --pattern %s\n", pattern);
159                 goto arg_fail;
160         }
161         prefs->flags |= (uint8_t)r;
162
163         /**************************\
164          * Check timer parameters *
165         \**************************/
166
167         //Most of the times, not all timers need to be used.
168         //We can choose which timers will be used by adjusting the "insanity"
169         //level of the benchmark i.e. the obscurity of code paths (get request,
170         //submit request) that will be timed.
171         if (!insanity[0])
172                 strcpy(insanity, "sane");
173
174         prefs->insanity = read_insanity(insanity);
175         if (prefs->insanity < 0) {
176                 XSEGLOG2(&lc, E, "Invalid syntax: --insanity %s\n", insanity);
177                 goto arg_fail;
178         }
179
180         /*
181          * If we have a request other than read/write, we don't need to check
182          * about size parameters, but only how many objects we want to affect
183          */
184         if (prefs->op != X_READ && prefs->op != X_WRITE) {
185
186                 /***************************\
187                  * Check object parameters *
188                 \***************************/
189
190                 if (!total_objects[0]) {
191                         XSEGLOG2(&lc, E,
192                                         "Total number of objects needs to be supplied\n");
193                         goto arg_fail;
194                 }
195                 prefs->to = str2num(total_objects);
196                 if (!prefs->to) {
197                         XSEGLOG2(&lc, E, "Invalid syntax: -to %s\n", total_objects);
198                         goto arg_fail;
199                 }
200
201                 //In this case, the maximum number of requests is the total number of
202                 //objects we will handle
203                 prefs->max_requests = prefs->to;
204         } else {
205
206                 /*************************\
207                  * Check size parameters *
208                 \*************************/
209
210                 //Block size (bs): Defaults to 4K.
211                 //It must be a number followed by one of these characters:
212                 //                                              [k|K|m|M|g|G]
213                 //If not, it will be considered as size in bytes.
214                 //Must be integer multiple of segment's page size (typically 4k).
215                 if (!block_size[0])
216                         strcpy(block_size,"4k");
217
218                 if (!prefs->iodepth)
219                         prefs->iodepth = 1;
220
221                 prefs->bs = str2num(block_size);
222                 if (!prefs->bs) {
223                         XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
224                         goto arg_fail;
225                 } else if (prefs->bs % xseg_page_size) {
226                         XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
227                         goto arg_fail;
228                 }
229
230                 //Total I/O size (ts): Must be supplied by user.
231                 //Must have the same format as "total size"
232                 //Must be integer multiple of "block size"
233                 if (!total_size[0]) {
234                         XSEGLOG2(&lc, E, "Total I/O size needs to be supplied\n");
235                         goto arg_fail;
236                 }
237
238                 prefs->ts = str2num(total_size);
239                 if (!prefs->ts) {
240                         XSEGLOG2(&lc, E, "Invalid syntax: -ts %s\n", total_size);
241                         goto arg_fail;
242                 } else if (prefs->ts % prefs->bs) {
243                         XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
244                         goto arg_fail;
245                 } else if (prefs->ts > xseg->segment_size) {
246                         XSEGLOG2(&lc, E,
247                                         "Total I/O size exceeds segment size\n", total_size);
248                         goto arg_fail;
249                 }
250
251                 //Object size (os): Defaults to 4M.
252                 //Must have the same format as "total size"
253                 //Must be integer multiple of "block size"
254                 if (!object_size[0])
255                         strcpy(object_size,"4M");
256
257                 prefs->os = str2num(object_size);
258                 if (!prefs->os) {
259                         XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
260                         goto arg_fail;
261                 } else if (prefs->os % prefs->bs) {
262                         XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
263                         goto arg_fail;
264                 }
265
266                 //In this case, the maximum number of requests is the number of blocks
267                 //we need to cover the total I/O size
268                 prefs->max_requests = prefs->ts / prefs->bs;
269         }
270
271         /*************************
272          * Check port parameters *
273          *************************/
274
275         if (dst_port < 0){
276                 XSEGLOG2(&lc, E, "Destination port needs to be supplied\n");
277                 goto arg_fail;
278         }
279
280         prefs->src_port = peer->portno_start; //TODO: allow user to change this
281         prefs->dst_port = (xport) dst_port;
282
283         /*********************************
284          * Create timers for all metrics *
285          *********************************/
286
287         if (init_timer(&prefs->total_tm, TM_SANE))
288                 goto tm_fail;
289         if (init_timer(&prefs->sub_tm, TM_MANIC))
290                 goto tm_fail;
291         if (init_timer(&prefs->get_tm, TM_PARANOID))
292                 goto tm_fail;
293         if (init_timer(&prefs->rec_tm, TM_ECCENTRIC))
294                 goto tm_fail;
295
296         /********************************\
297          * Customize struct peerd/prefs *
298          \********************************/
299
300         prefs->peer = peer;
301
302         //The following function initializes the global_id, global_seed extern
303         //variables.
304         create_id();
305
306         if ((prefs->flags & (1 <<PATTERN_FLAG)) == IO_RAND) {
307                 prefs->lfsr = malloc(sizeof(struct lfsr));
308                 if (!prefs->lfsr) {
309                         perror("malloc");
310                         goto lfsr_fail;
311                 }
312                 //FIXME: Give a name to max requests, not just prefs->ts / prefs->bs
313                 //FIXME: handle better the seed passing than just giving UINT64_MAX
314                 if (lfsr_init(prefs->lfsr, prefs->max_requests, UINT64_MAX)) {
315                         XSEGLOG2(&lc, E, "LFSR could not be initialized\n");
316                         goto lfsr_fail;
317                 }
318         }
319
320         peer->peerd_loop = custom_peerd_loop;
321         peer->priv = (void *) prefs;
322         return 0;
323
324 arg_fail:
325         custom_peer_usage();
326 lfsr_fail:
327         free(prefs->lfsr);
328 tm_fail:
329         free(prefs->total_tm);
330         free(prefs->sub_tm);
331         free(prefs->get_tm);
332         free(prefs->rec_tm);
333         free(prefs);
334         return -1;
335 }
336
337
338 static int send_request(struct peerd *peer, struct bench *prefs)
339 {
340         struct xseg_request *req;
341         struct xseg *xseg = peer->xseg;
342         struct peer_req *pr;
343         xport srcport = prefs->src_port;
344         xport dstport = prefs->dst_port;
345         xport p;
346
347         int r;
348         uint64_t new;
349         uint64_t size = prefs->bs;
350
351         //srcport and dstport must already be provided by the user.
352         //returns struct xseg_request with basic initializations
353         //XSEGLOG2(&lc, D, "Get new request\n");
354         timer_start(prefs, prefs->get_tm);
355         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
356         if (!req) {
357                 XSEGLOG2(&lc, W, "Cannot get request\n");
358                 return -1;
359         }
360         timer_stop(prefs, prefs->get_tm, NULL);
361
362         //Allocate enough space for the data and the target's name
363         //XSEGLOG2(&lc, D, "Prepare new request\n");
364         r = xseg_prep_request(xseg, req, TARGETLEN, size);
365         if (r < 0) {
366                 XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
367                                 TARGETLEN, (unsigned long long)size);
368                 goto put_xseg_request;
369         }
370
371         //Determine what the next target/chunk will be, based on I/O pattern
372         new = determine_next(prefs);
373         XSEGLOG2(&lc, D, "Our new request is %lu\n", new);
374         //Create a target of this format: "bench-<obj_no>"
375         create_target(prefs, req, new);
376
377         if(prefs->op == X_WRITE || prefs->op == X_READ) {
378                 req->size = size;
379                 //Calculate the chunk offset inside the object
380                 req->offset = (new * prefs->bs) % prefs->os;
381                 XSEGLOG2(&lc, D, "Offset of request %lu is %lu\n", new, req->offset);
382
383                 if(prefs->op == X_WRITE)
384                         create_chunk(prefs, req, new);
385         }
386
387         req->op = prefs->op;
388
389         //Measure this?
390         //XSEGLOG2(&lc, D, "Allocate peer request\n");
391         pr = alloc_peer_req(peer);
392         if (!pr) {
393                 XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
394                                 peer->nr_ops - xq_count(&peer->free_reqs));
395                 goto put_xseg_request;
396         }
397         pr->peer = peer;
398         pr->portno = srcport;
399         pr->req = req;
400         pr->priv = malloc(sizeof(struct timespec));
401         if(!pr->priv) {
402                 perror("malloc");
403                 goto put_peer_request;
404         }
405
406         //XSEGLOG2(&lc, D, "Set request data\n");
407         r = xseg_set_req_data(xseg, req, pr);
408         if (r < 0) {
409                 XSEGLOG2(&lc, W, "Cannot set request data\n");
410                 goto put_peer_request;
411         }
412
413         /*
414          * Start measuring receive time.
415          * When we receive a request, we need to have its submission time to
416          * measure elapsed time. Thus, we memcpy its submission time to pr->priv.
417          * QUESTION: Is this the fastest way?
418          */
419         timer_start(prefs, prefs->rec_tm);
420         memcpy(pr->priv, &prefs->rec_tm->start_time, sizeof(struct timespec));
421
422         //Submit the request from the source port to the target port
423         //XSEGLOG2(&lc, D, "Submit request %lu\n", new);
424         //QUESTION: Can't we just use the submision time calculated previously?
425         timer_start(prefs, prefs->sub_tm);
426         p = xseg_submit(xseg, req, srcport, X_ALLOC);
427         if (p == NoPort) {
428                 XSEGLOG2(&lc, W, "Cannot submit request\n");
429                 goto put_peer_request;
430         }
431         timer_stop(prefs, prefs->sub_tm, NULL);
432
433         //Send SIGIO to the process that has binded this port to inform that
434         //IO is possible
435         xseg_signal(xseg, p);
436
437         return 0;
438
439 put_peer_request:
440         free(pr->priv);
441         free_peer_req(peer, pr);
442 put_xseg_request:
443         if (xseg_put_request(xseg, req, srcport))
444                 XSEGLOG2(&lc, W, "Cannot put request\n");
445         return -1;
446 }
447
448 /*
449  * This function substitutes the default generic_peerd_loop of peer.c.
450  * It's plugged to struct peerd at custom peer's initialisation
451  */
452 int custom_peerd_loop(void *arg)
453 {
454 #ifdef MT
455         struct thread *t = (struct thread *) arg;
456         struct peerd *peer = t->peer;
457         char *id = t->arg;
458 #else
459         struct peerd *peer = (struct peerd *) arg;
460         char id[4] = {'P','e','e','r'};
461 #endif
462         struct xseg *xseg = peer->xseg;
463         struct bench *prefs = peer->priv;
464         xport portno_start = peer->portno_start;
465         xport portno_end = peer->portno_end;
466         uint64_t threshold=1000/(1 + portno_end - portno_start);
467         pid_t pid =syscall(SYS_gettid);
468         int r;
469
470
471         XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
472         xseg_init_local_signal(xseg, peer->portno_start);
473         uint64_t loops;
474
475         timer_start(prefs, prefs->total_tm);
476         while (!isTerminate()) {
477 #ifdef MT
478                 if (t->func) {
479                         XSEGLOG2(&lc, D, "%s executes function\n", id);
480                         xseg_cancel_wait(xseg, peer->portno_start);
481                         t->func(t->arg);
482                         t->func = NULL;
483                         t->arg = NULL;
484                         continue;
485                 }
486 #endif
487 send_request:
488                 while (CAN_SEND_REQUEST(prefs)) {
489                         XSEGLOG2(&lc, D, "Because %lu < %lu && %lu < %lu\n",
490                                         prefs->sub_tm->completed - prefs->rec_tm->completed,
491                                         prefs->iodepth, prefs->sub_tm->completed,
492                                         prefs->max_requests);
493                         xseg_cancel_wait(xseg, peer->portno_start);
494                         XSEGLOG2(&lc, D, "Start sending new request\n");
495                         r = send_request(peer, prefs);
496                         if (r < 0)
497                                 break;
498                 }
499                 //Heart of peerd_loop. This loop is common for everyone.
500                 for (loops = threshold; loops > 0; loops--) {
501                         if (check_ports(peer)) {
502                                 if (prefs->max_requests == prefs->rec_tm->completed)
503                                         return 0;
504                                 else
505                                         //If an old request has just been acked, the most sensible
506                                         //thing to do is to immediately send a new one
507                                         goto send_request;
508                         }
509                 }
510                 XSEGLOG2(&lc, I, "%s goes to sleep\n",id);
511                 xseg_prepare_wait(xseg, peer->portno_start);
512 #ifdef ST_THREADS
513                 if (ta){
514                         st_sleep(0);
515                         continue;
516                 }
517 #endif
518                 xseg_wait_signal(xseg, 10000000UL);
519                 xseg_cancel_wait(xseg, peer->portno_start);
520                 XSEGLOG2(&lc, I, "%s woke up\n", id);
521         }
522
523         XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
524                         xq_count(&peer->free_reqs), peer->nr_ops);
525         return 0;
526 }
527
528 void custom_peer_finalize(struct peerd *peer)
529 {
530         struct bench *prefs = peer->priv;
531         //TODO: Measure mean time, standard variation
532         struct tm_result total; //mean, std;
533
534         if (!prefs->total_tm->completed)
535                 timer_stop(prefs, prefs->total_tm, NULL);
536
537         separate_by_order(prefs->total_tm->sum, &total);
538         print_res(total, "Total Time");
539         return;
540 }
541
542
543 static void handle_received(struct peerd *peer, struct peer_req *pr)
544 {
545         //FIXME: handle null pointer
546         struct bench *prefs = peer->priv;
547         struct timer *rec = prefs->rec_tm;
548
549         if (!pr->req) {
550                 //This is a serious error, so we must stop
551                 XSEGLOG2(&lc, E, "Received peer request with no xseg request");
552                 terminated++;
553                 return;
554         }
555
556         if (!pr->priv) {
557                 XSEGLOG2(&lc, W, "Cannot find submission time of request");
558                 return;
559         }
560
561         timer_stop(prefs, rec, pr->priv);
562
563         if (xseg_put_request(peer->xseg, pr->req, pr->portno))
564                 XSEGLOG2(&lc, W, "Cannot put xseg request\n");
565
566         //QUESTION, can't we just keep the malloced memory for future use?
567         free(pr->priv);
568         free_peer_req(peer, pr);
569 }
570
571 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
572                 enum dispatch_reason reason)
573 {
574         switch (reason) {
575                 case dispatch_accept:
576                         //This is wrong, benchmarking peer should not accept requests,
577                         //only receive them.
578                         XSEGLOG2(&lc, W, "Bench peer should not accept requests\n");
579                         complete(peer, pr);
580                         break;
581                 case dispatch_receive:
582                         handle_received(peer, pr);
583                         break;
584                 default:
585                         fail(peer, pr);
586         }
587         return 0;
588 }