2 * Copyright 2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
16 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
29 * The views and conclusions contained in the software and
30 * documentation are those of the authors and should not be
31 * interpreted as representing official policies, either expressed
32 * or implied, of GRNET S.A.
39 #include <sys/syscall.h>
40 #include <sys/types.h>
42 #include <xseg/xseg.h>
47 #include <bench-xseg.h>
// File-scope identifier buffer of IDLEN chars.
// NOTE(review): a comment inside custom_peer_init says a helper initializes
// global_id (and global_seed) during peer setup; the call itself is not
// visible in this excerpt -- confirm where it is filled in.
50 char global_id[IDLEN];
54 * This macro checks two things:
55 * a) If in-flight requests are fewer than the given iodepth
56 * b) If we have not yet submitted all of the requests
/*
 * CAN_SEND_REQUEST(prefs) - non-zero while another request may be submitted.
 *
 * prefs: pointer expression to the benchmark preferences; it must provide
 *        sub_tm->completed, rec_tm->completed, iodepth and max_requests.
 *
 * True only when both conditions hold:
 *   - submitted minus received requests is below prefs->iodepth, and
 *   - fewer than prefs->max_requests requests have been submitted.
 *
 * The argument and the whole expansion are parenthesised so the macro can be
 * negated, combined with other operators, or given an argument such as
 * `&bench` without changing its meaning. The last line carries no trailing
 * backslash, so the definition cannot swallow the line that follows it.
 * Note that `prefs` is evaluated more than once; pass a side-effect-free
 * expression.
 */
#define CAN_SEND_REQUEST(prefs) \
	((((prefs)->sub_tm->completed - (prefs)->rec_tm->completed) < \
	  (prefs)->iodepth) && \
	 ((prefs)->sub_tm->completed < (prefs)->max_requests))
