Add skeleton for verification in write path
[archipelago] / xseg / peers / user / bench-xseg.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <unistd.h>
39 #include <sys/syscall.h>
40 #include <sys/types.h>
41 #include <pthread.h>
42 #include <xseg/xseg.h>
43 #include <peer.h>
44 #include <time.h>
45 #include <sys/util.h>
46 #include <signal.h>
47 #include <bench-xseg.h>
48 #include <bench-lfsr.h>
49 #include <limits.h>
50
51 char global_id[IDLEN];
52
53 /*
54  * This macro checks two things:
55  * a) If in-flight requests are less than given iodepth
56  * b) If we have submitted all of the requests
57  */
58 #define CAN_SEND_REQUEST(prefs) \
59         prefs->sub_tm->completed - prefs->rec_tm->completed < prefs->iodepth && \
60         prefs->sub_tm->completed < prefs->max_requests  \
61
62 void custom_peer_usage()
63 {
64         fprintf(stderr, "Custom peer options: \n"
65                         "  --------------------------------------------\n"
66                         "    -op       | None    | XSEG operation [read|write|info|delete]\n"
67                         "    --pattern | None    | I/O pattern [seq|rand]\n"
68                         "    --verify  | no      | Verify written requests [no|meta|hash]\n"
69                         "    -to       | None    | Total objects (not for read/write)\n"
70                         "    -ts       | None    | Total I/O size\n"
71                         "    -os       | 4M      | Object size\n"
72                         "    -bs       | 4k      | Block size\n"
73                         "    -tp       | None    | Target port\n"
74                         "    --iodepth | 1       | Number of in-flight I/O requests\n"
75                         "    --seed    | None    | Initialize LFSR and target names\n"
76                         "    --insanity| sane    | Adjust insanity level of benchmark:\n"
77                         "              |         |     [sane|eccentric|manic|paranoid]\n"
78                         "\n");
79 }
80
81 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
82 {
83         struct bench *prefs;
84         char total_objects[MAX_ARG_LEN + 1];
85         char total_size[MAX_ARG_LEN + 1];
86         char object_size[MAX_ARG_LEN + 1];
87         char block_size[MAX_ARG_LEN + 1];
88         char op[MAX_ARG_LEN + 1];
89         char pattern[MAX_ARG_LEN + 1];
90         char insanity[MAX_ARG_LEN + 1];
91         char verify[MAX_ARG_LEN + 1];
92         struct xseg *xseg = peer->xseg;
93         unsigned int xseg_page_size = 1 << xseg->config.page_shift;
94         long iodepth = -1;
95         long dst_port = -1;
96         unsigned long seed = -1;
97         struct timespec timer_seed;
98         int set_by_hand = 0;
99         int r;
100
101         op[0] = 0;
102         pattern[0] = 0;
103         total_objects[0] = 0;
104         total_size[0] = 0;
105         block_size[0] = 0;
106         object_size[0] = 0;
107         insanity[0] = 0;
108         verify[0] = 0;
109
110 #ifdef MT
111         for (i = 0; i < nr_threads; i++) {
112                 prefs = peer->thread[i]->priv;
113                 prefs = malloc(sizeof(struct bench));
114                 if (!prefs) {
115                         perror("malloc");
116                         return -1;
117                 }
118         }
119 #endif
120         prefs = malloc(sizeof(struct bench));
121         if (!prefs) {
122                 perror("malloc");
123                 return -1;
124         }
125         prefs->flags = 0;
126
127         //Begin reading the benchmark-specific arguments
128         BEGIN_READ_ARGS(argc, argv);
129         READ_ARG_STRING("-op", op, MAX_ARG_LEN);
130         READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
131         READ_ARG_STRING("-to", total_objects, MAX_ARG_LEN);
132         READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
133         READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
134         READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
135         READ_ARG_ULONG("--iodepth", iodepth);
136         READ_ARG_ULONG("-tp", dst_port);
137         READ_ARG_ULONG("--seed", seed);
138         READ_ARG_STRING("--insanity", insanity, MAX_ARG_LEN);
139         READ_ARG_STRING("--verify", verify, MAX_ARG_LEN);
140         END_READ_ARGS();
141
142         /*****************************\
143          * Check I/O type parameters *
144         \*****************************/
145
146         //We support 4 xseg operations: X_READ, X_WRITE, X_DELETE, X_INFO
147         //The I/O pattern of these operations can be either sequential (seq) or
148         //random (rand)
149         if (!op[0]) {
150                 XSEGLOG2(&lc, E, "xseg operation needs to be supplied\n");
151                 goto arg_fail;
152         }
153         r = read_op(op);
154         if (r < 0) {
155                 XSEGLOG2(&lc, E, "Invalid syntax: -op %s\n", op);
156                 goto arg_fail;
157         }
158         prefs->op = r;
159
160         if (!pattern[0]) {
161                 XSEGLOG2(&lc, E, "I/O pattern needs to be supplied\n");
162                 goto arg_fail;
163         }
164         r = read_pattern(pattern);
165         if (r < 0) {
166                 XSEGLOG2(&lc, E, "Invalid syntax: --pattern %s\n", pattern);
167                 goto arg_fail;
168         }
169         prefs->flags |= (uint8_t)r;
170
171         if (!verify[0])
172                 strcpy(verify, "no");
173         r = read_verify(verify);
174         if (r < 0) {
175                 XSEGLOG2(&lc, E, "Invalid syntax: --verify %s\n", verify);
176                 goto arg_fail;
177         }
178
179         //Default iodepth value is 1
180         if (iodepth < 0)
181                 prefs->iodepth = 1;
182         else
183                 prefs->iodepth = iodepth;
184
185         /**************************\
186          * Check timer parameters *
187         \**************************/
188
189         //Most of the times, not all timers need to be used.
190         //We can choose which timers will be used by adjusting the "insanity"
191         //level of the benchmark i.e. the obscurity of code paths (get request,
192         //submit request) that will be timed.
193         if (!insanity[0])
194                 strcpy(insanity, "sane");
195
196         prefs->insanity = read_insanity(insanity);
197         if (prefs->insanity < 0) {
198                 XSEGLOG2(&lc, E, "Invalid syntax: --insanity %s\n", insanity);
199                 goto arg_fail;
200         }
201
202         /*
203          * If we have a request other than read/write, we don't need to check
204          * about size parameters, but only how many objects we want to affect
205          */
206         if (prefs->op != X_READ && prefs->op != X_WRITE) {
207
208                 /***************************\
209                  * Check object parameters *
210                 \***************************/
211
212                 if (!total_objects[0]) {
213                         XSEGLOG2(&lc, E,
214                                         "Total number of objects needs to be supplied\n");
215                         goto arg_fail;
216                 }
217                 prefs->to = str2num(total_objects);
218                 if (!prefs->to) {
219                         XSEGLOG2(&lc, E, "Invalid syntax: -to %s\n", total_objects);
220                         goto arg_fail;
221                 }
222
223                 //In this case, the maximum number of requests is the total number of
224                 //objects we will handle
225                 prefs->max_requests = prefs->to;
226         } else {
227
228                 /*************************\
229                  * Check size parameters *
230                 \*************************/
231
232                 //Block size (bs): Defaults to 4K.
233                 //It must be a number followed by one of these characters:
234                 //                                              [k|K|m|M|g|G]
235                 //If not, it will be considered as size in bytes.
236                 //Must be integer multiple of segment's page size (typically 4k).
237                 if (!block_size[0])
238                         strcpy(block_size,"4k");
239
240                 prefs->bs = str2num(block_size);
241                 if (!prefs->bs) {
242                         XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
243                         goto arg_fail;
244                 } else if (prefs->bs % xseg_page_size) {
245                         XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
246                         goto arg_fail;
247                 }
248
249                 //Total I/O size (ts): Must be supplied by user.
250                 //Must have the same format as "total size"
251                 //Must be integer multiple of "block size"
252                 if (!total_size[0]) {
253                         XSEGLOG2(&lc, E, "Total I/O size needs to be supplied\n");
254                         goto arg_fail;
255                 }
256
257                 prefs->ts = str2num(total_size);
258                 if (!prefs->ts) {
259                         XSEGLOG2(&lc, E, "Invalid syntax: -ts %s\n", total_size);
260                         goto arg_fail;
261                 } else if (prefs->ts % prefs->bs) {
262                         XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
263                         goto arg_fail;
264                 } else if (prefs->ts > xseg->segment_size) {
265                         XSEGLOG2(&lc, E,
266                                         "Total I/O size exceeds segment size\n", total_size);
267                         goto arg_fail;
268                 }
269
270                 //Object size (os): Defaults to 4M.
271                 //Must have the same format as "total size"
272                 //Must be integer multiple of "block size"
273                 if (!object_size[0])
274                         strcpy(object_size,"4M");
275
276                 prefs->os = str2num(object_size);
277                 if (!prefs->os) {
278                         XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
279                         goto arg_fail;
280                 } else if (prefs->os % prefs->bs) {
281                         XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
282                         goto arg_fail;
283                 }
284
285                 //In this case, the maximum number of requests is the number of blocks
286                 //we need to cover the total I/O size
287                 prefs->max_requests = prefs->ts / prefs->bs;
288         }
289
290         /*************************\
291          * Check port parameters *
292         \*************************/
293
294         if (dst_port < 0){
295                 XSEGLOG2(&lc, E, "Target port needs to be supplied\n");
296                 goto arg_fail;
297         }
298
299         prefs->src_port = peer->portno_start; //TODO: allow user to change this
300         prefs->dst_port = (xport) dst_port;
301
302         /*********************************\
303          * Create timers for all metrics *
304         \*********************************/
305
306         if (init_timer(&prefs->total_tm, TM_SANE))
307                 goto tm_fail;
308         if (init_timer(&prefs->sub_tm, TM_MANIC))
309                 goto tm_fail;
310         if (init_timer(&prefs->get_tm, TM_PARANOID))
311                 goto tm_fail;
312         if (init_timer(&prefs->rec_tm, TM_ECCENTRIC))
313                 goto tm_fail;
314
315         /********************************\
316          * Customize struct peerd/prefs *
317         \********************************/
318
319         prefs->peer = peer;
320
321 reseed:
322         //We proceed to initialise the global_id, and seed variables.
323         if (seed == -1) {
324                 clock_gettime(CLOCK_MONOTONIC_RAW, &timer_seed);
325                 seed = timer_seed.tv_nsec;
326         } else {
327                 set_by_hand = 1;
328         }
329         create_id(seed);
330
331         if ((prefs->flags & (1 << PATTERN_FLAG)) == IO_RAND) {
332                 prefs->lfsr = malloc(sizeof(struct bench_lfsr));
333                 if (!prefs->lfsr) {
334                         perror("malloc");
335                         goto lfsr_fail;
336                 }
337
338                 r = lfsr_init(prefs->lfsr, prefs->max_requests, seed, seed & 0xF);
339                 if (r && set_by_hand) {
340                         XSEGLOG2(&lc, E, "LFSR could not be initialized\n");
341                         goto lfsr_fail;
342                 } else if (r) {
343                         seed = -1;
344                         goto reseed;
345                 }
346         }
347         XSEGLOG2(&lc, I, "Global ID is %s\n", global_id);
348
349         peer->peerd_loop = custom_peerd_loop;
350         peer->priv = (void *) prefs;
351         return 0;
352
353 arg_fail:
354         custom_peer_usage();
355 lfsr_fail:
356         free(prefs->lfsr);
357 tm_fail:
358         free(prefs->total_tm);
359         free(prefs->sub_tm);
360         free(prefs->get_tm);
361         free(prefs->rec_tm);
362         free(prefs);
363         return -1;
364 }
365
366
367 static int send_request(struct peerd *peer, struct bench *prefs)
368 {
369         struct xseg_request *req;
370         struct xseg *xseg = peer->xseg;
371         struct peer_req *pr;
372         xport srcport = prefs->src_port;
373         xport dstport = prefs->dst_port;
374         xport p;
375
376         int r;
377         uint64_t new;
378         uint64_t size = prefs->bs;
379
380         //srcport and dstport must already be provided by the user.
381         //returns struct xseg_request with basic initializations
382         XSEGLOG2(&lc, D, "Get new request\n");
383         timer_start(prefs, prefs->get_tm);
384         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
385         if (!req) {
386                 XSEGLOG2(&lc, W, "Cannot get request\n");
387                 return -1;
388         }
389         timer_stop(prefs, prefs->get_tm, NULL);
390
391         //Allocate enough space for the data and the target's name
392         XSEGLOG2(&lc, D, "Prepare new request\n");
393         r = xseg_prep_request(xseg, req, TARGETLEN, size);
394         if (r < 0) {
395                 XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
396                                 TARGETLEN, (unsigned long long)size);
397                 goto put_xseg_request;
398         }
399
400         //Determine what the next target/chunk will be, based on I/O pattern
401         new = determine_next(prefs);
402         XSEGLOG2(&lc, I, "Our new request is %lu\n", new);
403         //Create a target of this format: "bench-<global_id>-<obj_no>"
404         create_target(prefs, req, new);
405
406         if (prefs->op == X_WRITE || prefs->op == X_READ) {
407                 req->size = size;
408                 //Calculate the chunk offset inside the object
409                 req->offset = (new * prefs->bs) % prefs->os;
410                 XSEGLOG2(&lc, D, "Offset of request %lu is %lu\n", new, req->offset);
411
412                 if (prefs->op == X_WRITE)
413                         create_chunk(prefs, req, new);
414         }
415
416         req->op = prefs->op;
417
418         //Measure this?
419         XSEGLOG2(&lc, D, "Allocate peer request\n");
420         pr = alloc_peer_req(peer);
421         if (!pr) {
422                 XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
423                                 peer->nr_ops - xq_count(&peer->free_reqs));
424                 goto put_xseg_request;
425         }
426         pr->peer = peer;
427         pr->portno = srcport;
428         pr->req = req;
429         pr->priv = malloc(sizeof(struct timespec));
430         if (!pr->priv) {
431                 perror("malloc");
432                 goto put_peer_request;
433         }
434
435         //XSEGLOG2(&lc, D, "Set request data\n");
436         r = xseg_set_req_data(xseg, req, pr);
437         if (r < 0) {
438                 XSEGLOG2(&lc, W, "Cannot set request data\n");
439                 goto put_peer_request;
440         }
441
442         /*
443          * Start measuring receive time.
444          * When we receive a request, we need to have its submission time to
445          * measure elapsed time. Thus, we memcpy its submission time to pr->priv.
446          * QUESTION: Is this the fastest way?
447          */
448         timer_start(prefs, prefs->rec_tm);
449         if (prefs->rec_tm->insanity <= prefs->insanity)
450                 memcpy(pr->priv, &prefs->rec_tm->start_time, sizeof(struct timespec));
451
452         //Submit the request from the source port to the target port
453         XSEGLOG2(&lc, D, "Submit request %lu\n", new);
454         timer_start(prefs, prefs->sub_tm);
455         p = xseg_submit(xseg, req, srcport, X_ALLOC);
456         if (p == NoPort) {
457                 XSEGLOG2(&lc, W, "Cannot submit request\n");
458                 goto put_peer_request;
459         }
460         timer_stop(prefs, prefs->sub_tm, NULL);
461
462         //Send SIGIO to the process that has bound this port to inform that
463         //IO is possible
464         r = xseg_signal(xseg, p);
465         //if (r < 0)
466         //      XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
467
468         return 0;
469
470 put_peer_request:
471         free(pr->priv);
472         free_peer_req(peer, pr);
473 put_xseg_request:
474         if (xseg_put_request(xseg, req, srcport))
475                 XSEGLOG2(&lc, W, "Cannot put request\n");
476         return -1;
477 }
478
479 /*
480  * This function substitutes the default generic_peerd_loop of peer.c.
481  * It's plugged to struct peerd at custom peer's initialisation
482  */
483 int custom_peerd_loop(void *arg)
484 {
485 #ifdef MT
486         struct thread *t = (struct thread *) arg;
487         struct peerd *peer = t->peer;
488         char *id = t->arg;
489 #else
490         struct peerd *peer = (struct peerd *) arg;
491         char id[4] = {'P','e','e','r'};
492 #endif
493         struct xseg *xseg = peer->xseg;
494         struct bench *prefs = peer->priv;
495         xport portno_start = peer->portno_start;
496         xport portno_end = peer->portno_end;
497         uint64_t threshold=1000/(1 + portno_end - portno_start);
498         pid_t pid = syscall(SYS_gettid);
499         int r;
500         uint64_t loops;
501
502         XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
503         xseg_init_local_signal(xseg, peer->portno_start);
504
505         timer_start(prefs, prefs->total_tm);
506 send_request:
507         while (!isTerminate()) {
508 #ifdef MT
509                 if (t->func) {
510                         XSEGLOG2(&lc, D, "%s executes function\n", id);
511                         xseg_cancel_wait(xseg, peer->portno_start);
512                         t->func(t->arg);
513                         t->func = NULL;
514                         t->arg = NULL;
515                         continue;
516                 }
517 #endif
518                 while (CAN_SEND_REQUEST(prefs)) {
519                         xseg_cancel_wait(xseg, peer->portno_start);
520                         XSEGLOG2(&lc, D, "...because %lu < %lu && %lu < %lu\n",
521                                         prefs->sub_tm->completed - prefs->rec_tm->completed,
522                                         prefs->iodepth, prefs->sub_tm->completed,
523                                         prefs->max_requests);
524                         XSEGLOG2(&lc, D, "Start sending new request\n");
525                         r = send_request(peer, prefs);
526                         if (r < 0)
527                                 break;
528                 }
529                 //Heart of peerd_loop. This loop is common for everyone.
530                 for (loops = threshold; loops > 0; loops--) {
531                         if (loops == 1)
532                                 xseg_prepare_wait(xseg, peer->portno_start);
533
534                         if (check_ports(peer)) {
535                                 //If an old request has just been acked, the most sensible
536                                 //thing to do is to immediately send a new one
537                                 if (prefs->rec_tm->completed < prefs->max_requests)
538                                         goto send_request;
539                                 else
540                                         return 0;
541                         }
542                 }
543                 //struct xseg_port *port = xseg_get_port(xseg, portno_start);
544                 //struct xq *q;
545                 //q = XPTR_TAKE(port->request_queue, xseg->segment);
546                 //XSEGLOG2(&lc, I, "%s goes to sleep with %u requests pending\n",
547                 //              id, xq_count(q));
548                 XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
549 #ifdef ST_THREADS
550                 if (ta){
551                         st_sleep(0);
552                         continue;
553                 }
554 #endif
555                 xseg_wait_signal(xseg, 10000000UL);
556                 xseg_cancel_wait(xseg, peer->portno_start);
557                 XSEGLOG2(&lc, I, "%s woke up\n", id);
558         }
559
560         XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
561                         xq_count(&peer->free_reqs), peer->nr_ops);
562         return 0;
563 }
564
565 void custom_peer_finalize(struct peerd *peer)
566 {
567         struct bench *prefs = peer->priv;
568         //TODO: Measure mean time, standard variation
569
570         if (!prefs->total_tm->completed)
571                 timer_stop(prefs, prefs->total_tm, NULL);
572
573         print_stats(prefs);
574         print_res(prefs, prefs->total_tm, "Total Requests");
575         return;
576 }
577
578
579 static void handle_received(struct peerd *peer, struct peer_req *pr)
580 {
581         //FIXME: handle null pointer
582         struct bench *prefs = peer->priv;
583         struct timer *rec = prefs->rec_tm;
584
585         if (!pr->req) {
586                 //This is a serious error, so we must stop
587                 XSEGLOG2(&lc, E, "Received peer request with no xseg request");
588                 terminated++;
589                 return;
590         }
591
592         if (!pr->priv) {
593                 XSEGLOG2(&lc, W, "Cannot find submission time of request");
594                 return;
595         }
596
597         timer_stop(prefs, rec, pr->priv);
598
599         if (xseg_put_request(peer->xseg, pr->req, pr->portno))
600                 XSEGLOG2(&lc, W, "Cannot put xseg request\n");
601
602         //QUESTION, can't we just keep the malloced memory for future use?
603         free(pr->priv);
604         free_peer_req(peer, pr);
605 }
606
607 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
608                 enum dispatch_reason reason)
609 {
610         switch (reason) {
611                 case dispatch_accept:
612                         //This is wrong, benchmarking peer should not accept requests,
613                         //only receive them.
614                         XSEGLOG2(&lc, W, "Bench peer should not accept requests\n");
615                         complete(peer, pr);
616                         break;
617                 case dispatch_receive:
618                         handle_received(peer, pr);
619                         break;
620                 default:
621                         fail(peer, pr);
622         }
623         return 0;
624 }