2 * Copyright 2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
16 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
29 * The views and conclusions contained in the software and
30 * documentation are those of the authors and should not be
31 * interpreted as representing official policies, either expressed
32 * or implied, of GRNET S.A.
39 #include <sys/syscall.h>
40 #include <sys/types.h>
42 #include <xseg/xseg.h>
47 #include <bench-xseg.h>
48 #include <bench-lfsr.h>
// Global identifier of this benchmark instance; custom_peer_init logs it
// and request targets are named "bench-<global_id>-<obj_no>".
51 char global_id[IDLEN];
/*
 * CAN_SEND_REQUEST(prefs) is non-zero only when both conditions hold:
 * a) the in-flight requests (submitted - received) are fewer than iodepth
 * b) not all of the requests have been submitted yet (submitted < max)
 *
 * The argument is parenthesized on every use, so any pointer expression
 * (e.g. "base + i") can be passed safely. It is evaluated more than once,
 * so it must not have side effects.
 */
#define CAN_SEND_REQUEST(prefs) \
	((((prefs)->status->submitted - (prefs)->status->received) < \
	  (prefs)->iodepth) && \
	 ((prefs)->status->submitted < (prefs)->status->max))

/*
 * CAN_VERIFY(prefs) is non-zero when the VERIFY flag is anything other than
 * VERIFY_NO and the configured operation is X_READ.
 */
#define CAN_VERIFY(prefs) \
	((GET_FLAG(VERIFY, (prefs)->flags) != VERIFY_NO) && \
	 ((prefs)->op == X_READ))
/*
 * custom_peer_usage: print the benchmark-specific command-line options
 * (option name, default value, short description) to stderr.
 */