// Print the benchmark-specific command-line options to stderr as a table of
// option name, default value and description. Takes no arguments and returns
// nothing.
62 void custom_peer_usage()
64 fprintf(stderr, "Custom peer options: \n"
65 " --------------------------------------------\n"
66 " -op | None | XSEG operation [read|write|info|delete]\n"
67 " --pattern | None | I/O pattern [seq|rand]\n"
68 " -to | None | Total objects (not for read/write)\n"
69 " -ts | None | Total I/O size\n"
70 " -os | 4M | Object size\n"
71 " -bs | 4k | Block size\n"
72 " -dp | None | Destination port\n"
73 " --iodepth | 1 | Number of in-flight I/O requests\n"
74 " --verify | no | Verify written requests [no|meta|hash]\n"
75 " --seed | None | Inititalize LFSR and target names\n"
76 " --insanity| sane | Adjust insanity level of benchmark:\n"
77 " | | [sane|eccentric|manic|paranoid]\n"
// Initialise the benchmark peer.
//
// Allocates the peer's `struct bench` preferences, reads and validates the
// benchmark-specific command-line arguments (operation, pattern, sizes or
// object count, destination port, insanity level), creates the timers, sets up
// an LFSR when the random pattern is selected, and finally installs
// custom_peerd_loop as the peer's loop and stores the preferences in
// peer->priv.
//
// peer:      peer descriptor; its xseg, portno_start, peerd_loop and priv
//            members are read or written here.
// argc/argv: command line handed to BEGIN_READ_ARGS.
// Returns:   NOTE(review): the return statements are not visible in this
//            excerpt -- presumably 0 on success and non-zero on error; confirm.
81 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
// Buffers for the string-valued arguments: MAX_ARG_LEN characters plus one
// extra byte each.
84 char total_objects[MAX_ARG_LEN + 1];
85 char total_size[MAX_ARG_LEN + 1];
86 char object_size[MAX_ARG_LEN + 1];
87 char block_size[MAX_ARG_LEN + 1];
88 char op[MAX_ARG_LEN + 1];
89 char pattern[MAX_ARG_LEN + 1];
90 char insanity[MAX_ARG_LEN + 1];
91 struct xseg *xseg = peer->xseg;
// Segment page size in bytes, derived from the configured page shift.
92 unsigned int xseg_page_size = 1 << xseg->config.page_shift;
// NOTE(review): prefs is loaded from thread[i]->priv and then immediately
// overwritten by malloc, and a second malloc follows. These look like
// alternative (threaded / non-threaded) build paths whose preprocessor lines
// are not visible here -- confirm, and confirm each malloc result is checked.
105 for (i = 0; i < nr_threads; i++) {
106 prefs = peer->thread[i]->priv;
107 prefs = malloc(sizeof(struct bench));
114 prefs = malloc(sizeof(struct bench));
121 //Begin reading the benchmark-specific arguments
122 BEGIN_READ_ARGS(argc, argv);
123 READ_ARG_STRING("-op", op, MAX_ARG_LEN);
124 READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
125 READ_ARG_STRING("-to", total_objects, MAX_ARG_LEN);
126 READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
127 READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
128 READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
129 READ_ARG_ULONG("--iodepth", prefs->iodepth);
130 READ_ARG_ULONG("-dp", dst_port);
// NOTE(review): "--insanity" is stored into block_size, not insanity. As
// written, the insanity buffer never receives the user's value and a supplied
// --insanity can overwrite the -bs value. Looks like a copy/paste slip; the
// destination should most likely be `insanity` -- confirm.
131 READ_ARG_STRING("--insanity", block_size, MAX_ARG_LEN);
134 /*****************************\
135 * Check I/O type parameters *
136 \*****************************/
138 //We support 4 xseg operations: X_READ, X_WRITE, X_DELETE, X_INFO
139 //The I/O pattern of these operations can be either synchronous (sync) or
142 XSEGLOG2(&lc, E, "xseg operation needs to be supplied\n");
147 XSEGLOG2(&lc, E, "Invalid syntax: -op %s\n", op);
153 XSEGLOG2(&lc, E, "I/O pattern needs to be supplied\n");
156 r = read_pattern(pattern);
158 XSEGLOG2(&lc, E, "Invalid syntax: --pattern %s\n", pattern);
// Merge the value returned by read_pattern into the preference flags.
161 prefs->flags |= (uint8_t)r;
163 /**************************\
164 * Check timer parameters *
165 \**************************/
167 //Most of the times, not all timers need to be used.
168 //We can choose which timers will be used by adjusting the "insanity"
169 //level of the benchmark i.e. the obscurity of code paths (get request,
170 //submit request) that will be timed.
// Default insanity level, matching the "sane" default shown in the usage text.
172 strcpy(insanity, "sane");
174 prefs->insanity = read_insanity(insanity);
// NOTE(review): this check only works if prefs->insanity is a signed type; the
// field's declaration is not visible here -- confirm.
175 if (prefs->insanity < 0) {
176 XSEGLOG2(&lc, E, "Invalid syntax: --insanity %s\n", insanity);
181 * If we have a request other than read/write, we don't need to check
182 * about size parameters, but only how many objects we want to affect
184 if (prefs->op != X_READ && prefs->op != X_WRITE) {
186 /***************************\
187 * Check object parameters *
188 \***************************/
190 if (!total_objects[0]) {
192 "Total number of objects needs to be supplied\n");
195 prefs->to = str2num(total_objects);
197 XSEGLOG2(&lc, E, "Invalid syntax: -to %s\n", total_objects);
201 //In this case, the maximum number of requests is the total number of
202 //objects we will handle
203 prefs->max_requests = prefs->to;
206 /*************************\
207 * Check size parameters *
208 \*************************/
210 //Block size (bs): Defaults to 4K.
211 //It must be a number followed by one of these characters:
213 //If not, it will be considered as size in bytes.
214 //Must be integer multiple of segment's page size (typically 4k).
216 strcpy(block_size,"4k");
221 prefs->bs = str2num(block_size);
223 XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
225 } else if (prefs->bs % xseg_page_size) {
226 XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
230 //Total I/O size (ts): Must be supplied by user.
231 //Must have the same format as "block size"
232 //Must be integer multiple of "block size"
233 if (!total_size[0]) {
234 XSEGLOG2(&lc, E, "Total I/O size needs to be supplied\n");
238 prefs->ts = str2num(total_size);
240 XSEGLOG2(&lc, E, "Invalid syntax: -ts %s\n", total_size);
242 } else if (prefs->ts % prefs->bs) {
243 XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
245 } else if (prefs->ts > xseg->segment_size) {
// NOTE(review): the visible format string has no conversion specifier for the
// total_size argument passed after it -- either add "%s" or drop the argument.
247 "Total I/O size exceeds segment size\n", total_size);
251 //Object size (os): Defaults to 4M.
252 //Must have the same format as "total size"
253 //Must be integer multiple of "block size"
255 strcpy(object_size,"4M");
257 prefs->os = str2num(object_size);
259 XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
261 } else if (prefs->os % prefs->bs) {
262 XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
266 //In this case, the maximum number of requests is the number of blocks
267 //we need to cover the total I/O size
268 prefs->max_requests = prefs->ts / prefs->bs;
271 /*************************
272 * Check port parameters *
273 *************************/
276 XSEGLOG2(&lc, E, "Destination port needs to be supplied\n");
280 prefs->src_port = peer->portno_start; //TODO: allow user to change this
281 prefs->dst_port = (xport) dst_port;
283 /*********************************
284 * Create timers for all metrics *
285 *********************************/
// One timer per measured path (total run, submit, get request, receive); each
// is created with a TM_* level constant -- presumably the insanity level from
// which that timer is active; confirm against init_timer.
287 if (init_timer(&prefs->total_tm, TM_SANE))
289 if (init_timer(&prefs->sub_tm, TM_MANIC))
291 if (init_timer(&prefs->get_tm, TM_PARANOID))
293 if (init_timer(&prefs->rec_tm, TM_ECCENTRIC))
296 /********************************\
297 * Customize struct peerd/prefs *
298 \********************************/
302 //The following function initializes the global_id, global_seed extern
// Random pattern selected: allocate an LFSR and initialise it for
// max_requests values.
306 if ((prefs->flags & (1 <<PATTERN_FLAG)) == IO_RAND) {
307 prefs->lfsr = malloc(sizeof(struct lfsr));
312 //FIXME: Give a name to max requests, not just prefs->ts / prefs->bs
313 //FIXME: handle better the seed passing than just giving UINT64_MAX
314 if (lfsr_init(prefs->lfsr, prefs->max_requests, UINT64_MAX)) {
315 XSEGLOG2(&lc, E, "LFSR could not be initialized\n");
// Replace the generic peer loop with the benchmark loop and publish the
// preferences through peer->priv for the other callbacks.
320 peer->peerd_loop = custom_peerd_loop;
321 peer->priv = (void *) prefs;
// NOTE(review): presumably part of the error-path cleanup; the surrounding
// labels and remaining frees are not visible in this excerpt.
329 free(prefs->total_tm);
// Build and submit a single benchmark request.
//
// Visible steps, in order:
//   1. get an xseg request for (src_port, dst_port), timed with get_tm;
//   2. prepare it for a TARGETLEN-byte target name and prefs->bs bytes of data;
//   3. pick the next target/chunk number with determine_next() and fill in the
//      target name; for read/write set the offset, and for write also the data;
//   4. allocate a peer request, attach a heap-allocated timespec to pr->priv
//      and register the peer request as the xseg request's data;
//   5. record the submission time, submit on the source port (timed with
//      sub_tm) and signal the port returned by xseg_submit.
// Failures jump to the put_peer_request / put_xseg_request cleanup paths, which
// free the peer request and put the xseg request back.
//
// peer:  peer descriptor owning the xseg handle and the peer-request pool.
// prefs: benchmark preferences (ports, sizes, timers, operation).
// Returns: NOTE(review): return statements are not visible in this excerpt.
338 static int send_request(struct peerd *peer, struct bench *prefs)
340 struct xseg_request *req;
341 struct xseg *xseg = peer->xseg;
343 xport srcport = prefs->src_port;
344 xport dstport = prefs->dst_port;
// Each request carries exactly one block of data.
349 uint64_t size = prefs->bs;
351 //srcport and dstport must already be provided by the user.
352 //returns struct xseg_request with basic initializations
353 //XSEGLOG2(&lc, D, "Get new request\n");
354 timer_start(prefs, prefs->get_tm);
355 req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
357 XSEGLOG2(&lc, W, "Cannot get request\n");
360 timer_stop(prefs, prefs->get_tm, NULL);
362 //Allocate enough space for the data and the target's name
363 //XSEGLOG2(&lc, D, "Prepare new request\n");
364 r = xseg_prep_request(xseg, req, TARGETLEN, size);
366 XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
367 TARGETLEN, (unsigned long long)size);
368 goto put_xseg_request;
371 //Determine what the next target/chunk will be, based on I/O pattern
372 new = determine_next(prefs);
373 XSEGLOG2(&lc, D, "Our new request is %lu\n", new);
374 //Create a target of this format: "bench-<obj_no>"
375 create_target(prefs, req, new);
377 if(prefs->op == X_WRITE || prefs->op == X_READ) {
379 //Calculate the chunk offset inside the object
380 req->offset = (new * prefs->bs) % prefs->os;
381 XSEGLOG2(&lc, D, "Offset of request %lu is %lu\n", new, req->offset);
383 if(prefs->op == X_WRITE)
384 create_chunk(prefs, req, new);
390 //XSEGLOG2(&lc, D, "Allocate peer request\n");
391 pr = alloc_peer_req(peer);
393 XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
394 peer->nr_ops - xq_count(&peer->free_reqs));
395 goto put_xseg_request;
398 pr->portno = srcport;
// Storage for this request's submission timestamp (filled by the memcpy
// below and later passed to timer_stop in handle_received).
// NOTE(review): confirm this allocation is released on the put_peer_request
// error path; the relevant lines are not visible here.
400 pr->priv = malloc(sizeof(struct timespec));
403 goto put_peer_request;
406 //XSEGLOG2(&lc, D, "Set request data\n");
407 r = xseg_set_req_data(xseg, req, pr);
409 XSEGLOG2(&lc, W, "Cannot set request data\n");
410 goto put_peer_request;
414 * Start measuring receive time.
415 * When we receive a request, we need to have its submission time to
416 * measure elapsed time. Thus, we memcpy its submission time to pr->priv.
417 * QUESTION: Is this the fastest way?
419 timer_start(prefs, prefs->rec_tm);
420 memcpy(pr->priv, &prefs->rec_tm->start_time, sizeof(struct timespec));
422 //Submit the request from the source port to the target port
423 //XSEGLOG2(&lc, D, "Submit request %lu\n", new);
424 //QUESTION: Can't we just use the submission time calculated previously?
425 timer_start(prefs, prefs->sub_tm);
426 p = xseg_submit(xseg, req, srcport, X_ALLOC);
428 XSEGLOG2(&lc, W, "Cannot submit request\n");
429 goto put_peer_request;
431 timer_stop(prefs, prefs->sub_tm, NULL);
433 //Send SIGIO to the process that has bound this port to inform that
435 xseg_signal(xseg, p);
// Cleanup: give the peer request back to the pool, then return the xseg
// request to the source port.
441 free_peer_req(peer, pr);
443 if (xseg_put_request(xseg, req, srcport))
444 XSEGLOG2(&lc, W, "Cannot put request\n");
449 * This function substitutes the default generic_peerd_loop of peer.c.
450 * It's plugged to struct peerd at custom peer's initialisation
// Main loop of the benchmark peer (assigned to peer->peerd_loop in
// custom_peer_init).
//
// Starts the total timer, then until isTerminate() reports true it:
//   - submits new requests while CAN_SEND_REQUEST(prefs) holds,
//   - polls the ports with check_ports() for up to `threshold` iterations,
//   - otherwise prepares to wait and sleeps in xseg_wait_signal().
//
// arg: cast both to `struct thread *` and to `struct peerd *` below.
//      NOTE(review): these look like alternative build configurations whose
//      preprocessor lines are not visible in this excerpt -- confirm.
// Returns: NOTE(review): return statements are not visible in this excerpt.
452 int custom_peerd_loop(void *arg)
455 struct thread *t = (struct thread *) arg;
456 struct peerd *peer = t->peer;
459 struct peerd *peer = (struct peerd *) arg;
// NOTE(review): id holds exactly four characters with no terminating NUL, yet
// it is passed to "%s" in the log calls below. If XSEGLOG2 formats like
// printf, that reads past the array; `char id[] = "Peer";` would be safe.
460 char id[4] = {'P','e','e','r'};
462 struct xseg *xseg = peer->xseg;
463 struct bench *prefs = peer->priv;
464 xport portno_start = peer->portno_start;
465 xport portno_end = peer->portno_end;
// Polling budget per wake-up: 1000 iterations divided by the number of ports
// in [portno_start, portno_end].
466 uint64_t threshold=1000/(1 + portno_end - portno_start);
467 pid_t pid =syscall(SYS_gettid);
471 XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
472 xseg_init_local_signal(xseg, peer->portno_start);
475 timer_start(prefs, prefs->total_tm);
476 while (!isTerminate()) {
479 XSEGLOG2(&lc, D, "%s executes function\n", id);
480 xseg_cancel_wait(xseg, peer->portno_start);
// Keep submitting while the in-flight count is below iodepth and requests
// remain to be sent.
488 while (CAN_SEND_REQUEST(prefs)) {
489 XSEGLOG2(&lc, D, "Because %lu < %lu && %lu < %lu\n",
490 prefs->sub_tm->completed - prefs->rec_tm->completed,
491 prefs->iodepth, prefs->sub_tm->completed,
492 prefs->max_requests);
493 xseg_cancel_wait(xseg, peer->portno_start);
494 XSEGLOG2(&lc, D, "Start sending new request\n");
495 r = send_request(peer, prefs);
499 //Heart of peerd_loop. This loop is common for everyone.
500 for (loops = threshold; loops > 0; loops--) {
501 if (check_ports(peer)) {
// Every request has been received back. NOTE(review): the action taken
// here is not visible in this excerpt.
502 if (prefs->max_requests == prefs->rec_tm->completed)
505 //If an old request has just been acked, the most sensible
506 //thing to do is to immediately send a new one
510 XSEGLOG2(&lc, I, "%s goes to sleep\n",id);
511 xseg_prepare_wait(xseg, peer->portno_start);
518 xseg_wait_signal(xseg, 10000000UL);
519 xseg_cancel_wait(xseg, peer->portno_start);
520 XSEGLOG2(&lc, I, "%s woke up\n", id);
523 XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
524 xq_count(&peer->free_reqs), peer->nr_ops);
// Report the benchmark's total elapsed time.
//
// If the total timer has no completed measurement yet it is stopped first;
// its accumulated sum is then split by separate_by_order() into a
// struct tm_result and printed under the label "Total Time".
//
// peer: peer descriptor whose priv member holds the struct bench preferences.
528 void custom_peer_finalize(struct peerd *peer)
530 struct bench *prefs = peer->priv;
531 //TODO: Measure mean time, standard variation
532 struct tm_result total; //mean, std;
// Stop the total timer only if it has not been stopped already.
534 if (!prefs->total_tm->completed)
535 timer_stop(prefs, prefs->total_tm, NULL);
537 separate_by_order(prefs->total_tm->sum, &total);
538 print_res(total, "Total Time");
// Complete a request that has come back to the benchmark peer.
//
// Stops the receive timer using the submission timestamp stored in pr->priv,
// puts the xseg request back on the port recorded in pr->portno, and returns
// the peer request to the pool. The two error logs below cover a peer request
// without an xseg request and a missing submission time.
// NOTE(review): the conditions guarding those logs are not visible in this
// excerpt.
//
// peer: peer descriptor; peer->priv holds the struct bench preferences.
// pr:   peer request being completed.
543 static void handle_received(struct peerd *peer, struct peer_req *pr)
545 //FIXME: handle null pointer
546 struct bench *prefs = peer->priv;
547 struct timer *rec = prefs->rec_tm;
550 //This is a serious error, so we must stop
551 XSEGLOG2(&lc, E, "Received peer request with no xseg request");
557 XSEGLOG2(&lc, W, "Cannot find submission time of request");
// pr->priv carries the start time copied in send_request.
561 timer_stop(prefs, rec, pr->priv);
563 if (xseg_put_request(peer->xseg, pr->req, pr->portno))
564 XSEGLOG2(&lc, W, "Cannot put xseg request\n");
566 //QUESTION, can't we just keep the malloced memory for future use?
568 free_peer_req(peer, pr);
571 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
572 enum dispatch_reason reason)
575 case dispatch_accept:
576 //This is wrong, benchmarking peer should not accept requests,
578 XSEGLOG2(&lc, W, "Bench peer should not accept requests\n");
581 case dispatch_receive:
582 handle_received(peer, pr);