12 #include <xseg/xseg.h>
16 printf("xseg <spec> [[[<src_port>]:[<dst_port>]] [<command> <arg>*] ]*\n"
18 " <type:name:nr_ports:nr_requests:request_size:extra_size:page_shift>\n"
25 " bridge <portno1> <portno2> <logfile> {full|summary|stats}\n"
28 " alloc_requests (to source) <nr>\n"
29 " free_requests (from source) <nr>\n"
30 " put_requests (all from dest)\n"
31 " put_replies (all from dest)\n"
32 " wait <nr_replies>\n"
33 " complete <nr_requests>\n"
34 " fail <nr_requests>\n"
35 " rndwrite <nr_loops> <seed> <namesize> <datasize> <objectsize>\n"
36 " rndread <nr_loops> <seed> <namesize> <datasize> <objectsize>\n"
38 " read <name> <offset> <size>\n"
39 " write <name> <offset> < data\n"
40 " truncate <name> <size>\n"
45 " clone <src> <dst>\n"
52 struct xseg_config cfg;
54 uint32_t srcport, dstport;
57 #define mkname mkname_heavy
58 /* heavy distributes duplicates much more widely than light
59 * ./xseg-tool random 100000 | cut -d' ' -f2- | sort | uniq -d -c |wc -l
// Generates a name from `seed`, iterating once per character up to `namesize`.
// `mkname` is #defined to this variant. The declarations of `i` and `c` and
// the store into `name` are on lines not visible in this excerpt.
62 void mkname_heavy(char *name, uint32_t namesize, uint32_t seed)
66 for (i = 0; i < namesize; i += 1) {
// Sum the seed with its copies shifted right by 8, 16 and 24 bits, then add
// the result to itself shifted right by 4 and keep the low 4 bits. The
// character is that 0..15 value added to '0'.
67 c = seed + (seed >> 8) + (seed >> 16) + (seed >> 24);
68 c = '0' + ((c + (c >> 4)) & 0xf);
// Advance the seed by multiplying with an odd factor: `| 1` forces the first
// term odd and 137911 is odd.
72 seed *= ((seed % 137911) | 1) * 137911;
// Alternative name generator: writes one character per index of `name`, each
// chosen as 'A' plus the low 4 bits of `c` (so 'A' through 'P'). How `c` is
// derived from `seed` is on a line not visible in this excerpt.
76 void mkname_light(char *name, uint32_t namesize, uint32_t seed)
80 for (i = 0; i < namesize; i += 1) {
82 name[i] = 'A' + (c & 0xf);
// Returns RAND_MAX divided (in double precision) by the next random() value,
// converted to uint64_t.
// NOTE(review): `size` does not appear in the visible return expression, and
// random() returning 0 makes the divisor zero -- confirm the intended formula.
// The calls visible in cmd_rndwrite/cmd_rndread are commented out.
87 uint64_t pick(uint64_t size)
89 return (uint64_t)((double)(RAND_MAX) / random());
// Fills `chunk` (datasize bytes) with a repeating pattern built from `offset`
// and `name`, so that chkchunk can later verify the data.
// The pattern is `offset` as 16 zero-padded hex digits followed by `name`;
// its length is bufsize = namesize + 16. The declaration of `buf` is on a
// line not visible in this excerpt.
92 void mkchunk( char *chunk, uint32_t datasize,
93 char *name, uint32_t namesize, uint64_t offset)
95 long i, r, bufsize = namesize + 16;
// r = number of bytes left over after the last whole pattern copy.
97 r = datasize % bufsize;
// snprintf writes at most bufsize - 1 characters plus a terminating NUL, so
// if `name` is namesize characters long the final pattern byte is NUL.
98 snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, name);
// Copy whole patterns while a full one still fits in the chunk.
100 for (i = 0; i <= (long)datasize - bufsize; i += bufsize)
101 memcpy(chunk + i, buf, bufsize);
// Fill the trailing r bytes with the start of the pattern.
103 memcpy(chunk + datasize - r, buf, r);
// Verifies that `chunk` (datasize bytes) holds the pattern mkchunk generates
// for the same `name` and `offset`: it rebuilds the pattern into `buf` and
// compares each whole copy and then the tail with memcmp. The return values
// and the declarations of `buf`, `i` and `r` are on lines not visible here.
106 int chkchunk( char *chunk, uint32_t datasize,
107 char *name, uint32_t namesize, uint64_t offset)
110 int bufsize = namesize + 16;
// NOTE(review): mkchunk computes the tail length as datasize % bufsize, but
// this uses namesize, so the tail compare below covers a different span than
// the one mkchunk wrote, and namesize == 0 would divide by zero. Looks like
// this should be bufsize -- confirm.
112 r = datasize % namesize;
113 snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, name);
// Compare every whole pattern copy that fits in the chunk.
115 for (i = 0; i <= (long)datasize - bufsize; i += bufsize)
116 if (memcmp(chunk + i, buf, bufsize)) {
117 /*printf("mismatch: '%*s'* vs '%*s'\n",
118 bufsize, buf, datasize, chunk);
123 if (memcmp(chunk + datasize - r, buf, r))
130 #define ALLOC_MIN 4096
131 #define ALLOC_MAX 1048576
// Reads data from `fp` into a heap buffer that is grown with realloc.
// Presumably the buffer and its length are handed back through `retbuf` and
// `retsize`; those stores are on lines not visible in this excerpt.
133 void inputbuf(FILE *fp, char **retbuf, uint64_t *retsize)
// Static, so the allocation size carries over between calls.
135 static uint64_t alloc_size;
// Clamp the initial allocation size to [ALLOC_MIN, ALLOC_MAX].
141 if (alloc_size < ALLOC_MIN)
142 alloc_size = ALLOC_MIN;
144 if (alloc_size > ALLOC_MAX)
145 alloc_size = ALLOC_MAX;
147 p = realloc(buf, alloc_size);
// Read into the unused part of the buffer, after the `size` bytes already held.
158 r = fread(buf + size, 1, alloc_size - size, fp);
// Buffer full: request twice the current allocation.
162 if (size >= alloc_size) {
163 p = realloc(buf, alloc_size * 2);
// Prints a request's serial, state and data to stderr.
// This modifies the request: a NUL is stored at data[max-1] so the data can
// be printed with %s. `max` starts as req->datasize; lines not visible in this
// excerpt may adjust it before the store.
// NOTE(review): if max can be 0 at the store, max-1 wraps around (uint32_t) --
// confirm a guard exists.
181 void report_request(struct xseg_request *req)
183 uint32_t max = req->datasize;
186 req->data[max-1] = 0;
187 fprintf(stderr, "request %llu state %u\n", (unsigned long long)req->serial, req->state);
188 fprintf(stderr, "data: %s\n", req->data);
// Issues a request for object `name`: takes a request from the source port,
// prepares it with room for the name and sizeof(uint64_t) bytes of data,
// submits it to the destination port and signals that port.
// The op assignment, error returns and return value are on lines not visible
// in this excerpt.
191 int cmd_info(char *name)
193 uint32_t namesize = strlen(name);
194 size_t size = sizeof(uint64_t);
197 struct xseg_request *req;
199 req = xseg_get_request(xseg, srcport);
201 fprintf(stderr, "No request!\n");
205 r = xseg_prep_request(req, namesize, size);
207 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
208 (unsigned long) namesize, (unsigned long) size);
// Preparation failed: hand the request back to the source port.
209 xseg_put_request(xseg, srcport, req);
// Copies exactly strlen(name) bytes; this call writes no terminating NUL.
213 strncpy(req->name, name, namesize);
218 srl = xseg_submit(xseg, dstport, req);
222 xseg_signal(xseg, dstport);
// Issues a request for `size` bytes of object `name` at `offset`: takes a
// request from the source port, prepares it for the name length and `size`
// bytes of data, submits it to the destination port and signals that port.
// The op/size assignments, error returns and return value are on lines not
// visible in this excerpt.
227 int cmd_read(char *name, uint64_t offset, uint64_t size)
229 uint32_t namesize = strlen(name);
232 struct xseg_request *req = xseg_get_request(xseg, srcport);
234 fprintf(stderr, "No request\n");
238 r = xseg_prep_request(req, namesize, size);
240 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
241 (unsigned long)namesize, (unsigned long long)size);
// Preparation failed: hand the request back to the source port.
242 xseg_put_request(xseg, srcport, req);
// Copies exactly strlen(name) bytes; this call writes no terminating NUL.
246 strncpy(req->name, name, namesize);
247 req->offset = offset;
251 srl = xseg_submit(xseg, dstport, req);
255 xseg_signal(xseg, dstport);
// Issues a request carrying data read from stdin for object `name` at
// `offset`: reads all input with inputbuf, takes a request from the source
// port, prepares it for the name length and input size, copies name and data
// into it and submits it to the destination port.
// The op assignment, error returns, return value and any release of `buf`
// are on lines not visible in this excerpt.
259 int cmd_write(char *name, uint64_t offset)
265 uint32_t namesize = strlen(name);
266 struct xseg_request *req;
268 inputbuf(stdin, &buf, &size);
270 fprintf(stderr, "No input\n");
274 req = xseg_get_request(xseg, srcport);
276 fprintf(stderr, "No request\n");
280 r = xseg_prep_request(req, namesize, size);
282 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
283 (unsigned long)namesize, (unsigned long long)size);
// Preparation failed: hand the request back to the source port.
284 xseg_put_request(xseg, srcport, req);
// Copies exactly strlen(name) bytes; this call writes no terminating NUL.
288 strncpy(req->name, name, namesize);
289 memcpy(req->buffer, buf, size);
290 req->offset = offset;
294 srl = xseg_submit(xseg, dstport, req);
296 fprintf(stderr, "Cannot submit\n");
303 int cmd_truncate(char *name, uint64_t offset)
308 int cmd_delete(char *name)
313 int cmd_acquire(char *name)
318 int cmd_release(char *name)
323 int cmd_copy(char *src, char *dst)
328 int cmd_clone(char *src, char *dst)
333 void log_req( uint32_t portno2, uint32_t portno1, int op, int method,
334 struct xseg_request *req)
340 #define LOG_RECEIVE 1
// Relays traffic between two ports and logs each transfer via log_req.
// Requests accepted on one port are submitted to the other, and replies
// received on one port are responded on the other, in both directions.
// `logfile` of "-" is special-cased (the branch body is not visible here);
// otherwise the file is opened for appending and created with mode 0600 if
// needed. `how` selects the logging method passed to log_req.
342 int cmd_bridge(uint32_t portno1, uint32_t portno2, char *logfile, char *how)
344 struct xseg_request *req;
346 if (!strcmp(logfile, "-"))
349 logfd = open(logfile, O_WRONLY|O_APPEND|O_CREAT, 0600);
356 if (!strcmp(how, "full"))
// NOTE(review): this repeats the "full" comparison above, so this branch can
// never be taken. The usage text lists {full|summary|stats}; presumably this
// was meant to be "summary" -- confirm.
358 else if (!strcmp(how, "full"))
364 int reloop = 0, active;
365 xseg_prepare_wait(xseg, portno1);
366 xseg_prepare_wait(xseg, portno2);
// Forward requests: portno1 -> portno2.
372 req = xseg_accept(xseg, portno1);
374 xseg_submit(xseg, portno2, req);
375 log_req(portno1, portno2, LOG_ACCEPT, method, req);
// Forward requests: portno2 -> portno1.
379 req = xseg_accept(xseg, portno2);
381 xseg_submit(xseg, portno1, req);
382 log_req(portno2, portno1, LOG_ACCEPT, method, req);
// Forward replies: portno1 -> portno2.
386 req = xseg_receive(xseg, portno1);
388 xseg_respond(xseg, portno2, req);
389 log_req(portno1, portno2, LOG_RECEIVE, method, req);
// Forward replies: portno2 -> portno1.
393 req = xseg_receive(xseg, portno2);
395 xseg_respond(xseg, portno1, req);
396 log_req(portno2, portno1, LOG_RECEIVE, method, req);
// NOTE(review): both ports are prepared for waiting above, but the wait
// below names only portno1; the original comment flags the same open question.
403 /* wait on multiple queues? */
404 xseg_wait_signal(xseg, portno1, 100000);
407 xseg_cancel_wait(xseg, portno1);
408 xseg_cancel_wait(xseg, portno2);
// Write benchmark: submits up to `loops` X_WRITE requests, each with a
// generated name (mkname) and pattern-filled data (mkchunk), while collecting
// replies on the source port. Prints submitted/received/failed counts
// periodically and at the end.
// NOTE(review): `size` is not used in the visible code; the only reference,
// pick(size), is commented out and the offset is fixed at 0.
// Loop structure, counter updates, seed advancement and return values are on
// lines not visible in this excerpt.
417 int cmd_rndwrite(long loops, int32_t seed, uint32_t namesize, uint32_t chunksize, uint64_t size)
422 if (namesize >= chunksize) {
423 fprintf(stderr, "namesize >= chunksize\n");
// One extra byte for the NUL terminator stored after mkname.
427 char *p = realloc(namebuf, namesize+1);
429 fprintf(stderr, "Cannot allocate memory\n");
434 p = realloc(chunk, chunksize);
436 fprintf(stderr, "Cannot allocate memory\n");
440 memset(chunk, 0, chunksize);
444 struct xseg_request *submitted = NULL, *received;
445 long nr_submitted = 0, nr_received = 0, nr_failed = 0;
// Prepare to wait first; the wait is cancelled below as soon as there is
// work to do (a free request obtained or a reply received).
451 xseg_prepare_wait(xseg, srcport);
452 if (nr_submitted < loops &&
453 (submitted = xseg_get_request(xseg, srcport))) {
454 xseg_cancel_wait(xseg, srcport);
455 r = xseg_prep_request(submitted, namesize, chunksize);
457 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
458 namesize, chunksize);
459 xseg_put_request(xseg, submitted->portno, submitted);
466 mkname(namebuf, namesize, seed);
467 namebuf[namesize] = 0;
468 //printf("%ld: %s\n", nr_submitted, namebuf);
// Copies exactly namesize bytes; this call writes no terminating NUL.
469 strncpy(submitted->name, namebuf, namesize);
470 offset = 0;// pick(size);
471 mkchunk(submitted->buffer, chunksize, namebuf, namesize, offset);
473 submitted->offset = offset;
474 submitted->size = chunksize;
475 submitted->op = X_WRITE;
476 submitted->flags |= XF_NOSYNC;
478 srl = xseg_submit(xseg, dstport, submitted);
480 xseg_signal(xseg, dstport);
483 received = xseg_receive(xseg, srcport);
485 xseg_cancel_wait(xseg, srcport);
// A reply without XS_SERVED set is reported via report_request.
487 if (!(received->state & XS_SERVED)) {
489 report_request(received);
491 if (xseg_put_request(xseg, received->portno, received))
492 fprintf(stderr, "Cannot put request at port %u\n", received->portno);
// Nothing submitted and nothing received this iteration: block on the signal.
495 if (!submitted && !received)
496 xseg_wait_signal(xseg, srcport, 1000000);
// Progress line at multiples of 1000 submissions; `reported` gates repeats.
498 if (nr_submitted % 1000 == 0 && !reported) {
500 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
501 nr_submitted, nr_received, nr_failed);
504 if (nr_received >= loops)
508 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
509 nr_submitted, nr_received, nr_failed);
514 * prepare/wait rhythm,
515 * files are converted to independent chunk access patterns,
// Read benchmark: submits up to `loops` X_READ requests for generated names
// (mkname) while collecting replies on the source port, and checks each
// served reply's data with chkchunk. Prints submitted/received/failed/
// mismatched counts periodically and at the end.
// NOTE(review): `size` is not used in the visible code; the only reference,
// pick(size), is commented out and the offset is fixed at 0.
// Loop structure, counter updates, seed advancement and return values are on
// lines not visible in this excerpt.
518 int cmd_rndread(long loops, int32_t seed, uint32_t namesize, uint32_t chunksize, uint64_t size)
523 if (namesize >= chunksize) {
524 fprintf(stderr, "namesize >= chunksize\n");
// One extra byte for the NUL terminator stored after mkname.
528 char *p = realloc(namebuf, namesize+1);
530 fprintf(stderr, "Cannot allocate memory\n");
535 p = realloc(chunk, chunksize);
537 fprintf(stderr, "Cannot allocate memory\n");
541 memset(chunk, 0, chunksize);
545 struct xseg_request *submitted = NULL, *received;
546 long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0;
// Prepare to wait first; the wait is cancelled below as soon as there is
// work to do (a free request obtained or a reply received).
553 xseg_prepare_wait(xseg, srcport);
554 if (nr_submitted < loops &&
555 (submitted = xseg_get_request(xseg, srcport))) {
556 xseg_cancel_wait(xseg, srcport);
557 r = xseg_prep_request(submitted, namesize, chunksize);
559 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
560 namesize, chunksize);
561 xseg_put_request(xseg, submitted->portno, submitted);
568 mkname(namebuf, namesize, seed);
569 namebuf[namesize] = 0;
570 //printf("%ld: %s\n", nr_submitted, namebuf);
571 offset = 0;//pick(size);
// Copies exactly namesize bytes; this call writes no terminating NUL.
573 strncpy(submitted->name, namebuf, namesize);
574 submitted->offset = offset;
575 submitted->size = chunksize;
576 submitted->op = X_READ;
578 srl = xseg_submit(xseg, dstport, submitted);
580 xseg_signal(xseg, dstport);
583 received = xseg_receive(xseg, srcport);
585 xseg_cancel_wait(xseg, srcport);
// A reply without XS_SERVED set is reported; otherwise its data is checked
// against the pattern expected for its own name and offset.
587 if (!(received->state & XS_SERVED)) {
589 report_request(received);
590 } else if (!chkchunk(received->data, received->datasize,
591 received->name, received->namesize, received->offset)) {
595 if (xseg_put_request(xseg, received->portno, received))
596 fprintf(stderr, "Cannot put request at port %u\n", received->portno);
// Nothing submitted and nothing received this iteration: block on the signal.
599 if (!submitted && !received)
600 xseg_wait_signal(xseg, srcport, 1000000);
// Progress line at multiples of 1000 submissions; `reported` gates repeats.
602 if (nr_submitted % 1000 == 0 && !reported) {
604 fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
605 nr_submitted, nr_received, nr_failed, nr_mismatch);
608 if (nr_received >= loops)
612 fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
613 nr_submitted, nr_received, nr_failed, nr_mismatch);
// Prints to stderr the address and element count of the free, request and
// reply queues of port `port`.
// NOTE(review): no range check on `port` is visible before it indexes
// xseg->ports -- confirm callers pass a valid port number.
617 int cmd_report(uint32_t port)
619 struct xq *fq, *rq, *pq;
620 fq = &xseg->ports[port].free_queue;
621 rq = &xseg->ports[port].request_queue;
622 pq = &xseg->ports[port].reply_queue;
623 fprintf(stderr, "port %u:\n"
624 " free_queue [%p] count : %u\n"
625 " request_queue [%p] count : %u\n"
626 " reply_queue [%p] count : %u\n",
628 (void *)fq, xq_count(fq),
629 (void *)rq, xq_count(rq),
630 (void *)pq, xq_count(pq));
639 xseg = xseg_join(cfg.type, cfg.name);
641 fprintf(stderr, "cannot join segment!\n");
// Prints the count of the segment-wide free request queue to stderr, then
// iterates over all xseg->config.nr_ports ports. The loop body is not visible
// in this excerpt; presumably it reports each port.
647 int cmd_reportall(void)
654 fprintf(stderr, "global free requests: %u\n", xq_count(xseg->free_requests));
655 for (t = 0; t < xseg->config.nr_ports; t++)
663 int r = xseg_create(&cfg);
665 fprintf(stderr, "cannot create segment!\n");
669 fprintf(stderr, "Segment initialized.\n");
// Destroys the segment. If no segment is joined yet (`xseg` is NULL) it first
// tries cmd_join(); a non-zero result from that call takes the branch below.
// The branch body and the destroy call itself are not visible in this excerpt.
673 int cmd_destroy(void)
675 if (!xseg && cmd_join())
679 fprintf(stderr, "Segment destroyed.\n");
// Calls xseg_alloc_requests for `nr` requests on the current source port
// (`srcport`) and returns its result unchanged.
683 int cmd_alloc_requests(unsigned long nr)
685 return xseg_alloc_requests(xseg, srcport, nr);
// Calls xseg_free_requests for `nr` requests on the current source port
// (`srcport`) and returns its result unchanged.
688 int cmd_free_requests(unsigned long nr)
690 return xseg_free_requests(xseg, srcport, nr);
// Accepts requests from the destination port and puts each one back to the
// port recorded in the request (req->portno), printing an error if that
// fails. The loop and its exit condition are not visible in this excerpt.
693 int cmd_put_requests(void)
695 struct xseg_request *req;
698 req = xseg_accept(xseg, dstport);
701 if (xseg_put_request(xseg, req->portno, req))
702 fprintf(stderr, "Cannot put request at port %u\n", req->portno);
// Answers pending requests on the source port: accepts a request, clears or
// sets XS_SERVED in its state, responds to the destination port and signals
// that port. main calls this with fail=0 for "complete" and fail=1 for "fail".
// The condition choosing between clearing and setting the flag (presumably
// `fail`) and the loop over `nr` are on lines not visible in this excerpt.
708 int cmd_finish(unsigned long nr, int fail)
710 struct xseg_request *req;
713 req = xseg_accept(xseg, srcport);
717 req->state &= ~XS_SERVED;
719 req->state |= XS_SERVED;
720 xseg_respond(xseg, dstport, req);
721 xseg_signal(xseg, dstport);
// Processes one reply and then returns the request to its port
// (req->portno). Replies without XS_SERVED set take a separate branch whose
// body is not visible here. In the visible paths the reply data is either
// written raw to stdout or its first sizeof(uint64_t) bytes are printed to
// stderr as a size. What selects between these paths is on lines not visible
// in this excerpt.
727 void handle_reply(struct xseg_request *req)
729 if (!(req->state & XS_SERVED)) {
736 fwrite(req->data, 1, req->datasize, stdout);
// NOTE(review): reads req->data through a uint64_t pointer; assumes the data
// area is suitably aligned and at least 8 bytes long -- confirm.
746 fprintf(stderr, "size: %llu\n", (unsigned long long)*((uint64_t *)req->data));
754 if (xseg_put_request(xseg, req->portno, req))
755 fprintf(stderr, "Cannot put reply at port %u\n", req->portno);
// Receives replies on the source port, using the prepare-wait / wait-signal /
// cancel-wait calls on that port when it has to block. How `nr` bounds the
// loop and how replies are handled are on lines not visible in this excerpt.
758 int cmd_wait(uint32_t nr)
760 struct xseg_request *req;
764 req = xseg_receive(xseg, srcport);
773 ret = xseg_prepare_wait(xseg, srcport);
// NOTE(review): the result of xseg_wait_signal is overwritten by the
// xseg_cancel_wait result on the very next line, so it is never examined.
777 ret = xseg_wait_signal(xseg, srcport, 1000000);
778 ret = xseg_cancel_wait(xseg, srcport);
// Receives replies from the destination port, prints each request's details
// (including its serial) to stderr and puts the request back to the port
// recorded in it (req->portno). The loop and its exit condition are not
// visible in this excerpt.
786 int cmd_put_replies(void)
788 struct xseg_request *req;
791 req = xseg_receive(xseg, dstport);
794 fprintf(stderr, "request: %08llx%08llx\n"
797 0LL, (unsigned long long)req->serial,
802 //fwrite(req->buffer, 1, req->buffersize, stdout);
804 if (xseg_put_request(xseg, req->portno, req))
805 fprintf(stderr, "Cannot put reply\n");
// Binds port `portno` with xseg_bind_port and reports the outcome on stderr.
// On success it prints the port number obtained from xseg_portno. The
// failure check and return values are on lines not visible in this excerpt.
811 int cmd_bind(long portno)
813 struct xseg_port *port = xseg_bind_port(xseg, portno);
815 fprintf(stderr, "failed to bind port %ld\n", portno);
819 fprintf(stderr, "bound port %u\n", xseg_portno(xseg, port));
// Calls xseg_signal on port `portno` and returns its result unchanged.
823 int cmd_signal(uint32_t portno)
825 return xseg_signal(xseg, portno);
// Parses a port argument; the usage text gives the form as
// [<src_port>]:[<dst_port>]. main treats a zero return as failure. Only the
// two digit checks are visible in this excerpt; presumably they guard the
// source and destination halves of the argument.
// NOTE(review): isdigit is given a plain char; a negative char value is
// undefined behaviour, so a cast to unsigned char would be safer -- confirm.
828 int parse_ports(char *str)
839 if ((s > str) && isdigit(str[0])) {
853 if ((s > str) && isdigit(str[0])) {
// Entry point: parses the segment spec into `cfg`, initializes the "posix"
// xseg type, then walks the remaining arguments from argv[2] onward,
// dispatching each recognized command name to its cmd_* handler. Arguments
// that match no command go to parse_ports, which updates the current
// source/destination ports.
// Each command that takes arguments first checks that enough of them remain
// (the `i + k < argc` tests). How `i` is advanced past consumed arguments and
// how `ret` is handled are on lines not visible in this excerpt.
// NOTE(review): numeric arguments are converted with atol/atoi, which cannot
// report malformed or out-of-range input -- confirm this is acceptable.
865 int main(int argc, char **argv)
877 if (xseg_parse_spec(spec, &cfg)) {
878 fprintf(stderr, "Cannot parse spec\n");
882 if (xseg_initialize("posix")) {
883 fprintf(stderr, "cannot initialize!\n");
887 for (i = 2; i < argc; i++) {
889 if (!strcmp(argv[i], "create")) {
894 if (!strcmp(argv[i], "join")) {
897 fprintf(stderr, "Segment joined.\n");
901 if (!strcmp(argv[i], "destroy")) {
909 if (!strcmp(argv[i], "reportall")) {
910 ret = cmd_reportall();
914 if (!strcmp(argv[i], "bind") && (i + 1 < argc)) {
915 ret = cmd_bind(atol(argv[i+1]));
920 if (!strcmp(argv[i], "signal") && (i + 1 < argc)) {
921 ret = cmd_signal(atol(argv[i+1]));
926 if (!strcmp(argv[i], "bridge") && (i + 4 < argc)) {
927 ret = cmd_bridge(atol(argv[i+1]),
936 if (!parse_ports(argv[i]))
937 fprintf(stderr, "source port undefined: %s\n", argv[i]);
942 if (!parse_ports(argv[i]))
943 fprintf(stderr, "destination port undefined: %s\n", argv[i]);
// "report" acts on the current destination port.
947 if (!strcmp(argv[i], "report")) {
948 ret = cmd_report(dstport);
952 if (!strcmp(argv[i], "alloc_requests") && (i + 1 < argc)) {
953 ret = cmd_alloc_requests(atol(argv[i+1]));
958 if (!strcmp(argv[i], "free_requests") && (i + 1 < argc)) {
959 ret = cmd_free_requests(atol(argv[i+1]));
964 if (!strcmp(argv[i], "put_requests")) {
965 ret = cmd_put_requests();
969 if (!strcmp(argv[i], "put_replies")) {
970 ret = cmd_put_replies();
// "complete" and "fail" share cmd_finish; the second argument is the fail flag.
974 if (!strcmp(argv[i], "complete") && (i + 1 < argc)) {
975 ret = cmd_finish(atol(argv[i+1]), 0);
980 if (!strcmp(argv[i], "fail") && (i + 1 < argc)) {
981 ret = cmd_finish(atol(argv[i+1]), 1);
986 if (!strcmp(argv[i], "wait") && (i + 1 < argc)) {
987 ret = cmd_wait(atol(argv[i+1]));
992 if (!strcmp(argv[i], "rndwrite") && (i + 5 < argc)) {
993 long nr_loops = atol(argv[i+1]);
994 unsigned int seed = atoi(argv[i+2]);
995 unsigned int namesize = atoi(argv[i+3]);
996 unsigned int chunksize = atoi(argv[i+4]);
997 unsigned long objectsize = atol(argv[i+5]);
998 ret = cmd_rndwrite(nr_loops, seed, namesize, chunksize, objectsize);
1003 if (!strcmp(argv[i], "rndread") && (i + 5 < argc)) {
1004 long nr_loops = atol(argv[i+1]);
1005 unsigned int seed = atoi(argv[i+2]);
1006 unsigned int namesize = atoi(argv[i+3]);
1007 unsigned int chunksize = atoi(argv[i+4]);
1008 unsigned long objectsize = atol(argv[i+5]);
1009 ret = cmd_rndread(nr_loops, seed, namesize, chunksize, objectsize);
1014 if (!strcmp(argv[i], "read") && (i + 3 < argc)) {
1015 char *name = argv[i+1];
1016 uint64_t offset = atol(argv[i+2]);
1017 uint64_t size = atol(argv[i+3]);
1018 ret = cmd_read(name, offset, size);
1023 if (!strcmp(argv[i], "write") && (i + 2 < argc)) {
1024 char *name = argv[i+1];
1025 uint64_t offset = atol(argv[i+2]);
1026 ret = cmd_write(name, offset);
1031 if (!strcmp(argv[i], "truncate") && (i + 2 < argc)) {
1032 char *name = argv[i+1];
1033 uint64_t offset = atol(argv[i+2]);
1034 ret = cmd_truncate(name, offset);
1039 if (!strcmp(argv[i], "delete") && (i + 1 < argc)) {
1040 char *name = argv[i+1];
1041 ret = cmd_delete(name);
1046 if (!strcmp(argv[i], "acquire") && (i + 1 < argc)) {
1047 char *name = argv[i+1];
1048 ret = cmd_acquire(name);
1053 if (!strcmp(argv[i], "release") && (i + 1 < argc)) {
1054 char *name = argv[i+1];
1055 ret = cmd_release(name);
// Parenthesized differently from the sibling checks, but equivalent to
// (i + 2 < argc).
1060 if (!strcmp(argv[i], "copy") && (i + 2) < argc) {
1061 char *src = argv[i+1];
1062 char *dst = argv[i+2];
1063 ret = cmd_copy(src, dst);
1068 if (!strcmp(argv[i], "clone") && (i + 2 < argc)) {
1069 char *src = argv[i+1];
1070 char *dst = argv[i+2];
1071 ret = cmd_clone(src, dst);
1076 if (!strcmp(argv[i], "info") && (i + 1 < argc)) {
1077 char *name = argv[i+1];
1078 ret = cmd_info(name);
// Fallback: anything not matched above is tried as a port specification.
1084 if (!parse_ports(argv[i]))
1085 fprintf(stderr, "invalid argument: %s\n", argv[i]);