64 void custom_peer_usage()
66 fprintf(stderr, "Custom peer options: \n"
67 " --------------------------------------------\n"
68 " -op | None | XSEG operation [read|write|info|delete]\n"
69 " --pattern | None | I/O pattern [seq|rand]\n"
70 " --verify | no | Verify written requests [no|meta|full]\n"
71 " -to | None | Total objects (not for read/write)\n"
72 " -ts | None | Total I/O size\n"
73 " -os | 4M | Object size\n"
74 " -bs | 4k | Block size\n"
75 " -tp | None | Target port\n"
76 " --iodepth | 1 | Number of in-flight I/O requests\n"
77 " --seed | None | Initialize LFSR and target names\n"
78 " --insanity| sane | Adjust insanity level of benchmark:\n"
79 " | | [sane|eccentric|manic|paranoid]\n"
/*
 * custom_peer_init: set up the benchmark peer from its command-line arguments.
 *
 * Steps visible in this function:
 *  - allocate the struct bench preferences and its zeroed req_status
 *  - read the benchmark-specific arguments with the READ_ARG_* macros
 *  - check operation, I/O pattern, verify mode, iodepth and insanity level
 *  - for operations other than read/write check the object count (-to),
 *    otherwise check block size, total I/O size and object size
 *  - record the source and target ports and create the four timers
 *  - seed the LFSR when the pattern is random
 *  - install bench_peerd_loop and store prefs in peer->priv
 *
 * peer:       peer descriptor being initialised
 * argc, argv: arguments handed to BEGIN_READ_ARGS / READ_ARG_*
 * Returns an int; the return statements are not visible in this excerpt.
 */
83 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
// Raw option strings; they are filled by READ_ARG_STRING and parsed below.
86 char total_objects[MAX_ARG_LEN + 1];
87 char total_size[MAX_ARG_LEN + 1];
88 char object_size[MAX_ARG_LEN + 1];
89 char block_size[MAX_ARG_LEN + 1];
90 char op[MAX_ARG_LEN + 1];
91 char pattern[MAX_ARG_LEN + 1];
92 char insanity[MAX_ARG_LEN + 1];
93 char verify[MAX_ARG_LEN + 1];
94 struct xseg *xseg = peer->xseg;
95 unsigned int xseg_page_size = 1 << xseg->config.page_shift;
// NOTE(review): -1 converts to the largest unsigned long; presumably this
// marks that no seed was supplied - confirm against the hidden lines.
98 unsigned long seed = -1;
99 struct timespec timer_seed;
105 total_objects[0] = 0;
// NOTE(review): prefs is assigned several times in a row here; presumably
// these lines belong to alternative preprocessor branches that are not
// visible in this excerpt - confirm.
113 for (i = 0; i < nr_threads; i++) {
114 prefs = peer->thread[i]->priv;
115 prefs = malloc(sizeof(struct bench));
122 prefs = malloc(sizeof(struct bench));
129 prefs->status = malloc(sizeof(struct req_status));
130 if (!prefs->status) {
134 memset(prefs->status, 0, sizeof(struct req_status));
136 //Begin reading the benchmark-specific arguments
137 BEGIN_READ_ARGS(argc, argv);
138 READ_ARG_STRING("-op", op, MAX_ARG_LEN);
139 READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
140 READ_ARG_STRING("-to", total_objects, MAX_ARG_LEN);
141 READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
142 READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
143 READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
144 READ_ARG_ULONG("--iodepth", iodepth);
145 READ_ARG_ULONG("-tp", dst_port);
146 READ_ARG_ULONG("--seed", seed);
147 READ_ARG_STRING("--insanity", insanity, MAX_ARG_LEN);
148 READ_ARG_STRING("--verify", verify, MAX_ARG_LEN);
151 /*****************************\
152 * Check I/O type parameters *
153 \*****************************/
155 //We support 4 xseg operations: X_READ, X_WRITE, X_DELETE, X_INFO
156 //The I/O pattern of these operations can be either sequential (seq) or
159 XSEGLOG2(&lc, E, "xseg operation needs to be supplied\n");
164 XSEGLOG2(&lc, E, "Invalid syntax: -op %s\n", op);
170 XSEGLOG2(&lc, E, "I/O pattern needs to be supplied\n");
173 r = read_pattern(pattern);
175 XSEGLOG2(&lc, E, "Invalid syntax: --pattern %s\n", pattern);
178 SET_FLAG(PATTERN, prefs->flags, r);
181 strcpy(verify, "no");
182 r = read_verify(verify);
184 XSEGLOG2(&lc, E, "Invalid syntax: --verify %s\n", verify);
187 SET_FLAG(VERIFY, prefs->flags, r);
189 //Default iodepth value is 1
193 prefs->iodepth = iodepth;
195 /**************************\
196 * Check timer parameters *
197 \**************************/
199 //Most of the time, not all timers need to be used.
200 //We can choose which timers will be used by adjusting the "insanity"
201 //level of the benchmark i.e. the obscurity of code paths (get request,
202 //submit request) that will be timed.
204 strcpy(insanity, "sane");
206 r = read_insanity(insanity);
208 XSEGLOG2(&lc, E, "Invalid syntax: --insanity %s\n", insanity);
211 SET_FLAG(INSANITY, prefs->flags, r);
214 * If we have a request other than read/write, we don't need to check
215 * about size parameters, but only how many objects we want to affect
217 if (prefs->op != X_READ && prefs->op != X_WRITE) {
219 /***************************\
220 * Check object parameters *
221 \***************************/
223 if (!total_objects[0]) {
225 "Total number of objects needs to be supplied\n");
228 prefs->to = str2num(total_objects);
230 XSEGLOG2(&lc, E, "Invalid syntax: -to %s\n", total_objects);
234 //In this case, the maximum number of requests is the total number of
235 //objects we will handle
236 prefs->status->max = prefs->to;
239 /*************************\
240 * Check size parameters *
241 \*************************/
243 //Block size (bs): Defaults to 4K.
244 //It must be a number followed by one of these characters:
246 //If not, it will be considered as size in bytes.
247 //Must be integer multiple of segment's page size (typically 4k).
249 strcpy(block_size,"4k");
251 prefs->bs = str2num(block_size);
253 XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
255 } else if (prefs->bs % xseg_page_size) {
256 XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
260 //Total I/O size (ts): Must be supplied by user.
261 //Must have the same format as "block size"
262 //Must be integer multiple of "block size"
263 if (!total_size[0]) {
264 XSEGLOG2(&lc, E, "Total I/O size needs to be supplied\n");
268 prefs->ts = str2num(total_size);
270 XSEGLOG2(&lc, E, "Invalid syntax: -ts %s\n", total_size);
272 } else if (prefs->ts % prefs->bs) {
273 XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
275 } else if (prefs->ts > xseg->segment_size) {
277 "Total I/O size exceeds segment size\n", total_size);
// NOTE(review): the message above has no conversion specifier, so the
// total_size argument passed along with it is never printed.
281 //Object size (os): Defaults to 4M.
282 //Must have the same format as "total size"
283 //Must be integer multiple of "block size"
285 strcpy(object_size,"4M");
287 prefs->os = str2num(object_size);
289 XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
291 } else if (prefs->os % prefs->bs) {
292 XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
296 //In this case, the maximum number of requests is the number of blocks
297 //we need to cover the total I/O size
298 prefs->status->max = prefs->ts / prefs->bs;
301 /*************************\
302 * Check port parameters *
303 \*************************/
306 XSEGLOG2(&lc, E, "Target port needs to be supplied\n");
310 prefs->src_port = peer->portno_start; //TODO: allow user to change this
311 prefs->dst_port = (xport) dst_port;
313 /*********************************\
314 * Create timers for all metrics *
315 \*********************************/
// Each timer is created with its own insanity level: total (sane),
// submit (manic), get (paranoid), receive (eccentric). send_request
// compares such a level against the configured INSANITY flag.
317 if (init_timer(&prefs->total_tm, INSANITY_SANE))
319 if (init_timer(&prefs->sub_tm, INSANITY_MANIC))
321 if (init_timer(&prefs->get_tm, INSANITY_PARANOID))
323 if (init_timer(&prefs->rec_tm, INSANITY_ECCENTRIC))
326 /********************************\
327 * Customize struct peerd/prefs *
328 \********************************/
333 //We proceed to initialise the global_id, and seed variables.
// Here the seed is taken from the nanosecond field of the raw monotonic clock.
335 clock_gettime(CLOCK_MONOTONIC_RAW, &timer_seed);
336 seed = timer_seed.tv_nsec;
342 if (GET_FLAG(PATTERN, prefs->flags) == PATTERN_RAND) {
343 prefs->lfsr = malloc(sizeof(struct bench_lfsr));
// The LFSR is sized by the maximum number of requests; the low four bits
// of the seed are passed as the last lfsr_init argument.
349 r = lfsr_init(prefs->lfsr, prefs->status->max, seed, seed & 0xF);
// The failure is only reported when set_by_hand is non-zero.
350 if (r && set_by_hand) {
351 XSEGLOG2(&lc, E, "LFSR could not be initialized\n");
358 XSEGLOG2(&lc, I, "Global ID is %s\n", global_id);
// Replace the peer loop with the benchmark loop and expose prefs to it.
360 peer->peerd_loop = bench_peerd_loop;
361 peer->priv = (void *) prefs;
369 free(prefs->total_tm);
/*
 * send_request: build one benchmark request and submit it to the target port.
 *
 * Steps visible here: get an xseg request, prepare it with room for
 * TARGETLEN bytes of target name and prefs->bs bytes of data, choose the
 * next target/chunk with determine_next(), create the target (and the chunk
 * data for writes), allocate a peer request whose priv points to a
 * struct timespec, attach it as request data, submit the request and signal
 * the destination. The get, receive and submit timers are started and
 * stopped around the corresponding calls.
 *
 * Failures jump to put_peer_request or put_xseg_request; the calls at the
 * end of the function free the peer request and put the xseg request back.
 *
 * peer:  peer descriptor that owns the xseg segment and the peer requests
 * prefs: benchmark preferences and counters (submitted is incremented here)
 * Returns an int; the return statements are not visible in this excerpt.
 */
378 static int send_request(struct peerd *peer, struct bench *prefs)
380 struct xseg_request *req;
381 struct xseg *xseg = peer->xseg;
383 xport srcport = prefs->src_port;
384 xport dstport = prefs->dst_port;
// Every request carries exactly one block of data.
389 uint64_t size = prefs->bs;
391 //srcport and dstport must already be provided by the user.
392 //returns struct xseg_request with basic initializations
393 XSEGLOG2(&lc, D, "Get new request\n");
394 timer_start(prefs, prefs->get_tm);
395 req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
397 XSEGLOG2(&lc, W, "Cannot get request\n");
400 timer_stop(prefs, prefs->get_tm, NULL);
402 //Allocate enough space for the data and the target's name
403 XSEGLOG2(&lc, D, "Prepare new request\n");
404 r = xseg_prep_request(xseg, req, TARGETLEN, size);
406 XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
407 TARGETLEN, (unsigned long long)size);
408 goto put_xseg_request;
411 //Determine what the next target/chunk will be, based on I/O pattern
412 new = determine_next(prefs);
414 XSEGLOG2(&lc, I, "Our new request is %lu\n", new);
415 //Create a target of this format: "bench-<global_id>-<obj_no>"
416 create_target(prefs, req, new);
418 if (prefs->op == X_WRITE || prefs->op == X_READ) {
420 //Calculate the chunk offset inside the object
// Chunk number times block size, wrapped at the object size.
421 req->offset = (new * prefs->bs) % prefs->os;
422 XSEGLOG2(&lc, D, "Offset of request %lu is %lu\n", new, req->offset);
424 if (prefs->op == X_WRITE)
425 create_chunk(prefs, req, new);
430 XSEGLOG2(&lc, D, "Allocate peer request\n");
431 pr = alloc_peer_req(peer);
433 XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
434 peer->nr_ops - xq_count(&peer->free_reqs));
435 goto put_xseg_request;
438 pr->portno = srcport;
// Storage for the submission time of this request.
// NOTE(review): confirm that pr->priv is released on the put_peer_request
// path; no free of it is visible in this excerpt.
440 pr->priv = malloc(sizeof(struct timespec));
443 goto put_peer_request;
446 //XSEGLOG2(&lc, D, "Set request data\n");
447 r = xseg_set_req_data(xseg, req, pr);
449 XSEGLOG2(&lc, W, "Cannot set request data\n");
450 goto put_peer_request;
454 * Start measuring receive time.
455 * When we receive a request, we need to have its submission time to
456 * measure elapsed time. Thus, we memcpy its submission time to pr->priv.
457 * QUESTION: Is this the fastest way?
459 timer_start(prefs, prefs->rec_tm);
// The start time is copied only when the level of rec_tm does not exceed
// the configured insanity level.
460 if (prefs->rec_tm->insanity <= GET_FLAG(INSANITY, prefs->flags))
461 memcpy(pr->priv, &prefs->rec_tm->start_time, sizeof(struct timespec));
463 //Submit the request from the source port to the target port
464 XSEGLOG2(&lc, D, "Submit request %lu\n", new);
465 timer_start(prefs, prefs->sub_tm);
466 p = xseg_submit(xseg, req, srcport, X_ALLOC);
468 XSEGLOG2(&lc, W, "Cannot submit request\n");
469 goto put_peer_request;
// Counted before the submit timer is stopped.
471 prefs->status->submitted++;
472 timer_stop(prefs, prefs->sub_tm, NULL);
474 //Send SIGIO to the process that has bound this port to inform that
476 r = xseg_signal(xseg, p);
478 // XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
// Cleanup: free the peer request, then hand the xseg request back.
484 free_peer_req(peer, pr);
486 if (xseg_put_request(xseg, req, srcport))
487 XSEGLOG2(&lc, W, "Cannot put request\n");
492 * This function substitutes the default generic_peerd_loop of peer.c.
493 * It's plugged to struct peerd at custom peer's initialisation
/*
 * bench_peerd_loop: main loop of the benchmark peer.
 *
 * Until isTerminate() reports termination, the loop submits requests while
 * CAN_SEND_REQUEST allows it, polls the ports for up to "threshold"
 * iterations with check_ports(), and then waits for a signal before going
 * around again. The total timer is started before the loop.
 *
 * arg: loop argument; it is cast either to a struct thread or directly to a
 *      struct peerd (see the note below).
 * Returns an int; the return statements are not visible in this excerpt.
 */
495 int bench_peerd_loop(void *arg)
// NOTE(review): peer is defined twice below; presumably the two forms sit in
// alternative preprocessor branches that are not visible here - confirm.
498 struct thread *t = (struct thread *) arg;
499 struct peerd *peer = t->peer;
502 struct peerd *peer = (struct peerd *) arg;
503 char id[4] = {'P','e','e','r'};
505 struct xseg *xseg = peer->xseg;
506 struct bench *prefs = peer->priv;
507 xport portno_start = peer->portno_start;
508 xport portno_end = peer->portno_end;
// Polling iterations per round: 1000 divided by (portno_end - portno_start + 1).
509 uint64_t threshold=1000/(1 + portno_end - portno_start);
510 pid_t pid = syscall(SYS_gettid);
514 XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
515 xseg_init_local_signal(xseg, peer->portno_start);
517 timer_start(prefs, prefs->total_tm);
519 while (!isTerminate()) {
522 XSEGLOG2(&lc, D, "%s executes function\n", id);
523 xseg_cancel_wait(xseg, peer->portno_start);
// Keep submitting while the iodepth and total-request limits allow it.
530 while (CAN_SEND_REQUEST(prefs)) {
531 xseg_cancel_wait(xseg, peer->portno_start);
// NOTE(review): the third value logged is "received", whereas
// CAN_SEND_REQUEST compares "submitted" against max - confirm the intent.
532 XSEGLOG2(&lc, D, "...because %lu < %lu && %lu < %lu\n",
533 prefs->status->submitted - prefs->status->received,
534 prefs->iodepth, prefs->status->received,
536 XSEGLOG2(&lc, D, "Start sending new request\n");
537 r = send_request(peer, prefs);
541 //Heart of peerd_loop. This loop is common for everyone.
542 for (loops = threshold; loops > 0; loops--) {
544 xseg_prepare_wait(xseg, peer->portno_start);
546 if (check_ports(peer)) {
547 //If an old request has just been acked, the most sensible
548 //thing to do is to immediately send a new one
549 if (prefs->status->received < prefs->status->max)
555 //struct xseg_port *port = xseg_get_port(xseg, portno_start);
557 //q = XPTR_TAKE(port->request_queue, xseg->segment);
558 //XSEGLOG2(&lc, I, "%s goes to sleep with %u requests pending\n",
560 XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
// Wait for a signal with a timeout argument of 10000000, then cancel the wait.
567 xseg_wait_signal(xseg, 10000000UL);
568 xseg_cancel_wait(xseg, peer->portno_start);
569 XSEGLOG2(&lc, I, "%s woke up\n", id);
572 XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
573 xq_count(&peer->free_reqs), peer->nr_ops);
/*
 * custom_peer_finalize: stop the total timer if needed and print the results.
 *
 * peer: peer descriptor whose priv field holds the struct bench preferences.
 */
577 void custom_peer_finalize(struct peerd *peer)
579 struct bench *prefs = peer->priv;
580 //TODO: Measure mean time, standard deviation
// Stop the total timer only if it has not been marked completed yet.
582 if (!prefs->total_tm->completed)
583 timer_stop(prefs, prefs->total_tm, NULL);
586 print_res(prefs, prefs->total_tm, "Total Requests");
591 * handle_received: +1 to our received requests.
592 * Do some sanity checks and then check if request is failed.
593 * If not try to verify the request if asked.
/*
 * handle_received: account for one request that came back to the benchmark.
 *
 * Increments the received counter, stops the receive timer using the
 * submission time stored in pr->priv, counts the request as failed when its
 * state lacks XS_SERVED or as corrupted when verification is enabled and
 * read_chunk() returns non-zero, then puts the xseg request back and frees
 * the peer request.
 *
 * peer: peer descriptor; peer->priv holds the struct bench preferences
 * pr:   peer request that was attached to the xseg request at submission
 */
595 static void handle_received(struct peerd *peer, struct peer_req *pr)
597 //FIXME: handle null pointer
598 struct bench *prefs = peer->priv;
599 struct timer *rec = prefs->rec_tm;
601 prefs->status->received++;
603 //This is a serious error, so we must stop
604 XSEGLOG2(&lc, E, "Received peer request with no xseg request");
// NOTE(review): send_request copies the submission time when
// rec_tm->insanity <= configured level; this test uses the opposite
// comparison - confirm the intended direction.
609 if ((GET_FLAG(INSANITY, prefs->flags) < rec->insanity) && !pr->priv) {
610 XSEGLOG2(&lc, W, "Cannot find submission time of request");
614 timer_stop(prefs, rec, pr->priv);
// A request that was not served counts as failed; a served read is
// additionally verified when CAN_VERIFY holds.
616 if (!(pr->req->state & XS_SERVED))
617 prefs->status->failed++;
618 else if (CAN_VERIFY(prefs) && read_chunk(prefs, pr->req))
619 prefs->status->corrupted++;
621 if (xseg_put_request(peer->xseg, pr->req, pr->portno))
622 XSEGLOG2(&lc, W, "Cannot put xseg request\n");
624 //QUESTION, can't we just keep the malloced memory for future use?
626 free_peer_req(peer, pr);
629 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
630 enum dispatch_reason reason)
633 case dispatch_accept:
634 //This is wrong, benchmarking peer should not accept requests,
636 XSEGLOG2(&lc, W, "Bench peer should not accept requests\n");
639 case dispatch_receive:
640 handle_received(peer, pr);