Get correct iodepth
[archipelago] / xseg / peers / user / bench-xseg.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <unistd.h>
39 #include <sys/syscall.h>
40 #include <sys/types.h>
41 #include <pthread.h>
42 #include <xseg/xseg.h>
43 #include <peer.h>
44 #include <time.h>
45 #include <sys/util.h>
46 #include <signal.h>
47 #include <bench-xseg.h>
48 #include <limits.h>
49
50 char global_id[IDLEN];
51 uint64_t global_seed;
52
53 /*
54  * This function checks two things:
55  * a) If in-flight requests are less than given iodepth
56  * b) If we have submitted al of the requests
57  */
58 #define CAN_SEND_REQUEST(prefs) \
59         prefs->sub_tm->completed - prefs->rec_tm->completed < prefs->iodepth && \
60         prefs->sub_tm->completed < prefs->max_requests  \
61
62 void custom_peer_usage()
63 {
64         fprintf(stderr, "Custom peer options: \n"
65                         "  --------------------------------------------\n"
66                         "    -op       | None    | XSEG operation [read|write|info|delete]\n"
67                         "    --pattern | None    | I/O pattern [seq|rand]\n"
68                         "    -to       | None    | Total objects (not for read/write)\n"
69                         "    -ts       | None    | Total I/O size\n"
70                         "    -os       | 4M      | Object size\n"
71                         "    -bs       | 4k      | Block size\n"
72                         "    -dp       | None    | Destination port\n"
73                         "    --iodepth | 1       | Number of in-flight I/O requests\n"
74                         "    --verify  | no      | Verify written requests [no|meta|hash]\n"
75                         "    --seed    | None    | Inititalize LFSR and target names\n"
76                         "    --insanity| sane    | Adjust insanity level of benchmark:\n"
77                         "              |         |     [sane|eccentric|manic|paranoid]\n"
78                         "\n");
79 }
80
81 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
82 {
83         struct bench *prefs;
84         char total_objects[MAX_ARG_LEN + 1];
85         char total_size[MAX_ARG_LEN + 1];
86         char object_size[MAX_ARG_LEN + 1];
87         char block_size[MAX_ARG_LEN + 1];
88         char op[MAX_ARG_LEN + 1];
89         char pattern[MAX_ARG_LEN + 1];
90         char insanity[MAX_ARG_LEN + 1];
91         struct xseg *xseg = peer->xseg;
92         unsigned int xseg_page_size = 1 << xseg->config.page_shift;
93         long iodepth = -1;
94         long dst_port = -1;
95         int r;
96
97         op[0] = 0;
98         pattern[0] = 0;
99         total_objects[0] = 0;
100         total_size[0] = 0;
101         block_size[0] = 0;
102         object_size[0] = 0;
103         insanity[0] = 0;
104
105 #ifdef MT
106         for (i = 0; i < nr_threads; i++) {
107                 prefs = peer->thread[i]->priv;
108                 prefs = malloc(sizeof(struct bench));
109                 if (!prefs) {
110                         perror("malloc");
111                         return -1;
112                 }
113         }
114 #endif
115         prefs = malloc(sizeof(struct bench));
116         if (!prefs) {
117                 perror("malloc");
118                 return -1;
119         }
120         prefs->flags = 0;
121
122         //Begin reading the benchmark-specific arguments
123         BEGIN_READ_ARGS(argc, argv);
124         READ_ARG_STRING("-op", op, MAX_ARG_LEN);
125         READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
126         READ_ARG_STRING("-to", total_objects, MAX_ARG_LEN);
127         READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
128         READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
129         READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
130         READ_ARG_ULONG("--iodepth", iodepth);
131         READ_ARG_ULONG("-dp", dst_port);
132         READ_ARG_STRING("--insanity", insanity, MAX_ARG_LEN);
133         END_READ_ARGS();
134
135         /*****************************\
136          * Check I/O type parameters *
137         \*****************************/
138
139         //We support 4 xseg operations: X_READ, X_WRITE, X_DELETE, X_INFO
140         //The I/O pattern of thesee operations can be either synchronous (sync) or
141         //random (rand)
142         if (!op[0]) {
143                 XSEGLOG2(&lc, E, "xseg operation needs to be supplied\n");
144                 goto arg_fail;
145         }
146         r = read_op(op);
147         if (r < 0) {
148                 XSEGLOG2(&lc, E, "Invalid syntax: -op %s\n", op);
149                 goto arg_fail;
150         }
151         prefs->op = r;
152
153         if (!pattern[0]) {
154                 XSEGLOG2(&lc, E, "I/O pattern needs to be supplied\n");
155                 goto arg_fail;
156         }
157         r = read_pattern(pattern);
158         if (r < 0) {
159                 XSEGLOG2(&lc, E, "Invalid syntax: --pattern %s\n", pattern);
160                 goto arg_fail;
161         }
162         prefs->flags |= (uint8_t)r;
163
164         //Defailt iodepth value is 1
165         if (iodepth < -1)
166                 prefs->iodepth = 1;
167         else
168                 prefs->iodepth = iodepth;
169
170         /**************************\
171          * Check timer parameters *
172         \**************************/
173
174         //Most of the times, not all timers need to be used.
175         //We can choose which timers will be used by adjusting the "insanity"
176         //level of the benchmark i.e. the obscurity of code paths (get request,
177         //submit request) that will be timed.
178         if (!insanity[0])
179                 strcpy(insanity, "sane");
180
181         prefs->insanity = read_insanity(insanity);
182         if (prefs->insanity < 0) {
183                 XSEGLOG2(&lc, E, "Invalid syntax: --insanity %s\n", insanity);
184                 goto arg_fail;
185         }
186
187         /*
188          * If we have a request other than read/write, we don't need to check
189          * about size parameters, but only how many objects we want to affect
190          */
191         if (prefs->op != X_READ && prefs->op != X_WRITE) {
192
193                 /***************************\
194                  * Check object parameters *
195                 \***************************/
196
197                 if (!total_objects[0]) {
198                         XSEGLOG2(&lc, E,
199                                         "Total number of objects needs to be supplied\n");
200                         goto arg_fail;
201                 }
202                 prefs->to = str2num(total_objects);
203                 if (!prefs->to) {
204                         XSEGLOG2(&lc, E, "Invalid syntax: -to %s\n", total_objects);
205                         goto arg_fail;
206                 }
207
208                 //In this case, the maximum number of requests is the total number of
209                 //objects we will handle
210                 prefs->max_requests = prefs->to;
211         } else {
212
213                 /*************************\
214                  * Check size parameters *
215                 \*************************/
216
217                 //Block size (bs): Defaults to 4K.
218                 //It must be a number followed by one of these characters:
219                 //                                              [k|K|m|M|g|G]
220                 //If not, it will be considered as size in bytes.
221                 //Must be integer multiple of segment's page size (typically 4k).
222                 if (!block_size[0])
223                         strcpy(block_size,"4k");
224
225                 prefs->bs = str2num(block_size);
226                 if (!prefs->bs) {
227                         XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
228                         goto arg_fail;
229                 } else if (prefs->bs % xseg_page_size) {
230                         XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
231                         goto arg_fail;
232                 }
233
234                 //Total I/O size (ts): Must be supplied by user.
235                 //Must have the same format as "total size"
236                 //Must be integer multiple of "block size"
237                 if (!total_size[0]) {
238                         XSEGLOG2(&lc, E, "Total I/O size needs to be supplied\n");
239                         goto arg_fail;
240                 }
241
242                 prefs->ts = str2num(total_size);
243                 if (!prefs->ts) {
244                         XSEGLOG2(&lc, E, "Invalid syntax: -ts %s\n", total_size);
245                         goto arg_fail;
246                 } else if (prefs->ts % prefs->bs) {
247                         XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
248                         goto arg_fail;
249                 } else if (prefs->ts > xseg->segment_size) {
250                         XSEGLOG2(&lc, E,
251                                         "Total I/O size exceeds segment size\n", total_size);
252                         goto arg_fail;
253                 }
254
255                 //Object size (os): Defaults to 4M.
256                 //Must have the same format as "total size"
257                 //Must be integer multiple of "block size"
258                 if (!object_size[0])
259                         strcpy(object_size,"4M");
260
261                 prefs->os = str2num(object_size);
262                 if (!prefs->os) {
263                         XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
264                         goto arg_fail;
265                 } else if (prefs->os % prefs->bs) {
266                         XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
267                         goto arg_fail;
268                 }
269
270                 //In this case, the maximum number of requests is the number of blocks
271                 //we need to cover the total I/O size
272                 prefs->max_requests = prefs->ts / prefs->bs;
273         }
274
275         /*************************\
276          * Check port parameters *
277         \*************************/
278
279         if (dst_port < 0){
280                 XSEGLOG2(&lc, E, "Destination port needs to be supplied\n");
281                 goto arg_fail;
282         }
283
284         prefs->src_port = peer->portno_start; //TODO: allow user to change this
285         prefs->dst_port = (xport) dst_port;
286
287         /*********************************\
288          * Create timers for all metrics *
289         \*********************************/
290
291         if (init_timer(&prefs->total_tm, TM_SANE))
292                 goto tm_fail;
293         if (init_timer(&prefs->sub_tm, TM_MANIC))
294                 goto tm_fail;
295         if (init_timer(&prefs->get_tm, TM_PARANOID))
296                 goto tm_fail;
297         if (init_timer(&prefs->rec_tm, TM_ECCENTRIC))
298                 goto tm_fail;
299
300         /********************************\
301          * Customize struct peerd/prefs *
302         \********************************/
303
304         prefs->peer = peer;
305
306         //The following function initializes the global_id, global_seed extern
307         //variables.
308         create_id();
309
310         if ((prefs->flags & (1 <<PATTERN_FLAG)) == IO_RAND) {
311                 prefs->lfsr = malloc(sizeof(struct lfsr));
312                 if (!prefs->lfsr) {
313                         perror("malloc");
314                         goto lfsr_fail;
315                 }
316                 //FIXME: Give a name to max requests, not just prefs->ts / prefs->bs
317                 //FIXME: handle better the seed passing than just giving UINT64_MAX
318                 if (lfsr_init(prefs->lfsr, prefs->max_requests, UINT64_MAX)) {
319                         XSEGLOG2(&lc, E, "LFSR could not be initialized\n");
320                         goto lfsr_fail;
321                 }
322         }
323
324         peer->peerd_loop = custom_peerd_loop;
325         peer->priv = (void *) prefs;
326         return 0;
327
328 arg_fail:
329         custom_peer_usage();
330 lfsr_fail:
331         free(prefs->lfsr);
332 tm_fail:
333         free(prefs->total_tm);
334         free(prefs->sub_tm);
335         free(prefs->get_tm);
336         free(prefs->rec_tm);
337         free(prefs);
338         return -1;
339 }
340
341
342 static int send_request(struct peerd *peer, struct bench *prefs)
343 {
344         struct xseg_request *req;
345         struct xseg *xseg = peer->xseg;
346         struct peer_req *pr;
347         xport srcport = prefs->src_port;
348         xport dstport = prefs->dst_port;
349         xport p;
350
351         int r;
352         uint64_t new;
353         uint64_t size = prefs->bs;
354
355         //srcport and dstport must already be provided by the user.
356         //returns struct xseg_request with basic initializations
357         //XSEGLOG2(&lc, D, "Get new request\n");
358         timer_start(prefs, prefs->get_tm);
359         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
360         if (!req) {
361                 XSEGLOG2(&lc, W, "Cannot get request\n");
362                 return -1;
363         }
364         timer_stop(prefs, prefs->get_tm, NULL);
365
366         //Allocate enough space for the data and the target's name
367         //XSEGLOG2(&lc, D, "Prepare new request\n");
368         r = xseg_prep_request(xseg, req, TARGETLEN, size);
369         if (r < 0) {
370                 XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
371                                 TARGETLEN, (unsigned long long)size);
372                 goto put_xseg_request;
373         }
374
375         //Determine what the next target/chunk will be, based on I/O pattern
376         new = determine_next(prefs);
377         XSEGLOG2(&lc, I, "Our new request is %lu\n", new);
378         //Create a target of this format: "bench-<obj_no>"
379         create_target(prefs, req, new);
380
381         if (prefs->op == X_WRITE || prefs->op == X_READ) {
382                 req->size = size;
383                 //Calculate the chunk offset inside the object
384                 req->offset = (new * prefs->bs) % prefs->os;
385                 XSEGLOG2(&lc, D, "Offset of request %lu is %lu\n", new, req->offset);
386
387                 if (prefs->op == X_WRITE)
388                         create_chunk(prefs, req, new);
389         }
390
391         req->op = prefs->op;
392
393         //Measure this?
394         //XSEGLOG2(&lc, D, "Allocate peer request\n");
395         pr = alloc_peer_req(peer);
396         if (!pr) {
397                 XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
398                                 peer->nr_ops - xq_count(&peer->free_reqs));
399                 goto put_xseg_request;
400         }
401         pr->peer = peer;
402         pr->portno = srcport;
403         pr->req = req;
404         pr->priv = malloc(sizeof(struct timespec));
405         if (!pr->priv) {
406                 perror("malloc");
407                 goto put_peer_request;
408         }
409
410         //XSEGLOG2(&lc, D, "Set request data\n");
411         r = xseg_set_req_data(xseg, req, pr);
412         if (r < 0) {
413                 XSEGLOG2(&lc, W, "Cannot set request data\n");
414                 goto put_peer_request;
415         }
416
417         /*
418          * Start measuring receive time.
419          * When we receive a request, we need to have its submission time to
420          * measure elapsed time. Thus, we memcpy its submission time to pr->priv.
421          * QUESTION: Is this the fastest way?
422          */
423         timer_start(prefs, prefs->rec_tm);
424         if (prefs->rec_tm->insanity <= prefs->insanity)
425                 memcpy(pr->priv, &prefs->rec_tm->start_time, sizeof(struct timespec));
426
427         //Submit the request from the source port to the target port
428         //XSEGLOG2(&lc, D, "Submit request %lu\n", new);
429         timer_start(prefs, prefs->sub_tm);
430         p = xseg_submit(xseg, req, srcport, X_ALLOC);
431         if (p == NoPort) {
432                 XSEGLOG2(&lc, W, "Cannot submit request\n");
433                 goto put_peer_request;
434         }
435         timer_stop(prefs, prefs->sub_tm, NULL);
436
437         //Send SIGIO to the process that has bound this port to inform that
438         //IO is possible
439         r = xseg_signal(xseg, p);
440         if (r < 0)
441                 XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
442
443         return 0;
444
445 put_peer_request:
446         free(pr->priv);
447         free_peer_req(peer, pr);
448 put_xseg_request:
449         if (xseg_put_request(xseg, req, srcport))
450                 XSEGLOG2(&lc, W, "Cannot put request\n");
451         return -1;
452 }
453
454 /*
455  * This function substitutes the default generic_peerd_loop of peer.c.
456  * It's plugged to struct peerd at custom peer's initialisation
457  */
458 int custom_peerd_loop(void *arg)
459 {
460 #ifdef MT
461         struct thread *t = (struct thread *) arg;
462         struct peerd *peer = t->peer;
463         char *id = t->arg;
464 #else
465         struct peerd *peer = (struct peerd *) arg;
466         char id[4] = {'P','e','e','r'};
467 #endif
468         struct xseg *xseg = peer->xseg;
469         struct bench *prefs = peer->priv;
470         xport portno_start = peer->portno_start;
471         xport portno_end = peer->portno_end;
472         uint64_t threshold=1000/(1 + portno_end - portno_start);
473         pid_t pid = syscall(SYS_gettid);
474         int r;
475         uint64_t loops;
476
477         XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
478         xseg_init_local_signal(xseg, peer->portno_start);
479
480         timer_start(prefs, prefs->total_tm);
481 send_request:
482         while (!isTerminate()) {
483 #ifdef MT
484                 if (t->func) {
485                         XSEGLOG2(&lc, D, "%s executes function\n", id);
486                         xseg_cancel_wait(xseg, peer->portno_start);
487                         t->func(t->arg);
488                         t->func = NULL;
489                         t->arg = NULL;
490                         continue;
491                 }
492 #endif
493                 while (CAN_SEND_REQUEST(prefs)) {
494                         xseg_cancel_wait(xseg, peer->portno_start);
495                         XSEGLOG2(&lc, D, "...because %lu < %lu && %lu < %lu\n",
496                                         prefs->sub_tm->completed - prefs->rec_tm->completed,
497                                         prefs->iodepth, prefs->sub_tm->completed,
498                                         prefs->max_requests);
499                         XSEGLOG2(&lc, D, "Start sending new request\n");
500                         r = send_request(peer, prefs);
501                         if (r < 0)
502                                 break;
503                 }
504                 //Heart of peerd_loop. This loop is common for everyone.
505                 for (loops = threshold; loops > 0; loops--) {
506                         if (loops == 1)
507                                 xseg_prepare_wait(xseg, peer->portno_start);
508
509                         if (check_ports(peer)) {
510                                 //If an old request has just been acked, the most sensible
511                                 //thing to do is to immediately send a new one
512                                 if (prefs->rec_tm->completed < prefs->max_requests)
513                                         goto send_request;
514                                 else
515                                         return 0;
516                         }
517                 }
518                 //struct xseg_port *port = xseg_get_port(xseg, portno_start);
519                 //struct xq *q;
520                 //q = XPTR_TAKE(port->request_queue, xseg->segment);
521                 //XSEGLOG2(&lc, I, "%s goes to sleep with %u requests pending\n",
522                 //              id, xq_count(q));
523                 XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
524 #ifdef ST_THREADS
525                 if (ta){
526                         st_sleep(0);
527                         continue;
528                 }
529 #endif
530                 xseg_wait_signal(xseg, 10000000UL);
531                 xseg_cancel_wait(xseg, peer->portno_start);
532                 XSEGLOG2(&lc, I, "%s woke up\n", id);
533         }
534
535         XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
536                         xq_count(&peer->free_reqs), peer->nr_ops);
537         return 0;
538 }
539
540 void custom_peer_finalize(struct peerd *peer)
541 {
542         struct bench *prefs = peer->priv;
543         //TODO: Measure mean time, standard variation
544
545         if (!prefs->total_tm->completed)
546                 timer_stop(prefs, prefs->total_tm, NULL);
547
548         print_stats(prefs);
549         print_res(prefs, prefs->total_tm, "Total Requests");
550         return;
551 }
552
553
554 static void handle_received(struct peerd *peer, struct peer_req *pr)
555 {
556         //FIXME: handle null pointer
557         struct bench *prefs = peer->priv;
558         struct timer *rec = prefs->rec_tm;
559
560         if (!pr->req) {
561                 //This is a serious error, so we must stop
562                 XSEGLOG2(&lc, E, "Received peer request with no xseg request");
563                 terminated++;
564                 return;
565         }
566
567         if (!pr->priv) {
568                 XSEGLOG2(&lc, W, "Cannot find submission time of request");
569                 return;
570         }
571
572         timer_stop(prefs, rec, pr->priv);
573
574         if (xseg_put_request(peer->xseg, pr->req, pr->portno))
575                 XSEGLOG2(&lc, W, "Cannot put xseg request\n");
576
577         //QUESTION, can't we just keep the malloced memory for future use?
578         free(pr->priv);
579         free_peer_req(peer, pr);
580 }
581
582 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
583                 enum dispatch_reason reason)
584 {
585         switch (reason) {
586                 case dispatch_accept:
587                         //This is wrong, benchmarking peer should not accept requests,
588                         //only receive them.
589                         XSEGLOG2(&lc, W, "Bench peer should not accept requests\n");
590                         complete(peer, pr);
591                         break;
592                 case dispatch_receive:
593                         handle_received(peer, pr);
594                         break;
595                 default:
596                         fail(peer, pr);
597         }
598         return 0;
599 }