12 #include <xseg/xseg.h>
16 printf("xseg <spec> [[[<src_port>]:[<dst_port>]] [<command> <arg>*] ]*\n"
18 " <type:name:nr_ports:nr_requests:request_size:extra_size:page_shift>\n"
25 " bridge <portno1> <portno2> <logfile> {full|summary|stats}\n"
28 " alloc_requests (to source) <nr>\n"
29 " free_requests (from source) <nr>\n"
30 " put_requests (all from dest)\n"
31 " put_replies (all from dest)\n"
32 " wait <nr_replies>\n"
33 " complete <nr_requests>\n"
34 " fail <nr_requests>\n"
35 " rndwrite <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
36 " rndread <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
38 " read <target> <offset> <size>\n"
39 " write <target> <offset> < data\n"
40 " truncate <target> <size>\n"
45 " clone <src> <dst>\n"
52 struct xseg_config cfg;
54 uint32_t srcport, dstport;
58 #define mkname mkname_heavy
59 /* heavy distributes duplicates much more widely than light
60 * ./xseg-tool random 100000 | cut -d' ' -f2- | sort | uniq -d -c |wc -l
63 void mkname_heavy(char *name, uint32_t namelen, uint32_t seed)
67 for (i = 0; i < namelen; i += 1) {
68 c = seed + (seed >> 8) + (seed >> 16) + (seed >> 24);
69 c = '0' + ((c + (c >> 4)) & 0xf);
73 seed *= ((seed % 137911) | 1) * 137911;
77 void mkname_light(char *name, uint32_t namelen, uint32_t seed)
81 for (i = 0; i < namelen; i += 1) {
83 name[i] = 'A' + (c & 0xf);
88 uint64_t pick(uint64_t size)
90 return (uint64_t)((double)(RAND_MAX) / random());
93 void mkchunk( char *chunk, uint32_t datalen,
94 char *target, uint32_t targetlen, uint64_t offset)
96 long i, r, bufsize = targetlen + 16;
98 r = datalen % bufsize;
99 snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);
101 for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
102 memcpy(chunk + i, buf, bufsize);
104 memcpy(chunk + datalen - r, buf, r);
107 int chkchunk( char *chunk, uint32_t datalen,
108 char *target, uint32_t targetlen, uint64_t offset)
111 int bufsize = targetlen + 16;
113 r = datalen % targetlen;
114 snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);
116 for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
117 if (memcmp(chunk + i, buf, bufsize)) {
118 /*printf("mismatch: '%*s'* vs '%*s'\n",
119 bufsize, buf, datalen, chunk);
124 if (memcmp(chunk + datalen - r, buf, r))
131 #define ALLOC_MIN 4096
132 #define ALLOC_MAX 1048576
134 void inputbuf(FILE *fp, char **retbuf, uint64_t *retsize)
136 static uint64_t alloc_size;
142 if (alloc_size < ALLOC_MIN)
143 alloc_size = ALLOC_MIN;
145 if (alloc_size > ALLOC_MAX)
146 alloc_size = ALLOC_MAX;
148 p = realloc(buf, alloc_size);
159 r = fread(buf + size, 1, alloc_size - size, fp);
163 if (size >= alloc_size) {
164 p = realloc(buf, alloc_size * 2);
182 void report_request(struct xseg_request *req)
184 uint32_t max = req->datalen;
187 req->data[max-1] = 0;
188 fprintf(stderr, "request %llu state %u\n", (unsigned long long)req->serial, req->state);
189 fprintf(stderr, "data: %s\n", req->data);
192 int cmd_info(char *target)
194 uint32_t targetlen = strlen(target);
195 size_t size = sizeof(uint64_t);
198 struct xseg_request *req;
200 req = xseg_get_request(xseg, srcport);
202 fprintf(stderr, "No request!\n");
206 r = xseg_prep_request(req, targetlen, size);
208 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
209 (unsigned long) targetlen, (unsigned long) size);
210 xseg_put_request(xseg, srcport, req);
214 strncpy(req->target, target, targetlen);
219 srl = xseg_submit(xseg, dstport, req);
223 xseg_signal(xseg, dstport);
228 int cmd_read(char *target, uint64_t offset, uint64_t size)
230 uint32_t targetlen = strlen(target);
233 struct xseg_request *req = xseg_get_request(xseg, srcport);
235 fprintf(stderr, "No request\n");
239 r = xseg_prep_request(req, targetlen, size);
241 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
242 (unsigned long)targetlen, (unsigned long long)size);
243 xseg_put_request(xseg, srcport, req);
247 strncpy(req->target, target, targetlen);
248 req->offset = offset;
252 srl = xseg_submit(xseg, dstport, req);
256 xseg_signal(xseg, dstport);
260 int cmd_write(char *target, uint64_t offset)
266 uint32_t targetlen = strlen(target);
267 struct xseg_request *req;
269 inputbuf(stdin, &buf, &size);
271 fprintf(stderr, "No input\n");
275 req = xseg_get_request(xseg, srcport);
277 fprintf(stderr, "No request\n");
281 r = xseg_prep_request(req, targetlen, size);
283 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
284 (unsigned long)targetlen, (unsigned long long)size);
285 xseg_put_request(xseg, srcport, req);
289 strncpy(req->target, target, targetlen);
290 memcpy(req->buffer, buf, size);
291 req->offset = offset;
295 srl = xseg_submit(xseg, dstport, req);
297 fprintf(stderr, "Cannot submit\n");
304 int cmd_truncate(char *target, uint64_t offset)
309 int cmd_delete(char *target)
314 int cmd_acquire(char *target)
319 int cmd_release(char *target)
324 int cmd_copy(char *src, char *dst)
329 int cmd_clone(char *src, char *dst)
334 void log_req(int logfd, uint32_t portno2, uint32_t portno1, int op, int method,
335 struct xseg_request *req)
337 char target[64], data[64];
338 /* null terminate name in case of req->target is less than 63 characters,
339 * and next character after name (aka first byte of next buffer) is not
342 unsigned int end = (req->targetlen > 63) ? 63 : req->targetlen;
346 strncpy(target, req->target, end);
348 strncpy(data, req->data, 63);
352 "src port: %u, dst port: %u, op:%u offset: %llu size: %lu, reqstate: %u\n"
353 "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
354 (unsigned int)portno1,
355 (unsigned int)portno2,
356 (unsigned int)req->op,
357 (unsigned long long)req->offset,
358 (unsigned long)req->size,
359 (unsigned int)req->state,
360 (unsigned int)req->targetlen, target,
361 (unsigned long long)req->datalen, data);
365 "src port: %u, dst port: %u, op: %u\n",
366 (unsigned int)portno1,
367 (unsigned int)portno2,
368 (unsigned int)req->op);
371 fprintf(logfd, "src port: %u, dst port: %u, reqs: %u\n",
372 (unsigned int)portno1,
373 (unsigned int)portno2,
381 #define LOG_RECEIVE 1
383 int cmd_bridge(uint32_t portno1, uint32_t portno2, char *logfile, char *how)
385 struct xseg_request *req;
387 if (!strcmp(logfile, "-"))
390 logfd = open(logfile, O_WRONLY|O_APPEND|O_CREAT, 0600);
397 if (!strcmp(how, "full"))
399 else if (!strcmp(how, "summary"))
405 int reloop = 0, active;
406 xseg_prepare_wait(xseg, portno1);
407 xseg_prepare_wait(xseg, portno2);
413 req = xseg_accept(xseg, portno1);
415 xseg_submit(xseg, portno2, req);
416 log_req(logfd, portno1, portno2, LOG_ACCEPT, method, req);
420 req = xseg_accept(xseg, portno2);
422 xseg_submit(xseg, portno1, req);
423 log_req(logfd, portno2, portno1, LOG_ACCEPT, method, req);
427 req = xseg_receive(xseg, portno1);
429 xseg_respond(xseg, portno2, req);
430 log_req(logfd, portno1, portno2, LOG_RECEIVE, method, req);
434 req = xseg_receive(xseg, portno2);
436 xseg_respond(xseg, portno1, req);
437 log_req(logfd, portno2, portno1, LOG_RECEIVE, method, req);
444 /* wait on multiple queues? */
445 xseg_wait_signal(xseg, 100000);
448 xseg_cancel_wait(xseg, portno1);
449 xseg_cancel_wait(xseg, portno2);
460 int cmd_rndwrite(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
465 if (targetlen >= chunksize) {
466 fprintf(stderr, "targetlen >= chunksize\n");
470 char *p = realloc(namebuf, targetlen+1);
472 fprintf(stderr, "Cannot allocate memory\n");
477 p = realloc(chunk, chunksize);
479 fprintf(stderr, "Cannot allocate memory\n");
483 memset(chunk, 0, chunksize);
487 struct xseg_request *submitted = NULL, *received;
488 long nr_submitted = 0, nr_received = 0, nr_failed = 0;
494 xseg_prepare_wait(xseg, srcport);
495 if (nr_submitted < loops &&
496 (submitted = xseg_get_request(xseg, srcport))) {
497 xseg_cancel_wait(xseg, srcport);
498 r = xseg_prep_request(submitted, targetlen, chunksize);
500 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
501 targetlen, chunksize);
502 xseg_put_request(xseg, submitted->portno, submitted);
509 mkname(namebuf, targetlen, seed);
510 namebuf[targetlen] = 0;
511 //printf("%ld: %s\n", nr_submitted, namebuf);
512 strncpy(submitted->target, namebuf, targetlen);
513 offset = 0;// pick(size);
514 mkchunk(submitted->buffer, chunksize, namebuf, targetlen, offset);
516 submitted->offset = offset;
517 submitted->size = chunksize;
518 submitted->op = X_WRITE;
519 submitted->flags |= XF_NOSYNC;
521 srl = xseg_submit(xseg, dstport, submitted);
523 xseg_signal(xseg, dstport);
526 received = xseg_receive(xseg, srcport);
528 xseg_cancel_wait(xseg, srcport);
530 if (!(received->state & XS_SERVED)) {
532 report_request(received);
534 if (xseg_put_request(xseg, received->portno, received))
535 fprintf(stderr, "Cannot put request at port %u\n", received->portno);
538 if (!submitted && !received)
539 xseg_wait_signal(xseg, 1000000);
541 if (nr_submitted % 1000 == 0 && !reported) {
543 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
544 nr_submitted, nr_received, nr_failed);
547 if (nr_received >= loops)
551 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
552 nr_submitted, nr_received, nr_failed);
557 * prepare/wait rhythm,
558 * files are converted to independent chunk access patterns,
561 int cmd_rndread(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
566 if (targetlen >= chunksize) {
567 fprintf(stderr, "targetlen >= chunksize\n");
571 char *p = realloc(namebuf, targetlen+1);
573 fprintf(stderr, "Cannot allocate memory\n");
578 p = realloc(chunk, chunksize);
580 fprintf(stderr, "Cannot allocate memory\n");
584 memset(chunk, 0, chunksize);
588 struct xseg_request *submitted = NULL, *received;
589 long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0;
596 xseg_prepare_wait(xseg, srcport);
597 if (nr_submitted < loops &&
598 (submitted = xseg_get_request(xseg, srcport))) {
599 xseg_cancel_wait(xseg, srcport);
600 r = xseg_prep_request(submitted, targetlen, chunksize);
602 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
603 targetlen, chunksize);
604 xseg_put_request(xseg, submitted->portno, submitted);
611 mkname(namebuf, targetlen, seed);
612 namebuf[targetlen] = 0;
613 //printf("%ld: %s\n", nr_submitted, namebuf);
614 offset = 0;//pick(size);
616 strncpy(submitted->target, namebuf, targetlen);
617 submitted->offset = offset;
618 submitted->size = chunksize;
619 submitted->op = X_READ;
621 srl = xseg_submit(xseg, dstport, submitted);
623 xseg_signal(xseg, dstport);
626 received = xseg_receive(xseg, srcport);
628 xseg_cancel_wait(xseg, srcport);
630 if (!(received->state & XS_SERVED)) {
632 report_request(received);
633 } else if (!chkchunk(received->data, received->datalen,
634 received->target, received->targetlen, received->offset)) {
638 if (xseg_put_request(xseg, received->portno, received))
639 fprintf(stderr, "Cannot put request at port %u\n", received->portno);
642 if (!submitted && !received)
643 xseg_wait_signal(xseg, 1000000);
645 if (nr_submitted % 1000 == 0 && !reported) {
647 fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
648 nr_submitted, nr_received, nr_failed, nr_mismatch);
651 if (nr_received >= loops)
655 fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
656 nr_submitted, nr_received, nr_failed, nr_mismatch);
660 int cmd_report(uint32_t port)
662 struct xq *fq, *rq, *pq;
663 fq = &xseg->ports[port].free_queue;
664 rq = &xseg->ports[port].request_queue;
665 pq = &xseg->ports[port].reply_queue;
666 fprintf(stderr, "port %u:\n"
667 " free_queue [%p] count : %u\n"
668 " request_queue [%p] count : %u\n"
669 " reply_queue [%p] count : %u\n",
671 (void *)fq, xq_count(fq),
672 (void *)rq, xq_count(rq),
673 (void *)pq, xq_count(pq));
682 xseg = xseg_join(cfg.type, cfg.name, "posix", NULL);
684 fprintf(stderr, "cannot join segment!\n");
690 int cmd_reportall(void)
697 fprintf(stderr, "global free requests: %u\n", xq_count(xseg->free_requests));
698 for (t = 0; t < xseg->config.nr_ports; t++)
706 int r = xseg_create(&cfg);
708 fprintf(stderr, "cannot create segment!\n");
712 fprintf(stderr, "Segment initialized.\n");
716 int cmd_destroy(void)
718 if (!xseg && cmd_join())
723 fprintf(stderr, "Segment destroyed.\n");
727 int cmd_alloc_requests(unsigned long nr)
729 return xseg_alloc_requests(xseg, srcport, nr);
732 int cmd_free_requests(unsigned long nr)
734 return xseg_free_requests(xseg, srcport, nr);
737 int cmd_put_requests(void)
739 struct xseg_request *req;
742 req = xseg_accept(xseg, dstport);
745 if (xseg_put_request(xseg, req->portno, req))
746 fprintf(stderr, "Cannot put request at port %u\n", req->portno);
752 int cmd_finish(unsigned long nr, int fail)
754 struct xseg_request *req;
757 req = xseg_accept(xseg, srcport);
761 req->state &= ~XS_SERVED;
763 req->state |= XS_SERVED;
764 xseg_respond(xseg, dstport, req);
765 xseg_signal(xseg, dstport);
771 void handle_reply(struct xseg_request *req)
773 if (!(req->state & XS_SERVED)) {
780 fwrite(req->data, 1, req->datalen, stdout);
790 fprintf(stderr, "size: %llu\n", (unsigned long long)*((uint64_t *)req->data));
798 if (xseg_put_request(xseg, req->portno, req))
799 fprintf(stderr, "Cannot put reply at port %u\n", req->portno);
802 int cmd_wait(uint32_t nr)
804 struct xseg_request *req;
808 req = xseg_receive(xseg, srcport);
817 ret = xseg_prepare_wait(xseg, srcport);
821 ret = xseg_wait_signal(xseg, 1000000);
822 ret = xseg_cancel_wait(xseg, srcport);
830 int cmd_put_replies(void)
832 struct xseg_request *req;
835 req = xseg_receive(xseg, dstport);
838 fprintf(stderr, "request: %08llx%08llx\n"
841 0LL, (unsigned long long)req->serial,
846 //fwrite(req->buffer, 1, req->bufferlen, stdout);
848 if (xseg_put_request(xseg, req->portno, req))
849 fprintf(stderr, "Cannot put reply\n");
855 int cmd_bind(long portno)
857 struct xseg_port *port = xseg_bind_port(xseg, portno);
859 fprintf(stderr, "failed to bind port %ld\n", portno);
863 fprintf(stderr, "bound port %u\n", xseg_portno(xseg, port));
867 int cmd_signal(uint32_t portno)
869 return xseg_signal(xseg, portno);
872 int parse_ports(char *str)
883 if ((s > str) && isdigit(str[0])) {
897 if ((s > str) && isdigit(str[0])) {
909 int main(int argc, char **argv)
921 if (xseg_parse_spec(spec, &cfg)) {
922 fprintf(stderr, "Cannot parse spec\n");
926 if (xseg_initialize()) {
927 fprintf(stderr, "cannot initialize!\n");
931 for (i = 2; i < argc; i++) {
933 if (!strcmp(argv[i], "create")) {
938 if (!strcmp(argv[i], "join")) {
941 fprintf(stderr, "Segment joined.\n");
945 if (!strcmp(argv[i], "destroy")) {
953 if (!strcmp(argv[i], "reportall")) {
954 ret = cmd_reportall();
958 if (!strcmp(argv[i], "bind") && (i + 1 < argc)) {
959 ret = cmd_bind(atol(argv[i+1]));
964 if (!strcmp(argv[i], "signal") && (i + 1 < argc)) {
965 ret = cmd_signal(atol(argv[i+1]));
970 if (!strcmp(argv[i], "bridge") && (i + 4 < argc)) {
971 ret = cmd_bridge(atol(argv[i+1]),
980 if (!parse_ports(argv[i]))
981 fprintf(stderr, "source port undefined: %s\n", argv[i]);
986 if (!parse_ports(argv[i]))
987 fprintf(stderr, "destination port undefined: %s\n", argv[i]);
991 if (!strcmp(argv[i], "report")) {
992 ret = cmd_report(dstport);
996 if (!strcmp(argv[i], "alloc_requests") && (i + 1 < argc)) {
997 ret = cmd_alloc_requests(atol(argv[i+1]));
1002 if (!strcmp(argv[i], "free_requests") && (i + 1 < argc)) {
1003 ret = cmd_free_requests(atol(argv[i+1]));
1008 if (!strcmp(argv[i], "put_requests")) {
1009 ret = cmd_put_requests();
1013 if (!strcmp(argv[i], "put_replies")) {
1014 ret = cmd_put_replies();
1018 if (!strcmp(argv[i], "complete") && (i + 1 < argc)) {
1019 ret = cmd_finish(atol(argv[i+1]), 0);
1024 if (!strcmp(argv[i], "fail") && (i + 1 < argc)) {
1025 ret = cmd_finish(atol(argv[i+1]), 1);
1030 if (!strcmp(argv[i], "wait") && (i + 1 < argc)) {
1031 ret = cmd_wait(atol(argv[i+1]));
1036 if (!strcmp(argv[i], "rndwrite") && (i + 5 < argc)) {
1037 long nr_loops = atol(argv[i+1]);
1038 unsigned int seed = atoi(argv[i+2]);
1039 unsigned int targetlen = atoi(argv[i+3]);
1040 unsigned int chunksize = atoi(argv[i+4]);
1041 unsigned long objectsize = atol(argv[i+5]);
1042 ret = cmd_rndwrite(nr_loops, seed, targetlen, chunksize, objectsize);
1047 if (!strcmp(argv[i], "rndread") && (i + 5 < argc)) {
1048 long nr_loops = atol(argv[i+1]);
1049 unsigned int seed = atoi(argv[i+2]);
1050 unsigned int targetlen = atoi(argv[i+3]);
1051 unsigned int chunksize = atoi(argv[i+4]);
1052 unsigned long objectsize = atol(argv[i+5]);
1053 ret = cmd_rndread(nr_loops, seed, targetlen, chunksize, objectsize);
1058 if (!strcmp(argv[i], "read") && (i + 3 < argc)) {
1059 char *target = argv[i+1];
1060 uint64_t offset = atol(argv[i+2]);
1061 uint64_t size = atol(argv[i+3]);
1062 ret = cmd_read(target, offset, size);
1067 if (!strcmp(argv[i], "write") && (i + 2 < argc)) {
1068 char *target = argv[i+1];
1069 uint64_t offset = atol(argv[i+2]);
1070 ret = cmd_write(target, offset);
1075 if (!strcmp(argv[i], "truncate") && (i + 2 < argc)) {
1076 char *target = argv[i+1];
1077 uint64_t offset = atol(argv[i+2]);
1078 ret = cmd_truncate(target, offset);
1083 if (!strcmp(argv[i], "delete") && (i + 1 < argc)) {
1084 char *target = argv[i+1];
1085 ret = cmd_delete(target);
1090 if (!strcmp(argv[i], "acquire") && (i + 1 < argc)) {
1091 char *target = argv[i+1];
1092 ret = cmd_acquire(target);
1097 if (!strcmp(argv[i], "release") && (i + 1 < argc)) {
1098 char *target = argv[i+1];
1099 ret = cmd_release(target);
1104 if (!strcmp(argv[i], "copy") && (i + 2) < argc) {
1105 char *src = argv[i+1];
1106 char *dst = argv[i+2];
1107 ret = cmd_copy(src, dst);
1112 if (!strcmp(argv[i], "clone") && (i + 2 < argc)) {
1113 char *src = argv[i+1];
1114 char *dst = argv[i+2];
1115 ret = cmd_clone(src, dst);
1120 if (!strcmp(argv[i], "info") && (i + 1 < argc)) {
1121 char *target = argv[i+1];
1122 ret = cmd_info(target);
1128 if (!parse_ports(argv[i]))
1129 fprintf(stderr, "invalid argument: %s\n", argv[i]);