2 * Copyright 2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
16 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
29 * The views and conclusions contained in the software and
30 * documentation are those of the authors and should not be
31 * interpreted as representing official policies, either expressed
32 * or implied, of GRNET S.A.
39 #include <sys/syscall.h>
40 #include <sys/types.h>
42 #include <xseg/xseg.h>
47 #include <bench-xseg.h>
48 #include <bench-lfsr.h>
/* Identifier buffer sized IDLEN + 1; custom_peer_init() logs it as the "Global ID". */
51 char global_id[IDLEN + 1];
53 * This macro checks four things:
54 * a) If in-flight requests are less than given iodepth
55 * b) If we have not yet submitted all of the requests
56 * c) If we are not in ping mode
57 * d) If we have been asked to terminate
/*
 * CAN_SEND_REQUEST(__p): among its conditions, the in-flight count
 * (status->submitted - status->received) must be below __p->iodepth,
 * status->submitted must be below status->max, and the PING flag must
 * equal PING_MODE_OFF.
 * NOTE(review): __p is expanded without parentheses, so pass a plain
 * identifier.
 */
59 #define CAN_SEND_REQUEST(__p) \
60 ((__p->status->submitted - __p->status->received < __p->iodepth) && \
61 (__p->status->submitted < __p->status->max) && \
62 (GET_FLAG(PING, __p->flags) == PING_MODE_OFF) && \
/*
 * CAN_VERIFY(__p): non-zero when the VERIFY flag in __p->flags is anything
 * other than VERIFY_NO and __p->op is X_READ.
 */
65 #define CAN_VERIFY(__p) \
66 ((GET_FLAG(VERIFY, __p->flags) != VERIFY_NO) && __p->op == X_READ)
/*
 * CAN_PRINT_PROGRESS(__p, __q): non-zero when the PROGRESS flag in
 * __p->flags is PROGRESS_YES and __p->status->received equals __q.
 */
68 #define CAN_PRINT_PROGRESS(__p, __q) \
69 ((GET_FLAG(PROGRESS, __p->flags) == PROGRESS_YES) && \
70 (__p->status->received == __q))
/*
 * custom_peer_usage: write the benchmark-specific option table (flag,
 * default, description) and the note that -to and -ts are mutually
 * exclusive to stderr.
 */
72 void custom_peer_usage()
74 fprintf(stderr, "Custom peer options: \n"
75 " --------------------------------------------\n"
76 " -op | None | XSEG operation [read|write|info|delete]\n"
77 " --pattern | None | I/O pattern [seq|rand]\n"
78 " --verify | no | Verify written requests [no|meta|full]\n"
79 " -rc | None | Request cap\n"
80 " -to | None | Total objects\n"
81 " -ts | None | Total I/O size\n"
82 " -os | 4M | Object size\n"
83 " -bs | 4k | Block size\n"
84 " -tp | None | Target port\n"
85 " --iodepth | 1 | Number of in-flight I/O requests\n"
86 " --seed | None | Initialize LFSR and target names\n"
87 " --insanity| sane | Adjust insanity level of benchmark:\n"
88 " | | [sane|eccentric|manic|paranoid]\n"
89 " --progress| yes | Show progress of requests\n"
90 " --ping | yes | Ping target before starting benchmark\n"
92 "Additional information:\n"
93 " --------------------------------------------\n"
94 " The -to and -ts options are mutually exclusive\n"
/*
 * custom_peer_init: read the benchmark-specific command-line options,
 * validate them and populate a freshly allocated struct bench.
 *
 * peer       - peer descriptor. Each peer->peer_reqs[i].priv receives its own
 *              struct timespec; on the success path peer->peerd_loop is
 *              pointed at bench_peerd_loop and peer->priv at the new
 *              struct bench.
 * argc, argv - argument vector scanned by the READ_ARG_* macros.
 *
 * Return value: presumably 0 on success and negative on error - TODO confirm
 * against the callers.
 */
98 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
// One MAX_ARG_LEN-sized scratch buffer per string-valued option
101 char request_cap[MAX_ARG_LEN + 1];
102 char total_objects[MAX_ARG_LEN + 1];
103 char total_size[MAX_ARG_LEN + 1];
104 char object_size[MAX_ARG_LEN + 1];
105 char block_size[MAX_ARG_LEN + 1];
106 char op[MAX_ARG_LEN + 1];
107 char pattern[MAX_ARG_LEN + 1];
108 char insanity[MAX_ARG_LEN + 1];
109 char verify[MAX_ARG_LEN + 1];
110 char progress[MAX_ARG_LEN + 1];
111 char ping[MAX_ARG_LEN + 1];
112 struct xseg *xseg = peer->xseg;
// Segment page size in bytes, derived from the configured page shift
113 unsigned int xseg_page_size = 1 << xseg->config.page_shift;
// -1 converts to ULONG_MAX here; presumably the "no --seed given" marker - TODO confirm
116 unsigned long seed = -1;
118 struct timespec timer_seed;
// An empty first byte is how the checks below detect "option not supplied"
125 total_objects[0] = 0;
// Allocate and zero the benchmark state and its request-status counters
135 prefs = malloc(sizeof(struct bench));
140 memset(prefs, 0, sizeof(struct bench));
142 prefs->status = malloc(sizeof(struct req_status));
143 if (!prefs->status) {
147 memset(prefs->status, 0, sizeof(struct req_status));
// Give every peer request a private timespec; send_request() copies the
// submission timestamp into it
149 for (i = 0; i < peer->nr_ops; i++) {
150 ts = malloc(sizeof(struct timespec));
155 peer->peer_reqs[i].priv = ts;
158 //Begin reading the benchmark-specific arguments
159 BEGIN_READ_ARGS(argc, argv);
160 READ_ARG_STRING("-rc", request_cap, MAX_ARG_LEN);
161 READ_ARG_STRING("-op", op, MAX_ARG_LEN);
162 READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
163 READ_ARG_STRING("-to", total_objects, MAX_ARG_LEN);
164 READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
165 READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
166 READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
167 READ_ARG_ULONG("--iodepth", iodepth);
168 READ_ARG_ULONG("-tp", dst_port);
169 READ_ARG_ULONG("--seed", seed);
170 READ_ARG_STRING("--insanity", insanity, MAX_ARG_LEN);
171 READ_ARG_STRING("--verify", verify, MAX_ARG_LEN);
172 READ_ARG_STRING("--progress", progress, MAX_ARG_LEN);
173 READ_ARG_STRING("--ping", ping, MAX_ARG_LEN);
176 /*****************************\
177 * Check I/O type parameters *
178 \*****************************/
180 //We support 4 xseg operations: X_READ, X_WRITE, X_DELETE, X_INFO
181 //The I/O pattern of these operations can be either sequential (seq) or
184 XSEGLOG2(&lc, E, "xseg operation needs to be supplied\n");
189 XSEGLOG2(&lc, E, "Invalid syntax: -op %s\n", op);
195 XSEGLOG2(&lc, E, "I/O pattern needs to be supplied\n");
198 r = read_pattern(pattern);
200 XSEGLOG2(&lc, E, "Invalid syntax: --pattern %s\n", pattern);
203 SET_FLAG(PATTERN, prefs->flags, r);
// "no" is the documented default for --verify
206 strcpy(verify, "no");
207 r = read_verify(verify);
209 XSEGLOG2(&lc, E, "Invalid syntax: --verify %s\n", verify);
212 SET_FLAG(VERIFY, prefs->flags, r);
214 //Default iodepth value is 1
218 prefs->iodepth = iodepth;
220 /**************************\
221 * Check timer parameters *
222 \**************************/
224 //Most of the time, not all timers need to be used.
225 //We can choose which timers will be used by adjusting the "insanity"
226 //level of the benchmark i.e. the obscurity of code paths (get request,
227 //submit request) that will be timed.
229 strcpy(insanity, "sane");
231 r = read_insanity(insanity);
233 XSEGLOG2(&lc, E, "Invalid syntax: --insanity %s\n", insanity);
236 SET_FLAG(INSANITY, prefs->flags, r);
238 /*****************************\
239 * Check I/O size parameters *
240 \*****************************/
242 //Block size (bs): Defaults to 4K.
243 //It must be a number followed by one of these characters:
245 //If not, it will be considered as size in bytes.
246 //Must be integer multiple of segment's page size (typically 4k).
248 strcpy(block_size,"4k");
250 prefs->bs = str2num(block_size);
252 XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
254 } else if (prefs->bs % xseg_page_size) {
255 XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
259 //Total objects (to) or total I/O size (ts).
260 //Must have the same format as "block size"
261 //They are mutually exclusive
262 if (total_objects[0] && total_size[0]) {
263 XSEGLOG2(&lc, E, "Total objects and total size are mutually exclusive\n");
265 } else if (total_objects[0]) {
266 prefs->to = str2num(total_objects);
268 XSEGLOG2(&lc, E, "Invalid syntax: -to %s\n", total_objects);
271 //In this case, the maximum number of requests is the total number of
272 //objects we will handle
273 prefs->status->max = prefs->to;
274 } else if (total_size[0]) {
// -ts is only accepted for read and write operations
275 if (prefs->op != X_READ && prefs->op != X_WRITE) {
277 "Total objects must be supplied (required by -op %s)\n", op);
280 prefs->ts = str2num(total_size);
282 XSEGLOG2(&lc, E, "Invalid syntax: -ts %s\n", total_size);
284 } else if (prefs->ts % prefs->bs) {
285 XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
288 //In this case, the maximum number of requests is the number of blocks
289 //we need to cover the total I/O size
290 prefs->status->max = prefs->ts / prefs->bs;
292 XSEGLOG2(&lc, E, "Total objects or total size must be supplied\n");
296 //Object size (os): Defaults to 4M.
297 //Must have the same format as "block size"
298 //Must be integer multiple of "block size"
300 strcpy(object_size,"4M");
302 prefs->os = str2num(object_size);
304 XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
306 } else if (prefs->os % prefs->bs) {
307 XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
312 /*************************\
313 * Check port parameters *
314 \*************************/
317 XSEGLOG2(&lc, E, "Target port must be supplied\n");
321 prefs->src_port = peer->portno_start; //TODO: allow user to change this
322 prefs->dst_port = (xport) dst_port;
324 /*********************************\
325 * Create timers for all metrics *
326 \*********************************/
// Each timer is created with its own insanity level
328 if (init_timer(&prefs->total_tm, INSANITY_SANE))
330 if (init_timer(&prefs->sub_tm, INSANITY_MANIC))
332 if (init_timer(&prefs->get_tm, INSANITY_PARANOID))
334 if (init_timer(&prefs->rec_tm, INSANITY_ECCENTRIC))
337 /*************************************\
338 * Initialize the LFSR and global_id *
339 \*************************************/
341 //We proceed to initialise the global_id, and seed variables.
// Seed taken from the nanosecond field of the current CLOCK_BENCH time
343 clock_gettime(CLOCK_BENCH, &timer_seed);
344 seed = timer_seed.tv_nsec;
// With a single request, force the sequential pattern
350 if (prefs->status->max == 1)
351 SET_FLAG(PATTERN, prefs->flags, PATTERN_SEQ);
353 if (GET_FLAG(PATTERN, prefs->flags) == PATTERN_RAND) {
354 prefs->lfsr = malloc(sizeof(struct bench_lfsr));
// The low four bits of the seed are passed as lfsr_init()'s last argument
360 r = lfsr_init(prefs->lfsr, prefs->status->max, seed, seed & 0xF);
// This error is only logged when set_by_hand is non-zero (presumably a
// user-supplied seed - TODO confirm)
361 if (r && set_by_hand) {
362 XSEGLOG2(&lc, E, "LFSR could not be initialized.\n");
370 /*********************************\
371 * Miscellaneous initializations *
372 \*********************************/
374 /* The request cap must be enforced only after the LFSR is initialized */
375 if (request_cap[0]) {
376 rc = str2num(request_cap);
378 XSEGLOG2(&lc, E, "Invalid syntax: -rc %s\n", request_cap);
380 } else if (rc > prefs->status->max) {
381 XSEGLOG2(&lc, E, "Request cap exceeds current request total.\n");
// A valid cap replaces the previously computed request total
384 prefs->status->max = rc;
387 /* Benchmarking progress printing is on by default */
389 strcpy(progress, "yes");
390 r = read_progress(progress);
392 XSEGLOG2(&lc, E, "Invalid syntax: --progress %s\n", progress);
395 SET_FLAG(PROGRESS, prefs->flags, r);
397 /* Pinging the target peer is on by default */
402 XSEGLOG2(&lc, E, "Invalid syntax: --ping %s\n", ping);
405 SET_FLAG(PING, prefs->flags, r);
// Install the benchmark loop and publish the benchmark state on the peer
408 peer->peerd_loop = bench_peerd_loop;
409 peer->priv = (void *) prefs;
410 XSEGLOG2(&lc, I, "Global ID is %s\n", global_id);
// Cleanup: release the timer and the per-request timespecs
418 free(prefs->total_tm);
// NOTE(review): this countdown only terminates if i is a signed type - confirm
// its declaration
423 for (; i >= 0; i--) {
424 free(peer->peer_reqs[i].priv);
/*
 * send_request: build one benchmark request and submit it to the target port.
 *
 * peer  - peer descriptor supplying the xseg segment and the peer-request pool.
 * prefs - benchmark state (ports, block size, operation, timers, counters).
 *
 * Sequence: get an xseg request for (src_port, dst_port); prepare it for a
 * TARGETLEN-byte target name and prefs->bs bytes of data; choose the next
 * target/chunk number with determine_next(); fill in the target; for reads
 * and writes set the offset, and for writes also create the chunk;
 * attach a peer request; submit; increment status->submitted; signal the
 * destination port. The failure paths free the peer request and put the xseg
 * request back.
 *
 * Return value: presumably 0 on success and negative on failure - TODO confirm.
 */
434 static int send_request(struct peerd *peer, struct bench *prefs)
436 struct xseg_request *req;
437 struct xseg *xseg = peer->xseg;
439 xport srcport = prefs->src_port;
440 xport dstport = prefs->dst_port;
// Every request carries exactly one block of data
445 uint64_t size = prefs->bs;
448 //srcport and dstport must already be provided by the user.
449 //returns struct xseg_request with basic initializations
450 XSEGLOG2(&lc, D, "Get new request\n");
// get_tm brackets the xseg_get_request() call
451 timer_start(prefs, prefs->get_tm);
452 req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
454 XSEGLOG2(&lc, W, "Cannot get request\n");
457 timer_stop(prefs, prefs->get_tm, NULL);
459 //Allocate enough space for the data and the target's name
460 XSEGLOG2(&lc, D, "Prepare new request\n");
461 r = xseg_prep_request(xseg, req, TARGETLEN, size);
463 XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
464 TARGETLEN, (unsigned long long)size);
465 goto put_xseg_request;
468 //Determine what the next target/chunk will be, based on I/O pattern
469 new = determine_next(prefs);
471 XSEGLOG2(&lc, I, "Our new request is %lu\n", new);
472 //Create a target of this format: "bench-<global_id>-<obj_no>"
473 create_target(prefs, req, new);
// Only reads and writes need an offset; only writes need chunk data
475 if (prefs->op == X_WRITE || prefs->op == X_READ) {
477 //Calculate the chunk's offset inside the object
478 req->offset = calculate_offset(prefs, new);
479 XSEGLOG2(&lc, D, "Offset of request %lu is %lu\n", new, req->offset);
481 if (prefs->op == X_WRITE)
482 create_chunk(prefs, req, new);
485 XSEGLOG2(&lc, D, "Allocate peer request\n");
486 pr = alloc_peer_req(peer);
488 XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
489 peer->nr_ops - xq_count(&peer->free_reqs));
490 goto put_xseg_request;
493 pr->portno = srcport;
496 //XSEGLOG2(&lc, D, "Set request data\n");
// Associate the peer request with the xseg request
497 r = xseg_set_req_data(xseg, req, pr);
499 XSEGLOG2(&lc, W, "Cannot set request data\n");
500 goto put_peer_request;
504 * Start measuring receive time.
505 * When we receive a request, we need to have its submission time to
506 * measure elapsed time. Thus, we copy its submission time to pr->priv.
507 * QUESTION: Is this the fastest way?
509 timer_start(prefs, prefs->rec_tm);
// Copy the timestamp only when rec_tm's insanity level does not exceed the
// configured one
510 if (prefs->rec_tm->insanity <= GET_FLAG(INSANITY, prefs->flags)) {
511 ts = (struct timespec *)pr->priv;
512 ts->tv_sec = prefs->rec_tm->start_time.tv_sec;
513 ts->tv_nsec = prefs->rec_tm->start_time.tv_nsec;
516 //Submit the request from the source port to the target port
517 XSEGLOG2(&lc, D, "Submit request %lu\n", new);
// sub_tm brackets the xseg_submit() call
518 timer_start(prefs, prefs->sub_tm);
519 p = xseg_submit(xseg, req, srcport, X_ALLOC);
521 XSEGLOG2(&lc, W, "Cannot submit request\n");
522 goto put_peer_request;
524 prefs->status->submitted++;
525 timer_stop(prefs, prefs->sub_tm, NULL);
527 //Send SIGIO to the process that has bound this port to inform that
529 r = xseg_signal(xseg, p);
531 // XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
// Error unwinding: release the peer request, then the xseg request
536 free_peer_req(peer, pr);
538 if (xseg_put_request(xseg, req, srcport))
539 XSEGLOG2(&lc, W, "Cannot put request\n");
/*
 * send_ping_request: send a single ping request to the target port.
 *
 * peer  - peer descriptor supplying the xseg segment and the peer-request pool.
 * prefs - benchmark state; src_port and dst_port are read from it.
 *
 * Gets an xseg request, attaches a peer request to it, submits it from the
 * source port and signals the destination port. The failure paths free the
 * peer request and put the xseg request back.
 *
 * Return value: presumably 0 on success and negative on failure - TODO confirm.
 */
543 static int send_ping_request(struct peerd *peer, struct bench *prefs)
545 struct xseg_request *req;
546 struct xseg *xseg = peer->xseg;
548 xport srcport = prefs->src_port;
549 xport dstport = prefs->dst_port;
553 XSEGLOG2(&lc, I, "Sending ping request...");
554 //srcport and dstport must already be provided by the user.
555 //returns struct xseg_request with basic initializations
556 XSEGLOG2(&lc, D, "Get new request\n");
557 req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
559 XSEGLOG2(&lc, W, "Cannot get request\n");
564 XSEGLOG2(&lc, D, "Allocate peer request\n");
565 pr = alloc_peer_req(peer);
567 XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
568 peer->nr_ops - xq_count(&peer->free_reqs));
569 goto put_xseg_request;
572 pr->portno = srcport;
// Associate the peer request with the xseg request
575 r = xseg_set_req_data(xseg, req, pr);
577 XSEGLOG2(&lc, W, "Cannot set request data\n");
578 goto put_peer_request;
581 //Submit the request from the source port to the target port
582 XSEGLOG2(&lc, D, "Submit ping request");
583 p = xseg_submit(xseg, req, srcport, X_ALLOC);
585 XSEGLOG2(&lc, W, "Cannot submit request\n");
586 goto put_peer_request;
// NOTE(review): no timer_start() for sub_tm is visible in this function, so
// this stop looks like a leftover from send_request() - confirm
588 timer_stop(prefs, prefs->sub_tm, NULL);
590 //Send SIGIO to the process that has bound this port to inform that
592 r = xseg_signal(xseg, p);
594 // XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
// Error unwinding: release the peer request, then the xseg request
599 free_peer_req(peer, pr);
601 if (xseg_put_request(xseg, req, srcport))
602 XSEGLOG2(&lc, W, "Cannot put request\n");
607 * This function substitutes the default generic_peerd_loop of peer.c.
608 * It's plugged to struct peerd at custom peer's initialisation
/*
 * bench_peerd_loop: main loop of the benchmark peer.
 *
 * arg - either a struct thread * or a struct peerd *, depending on which of
 *       the two declarations below is compiled in.
 *
 * Starts the total timer straight away when ping mode is off and sends a
 * ping request (presumably only when ping mode is on - TODO confirm).
 * The outer loop runs until isTerminate() is true and all peer requests are
 * free. Each pass submits requests while CAN_SEND_REQUEST() holds, polls the
 * ports for up to `threshold` iterations (printing progress at each quantum),
 * and then waits for a signal.
 */
610 int bench_peerd_loop(void *arg)
// NOTE(review): peer is declared twice below; presumably the two variants sit
// in alternative preprocessor branches - confirm
613 struct thread *t = (struct thread *) arg;
614 struct peerd *peer = t->peer;
617 struct peerd *peer = (struct peerd *) arg;
// NOTE(review): id holds exactly four characters and no terminating NUL, yet
// it is passed to "%s" in the log calls below - confirm and consider
// declaring it as char id[] = "Peer"
618 char id[4] = {'P','e','e','r'};
620 struct xseg *xseg = peer->xseg;
621 struct bench *prefs = peer->priv;
622 xport portno_start = peer->portno_start;
623 xport portno_end = peer->portno_end;
624 pid_t pid = syscall(SYS_gettid);
// Iteration budget for the polling loop: 1000 divided by the number of ports
// in [portno_start, portno_end]
625 uint64_t threshold=1000/(1 + portno_end - portno_start);
626 uint64_t cached_prog_quantum = 0;
627 uint64_t prog_quantum = 0;
// prog_quantum is the next received-count at which progress is printed;
// cached_prog_quantum is the step added to it each time
631 if (GET_FLAG(PROGRESS, prefs->flags) == PROGRESS_YES) {
632 prog_quantum = calculate_prog_quantum(prefs);
633 cached_prog_quantum = prog_quantum;
637 XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
638 xseg_init_local_signal(xseg, peer->portno_start);
640 /* If no ping is going to be sent, we can begin the benchmark now. */
641 if (GET_FLAG(PING, prefs->flags) == PING_MODE_OFF)
642 timer_start(prefs, prefs->total_tm);
644 send_ping_request(peer, prefs);
// Keep looping until termination was requested and no peer request is in use
647 while (!(isTerminate() && all_peer_reqs_free(peer))) {
648 while (CAN_SEND_REQUEST(prefs)) {
649 xseg_cancel_wait(xseg, peer->portno_start);
650 XSEGLOG2(&lc, D, "...because %lu < %lu && %lu < %lu\n",
651 prefs->status->submitted - prefs->status->received,
652 prefs->iodepth, prefs->status->received,
654 XSEGLOG2(&lc, D, "Start sending new request\n");
655 r = send_request(peer, prefs);
659 //Heart of peerd_loop. This loop is common for everyone.
660 for (loops = threshold; loops > 0; loops--) {
662 xseg_prepare_wait(xseg, peer->portno_start);
// Print progress whenever the received count reaches the next quantum
664 if (UNLIKELY(CAN_PRINT_PROGRESS(prefs, prog_quantum))) {
665 prog_quantum += cached_prog_quantum;
666 print_progress(prefs);
669 if (check_ports(peer)) {
670 //If an old request has just been acked, the most sensible
671 //thing to do is to immediately send a new one
672 if (prefs->status->received < prefs->status->max)
678 //struct xseg_port *port = xseg_get_port(xseg, portno_start);
680 //q = XPTR_TAKE(port->request_queue, xseg->segment);
681 //XSEGLOG2(&lc, I, "%s goes to sleep with %u requests pending\n",
683 XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
// Wait for a signal, then cancel the wait state set up by xseg_prepare_wait()
684 xseg_wait_signal(xseg, 10000000UL);
685 xseg_cancel_wait(xseg, peer->portno_start);
686 XSEGLOG2(&lc, I, "%s woke up\n", id);
689 XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
690 xq_count(&peer->free_reqs), peer->nr_ops);
/*
 * custom_peer_finalize: wrap up the benchmark for this peer.
 *
 * peer - peer descriptor whose priv member is the struct bench.
 *
 * Stops the total timer if it is not yet marked completed, prints a final
 * progress report when progress printing is enabled, and calls
 * print_remaining().
 */
694 void custom_peer_finalize(struct peerd *peer)
696 struct bench *prefs = peer->priv;
697 //TODO: Measure mean time, standard deviation
699 if (!prefs->total_tm->completed)
700 timer_stop(prefs, prefs->total_tm, NULL);
702 if (GET_FLAG(PROGRESS, prefs->flags) == PROGRESS_YES)
703 print_progress(prefs);
707 print_remaining(prefs);
713 * handle_received: +1 to our received requests.
714 * Do some sanity checks and then check if request is failed.
715 * If not try to verify the request if asked.
/*
 * handle_received: account for one completed request.
 *
 * peer - peer descriptor whose priv member is the struct bench.
 * pr   - the completed peer request. On the normal path its xseg request is
 *        put back and the peer request itself is freed.
 *
 * While the PING flag is PING_MODE_ON, a received request is treated as the
 * ping reply and the flag is switched to PING_MODE_OFF. Otherwise
 * status->received is incremented, the receive timer is stopped against the
 * timestamp stored in pr->priv, and status->failed or status->corrupted is
 * incremented as appropriate.
 */
717 static void handle_received(struct peerd *peer, struct peer_req *pr)
719 //FIXME: handle null pointer
720 struct bench *prefs = peer->priv;
721 struct timer *rec = prefs->rec_tm;
725 //This is a serious error, so we must stop
726 XSEGLOG2(&lc, E, "Received peer request with no xseg request");
732 * If we were in ping mode, we can now switch off and start the
735 if (GET_FLAG(PING, prefs->flags) == PING_MODE_ON) {
736 XSEGLOG2(&lc, I, "Ping received. Benchmark can start now.");
737 SET_FLAG(PING, prefs->flags, PING_MODE_OFF);
742 prefs->status->received++;
// NOTE(review): send_request() stores the timestamp when rec_tm->insanity <=
// the configured level, whereas this warning fires when the configured level
// is below rec->insanity - confirm the comparison direction
744 if ((GET_FLAG(INSANITY, prefs->flags) < rec->insanity) && !pr->priv) {
745 XSEGLOG2(&lc, W, "Cannot find submission time of request");
// The submission timestamp saved in pr->priv is handed to timer_stop()
749 timer_stop(prefs, rec, (struct timespec *)pr->priv);
// A request without XS_SERVED counts as failed; a served read is counted as
// corrupted when verification is enabled and read_chunk() returns non-zero
751 if (!(pr->req->state & XS_SERVED))
752 prefs->status->failed++;
753 else if (CAN_VERIFY(prefs) && read_chunk(prefs, pr->req))
754 prefs->status->corrupted++;
757 if (xseg_put_request(peer->xseg, pr->req, pr->portno))
758 XSEGLOG2(&lc, W, "Cannot put xseg request\n");
760 free_peer_req(peer, pr);
// Presumably reached once the ping reply has arrived, so the total timer
// starts with the benchmark proper - TODO confirm
763 timer_start(prefs, prefs->total_tm);
766 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
767 enum dispatch_reason reason)
770 case dispatch_accept:
771 //This is wrong, benchmarking peer should not accept requests,
773 XSEGLOG2(&lc, W, "Bench peer should not accept requests\n");
776 case dispatch_receive:
777 handle_received(peer, pr);