12 #include <xtypes/xhash.h>
13 #include <xtypes/xobj.h>
14 #include <xseg/xseg.h>
15 #include <xseg/protocol.h>
/* Usage synopsis printed to stdout: the <spec> layout followed by the supported commands and their arguments. */
18 printf("xseg <spec> [[[<src_port>]:[<dst_port>]] [<command> <arg>*] ]*\n"
20 " <type:name:nr_ports:nr_requests:request_size:extra_size:page_shift>\n"
27 " bridge <portno1> <portno2> <logfile> {full|summary|stats}\n"
30 " alloc_requests (to source) <nr>\n"
31 " free_requests (from source) <nr>\n"
32 " put_requests (all from dest)\n"
33 " put_replies (all from dest)\n"
34 " wait <nr_replies>\n"
35 " complete <nr_requests>\n"
36 " fail <nr_requests>\n"
37 " rndwrite <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
38 " rndread <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
39 " submit_reqs <nr_loops> <concurrent_reqs>\n"
41 " read <target> <offset> <size>\n"
42 " write <target> <offset> < data\n"
43 " truncate <target> <size>\n"
48 " clone <src> <dst>\n"
/* Segment configuration: filled by xseg_parse_spec() in main() and handed to xseg_create() by cmd_create(). */
68 struct xseg_config cfg;
/* Source and destination port numbers used by the commands; parse_ports() assigns them with atol(). */
70 uint32_t srcport, dstport;
/* Name generator used by the rnd* commands; aliased here to the "heavy" variant. */
72 #define mkname mkname_heavy
73 /* heavy distributes duplicates much more widely than light
74 * ./xseg-tool random 100000 | cut -d' ' -f2- | sort | uniq -d -c |wc -l
/*
 * Set up local signalling for the current source port.
 * Acts only when a segment is present (xseg non-NULL) and srcport differs
 * from sport -- presumably the port that was initialised last; confirm.
 */
78 static void init_local_signal()
80 if (xseg && sport != srcport){
81 xseg_init_local_signal(xseg, srcport);
/*
 * Generate namelen name characters deterministically from seed ("heavy" variant).
 * Callers such as cmd_rndwrite() write the terminating NUL themselves.
 */
86 void mkname_heavy(char *name, uint32_t namelen, uint32_t seed)
90 for (i = 0; i < namelen; i += 1) {
/* Sum the seed with its three byte-shifted copies, then fold the two nibbles of the result onto '0' + (0..15). */
91 c = seed + (seed >> 8) + (seed >> 16) + (seed >> 24);
92 c = '0' + ((c + (c >> 4)) & 0xf);
/* Advance the seed: the multiplier is a product of two odd factors, hence odd. */
96 seed *= ((seed % 137911) | 1) * 137911;
/*
 * "Light" name generator: every one of the namelen characters is
 * 'A' plus the low nibble of c, i.e. one of the 16 letters 'A'..'P'.
 */
100 void mkname_light(char *name, uint32_t namelen, uint32_t seed)
104 for (i = 0; i < namelen; i += 1) {
106 name[i] = 'A' + (c & 0xf);
111 uint64_t pick(uint64_t size)
113 return (uint64_t)((double)(RAND_MAX) / random());
/*
 * Fill chunk[0..datalen) with a repeating verification pattern.
 *
 * The pattern unit is bufsize = targetlen + 16 bytes long: the offset as
 * 16 zero-padded lowercase hex digits followed by the target name.
 * snprintf() is bounded by bufsize, so the unit holds at most the first
 * targetlen - 1 bytes of the name plus a terminating NUL.
 *
 * chunk      destination, at least datalen bytes
 * datalen    number of bytes to fill
 * target     target name; does not have to be NUL-terminated
 * targetlen  length of target in bytes
 * offset     offset value encoded into the pattern
 */
void mkchunk(char *chunk, uint32_t datalen,
	     char *target, uint32_t targetlen, uint64_t offset)
{
	long i, r, bufsize = targetlen + 16;
	char buf[bufsize];

	r = datalen % bufsize;

	/* Zero the unit so bytes after a short name are well defined. */
	memset(buf, 0, bufsize);
	/*
	 * "%.*s" reads at most targetlen bytes of target; a plain "%s" would
	 * keep scanning for a NUL past the end of an unterminated target.
	 * The bytes stored in buf are the same for terminated names.
	 */
	snprintf(buf, bufsize, "%016llx%.*s",
		 (unsigned long long)offset, (int)targetlen, target);

	/* Whole pattern units. */
	for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
		memcpy(chunk + i, buf, bufsize);

	/* Trailing partial unit (r bytes). */
	memcpy(chunk + datalen - r, buf, r);
}
/*
 * Verify that chunk[0..datalen) holds the pattern written by mkchunk()
 * for the given target and offset.
 *
 * The pattern unit is bufsize = targetlen + 16 bytes: 16 hex digits of
 * offset followed by the (bufsize-bounded) target name.  The chunk is made
 * of whole units followed by a partial unit of datalen % bufsize bytes.
 *
 * Returns 1 when every byte matches, 0 on the first mismatch.
 *
 * Fixes relative to the previous version:
 *  - the tail length was computed as datalen % targetlen, which disagrees
 *    with mkchunk() (datalen % bufsize): correctly written chunks could be
 *    reported as mismatching, part of the tail went unchecked, and
 *    targetlen == 0 divided by zero;
 *  - "%s" scanned target for a NUL even though xseg targets are passed
 *    with an explicit length; "%.*s" bounds the read to targetlen bytes.
 */
int chkchunk(char *chunk, uint32_t datalen,
	     char *target, uint32_t targetlen, uint64_t offset)
{
	long i, r;
	int bufsize = targetlen + 16;
	char buf[bufsize];

	r = datalen % bufsize;

	/* Zero the unit so bytes after a short name are well defined. */
	memset(buf, 0, bufsize);
	snprintf(buf, bufsize, "%016llx%.*s",
		 (unsigned long long)offset, (int)targetlen, target);

	/* Whole pattern units. */
	for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
		if (memcmp(chunk + i, buf, bufsize))
			return 0;

	/* Trailing partial unit. */
	if (memcmp(chunk + datalen - r, buf, r))
		return 0;

	return 1;
}
/* Lower and upper bound for the starting buffer capacity used by inputbuf(). */
154 #define ALLOC_MIN 4096
155 #define ALLOC_MAX 1048576
/*
 * Read data from fp into a heap buffer that grows on demand.
 * retbuf / retsize are output parameters -- presumably the buffer pointer
 * and the number of bytes read; confirm against the callers.
 * alloc_size is static, so the capacity persists from one call to the next.
 */
157 void inputbuf(FILE *fp, char **retbuf, uint64_t *retsize)
159 static uint64_t alloc_size;
/* Clamp the starting capacity to [ALLOC_MIN, ALLOC_MAX]. */
165 if (alloc_size < ALLOC_MIN)
166 alloc_size = ALLOC_MIN;
168 if (alloc_size > ALLOC_MAX)
169 alloc_size = ALLOC_MAX;
/* The result goes to p rather than buf, so buf survives a failed realloc(). */
171 p = realloc(buf, alloc_size);
/* Read into the unused tail of the buffer. */
182 r = fread(buf + size, 1, alloc_size - size, fp);
/* Buffer full: ask for twice the capacity. */
186 if (size >= alloc_size) {
187 p = realloc(buf, alloc_size * 2);
/*
 * Print a diagnostic description of req: target and data (with their xptr
 * values and lengths), offset, size, serviced, op, state, flags and the
 * four source/destination port numbers.
 *
 * Target and data are first copied into 64-byte local buffers; at most
 * 63 bytes of each are taken.
 */
205 void report_request(struct xseg_request *req)
207 char target[64], data[64];
208 char *req_target, *req_data;
209 unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
210 req_target = xseg_get_target(xseg, req);
211 req_data = xseg_get_data(xseg, req);
/* NOTE(review): strncpy() does not NUL-terminate when the source has no NUL within the count -- confirm both buffers are terminated before they are printed with %s. */
213 strncpy(target, req_target, end);
/* NOTE(review): 63 bytes are requested regardless of req->datalen -- confirm the data area is always at least that large or NUL-terminated. */
215 strncpy(data, req_data, 63);
218 "Request %lx: target[%u](xptr: %llu): %s, data[%llu](xptr: %llu): %s \n\t"
219 "offset: %llu, size: %llu, serviced; %llu, op: %u, state: %u, flags: %u \n\t"
220 "src: %u, src_transit: %u, dst: %u, dst_transit: %u\n",
221 (unsigned long) req, req->targetlen, (unsigned long long)req->target,
223 (unsigned long long) req->datalen, (unsigned long long) req->data,
225 (unsigned long long) req->offset, (unsigned long long) req->size,
226 (unsigned long long) req->serviced, req->op, req->state, req->flags,
227 (unsigned int) req->src_portno, (unsigned int) req->src_transit_portno,
228 (unsigned int) req->dst_portno, (unsigned int) req->dst_transit_portno);
/*
 * Submit a request for `target` whose data area is sizeof(uint64_t) bytes
 * (presumably an X_INFO size query -- confirm the op that is set).
 *
 * Flow: get a request from srcport towards dstport, prepare it for
 * targetlen + size bytes, copy the target name, submit it and signal the
 * port returned by xseg_submit().  If preparation fails the request is
 * given back with xseg_put_request().
 */
233 int cmd_info(char *target)
235 uint32_t targetlen = strlen(target);
236 size_t size = sizeof(uint64_t);
239 struct xseg_request *req;
242 req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
244 fprintf(stderr, "No request!\n");
248 r = xseg_prep_request(xseg, req, targetlen, size);
250 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
251 (unsigned long) targetlen, (unsigned long) size);
252 xseg_put_request(xseg, req, srcport);
256 req_target = xseg_get_target(xseg, req);
/* Exactly targetlen bytes are copied; no NUL terminator is stored in the request. */
257 strncpy(req_target, target, targetlen);
262 p = xseg_submit(xseg, req, srcport, X_ALLOC);
266 xseg_signal(xseg, p);
/*
 * Submit a request for `size` bytes of `target` starting at `offset`
 * (the usage text lists this command as "read <target> <offset> <size>").
 *
 * The request is prepared with a data area of `size` bytes, the target
 * name is copied without a terminator, the offset is stored, and the
 * port returned by xseg_submit() is signalled.
 */
271 int cmd_read(char *target, uint64_t offset, uint64_t size)
273 uint32_t targetlen = strlen(target);
277 struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
279 fprintf(stderr, "No request\n");
283 r = xseg_prep_request(xseg, req, targetlen, size);
285 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
286 (unsigned long)targetlen, (unsigned long long)size);
/* Preparation failed: hand the request back. */
287 xseg_put_request(xseg, req, srcport);
291 req_target = xseg_get_target(xseg, req);
292 strncpy(req_target, target, targetlen);
293 req->offset = offset;
297 p = xseg_submit(xseg, req, srcport, X_ALLOC);
301 xseg_signal(xseg, p);
/*
 * Submit a request carrying everything read from stdin as the data for
 * `target` at `offset` (usage: "write <target> <offset> < data").
 *
 * The payload is collected with inputbuf(), copied into the request's
 * data area, and the port returned by xseg_submit() is signalled.
 */
305 int cmd_write(char *target, uint64_t offset)
311 char *req_target, *req_data;
312 uint32_t targetlen = strlen(target);
313 struct xseg_request *req;
315 inputbuf(stdin, &buf, &size);
317 fprintf(stderr, "No input\n");
321 req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
323 fprintf(stderr, "No request\n");
327 r = xseg_prep_request(xseg, req, targetlen, size);
329 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
330 (unsigned long)targetlen, (unsigned long long)size);
/* Preparation failed: hand the request back. */
331 xseg_put_request(xseg, req, srcport);
335 req_target = xseg_get_target(xseg, req);
336 strncpy(req_target, target, targetlen);
338 req_data = xseg_get_data(xseg, req);
339 memcpy(req_data, buf, size);
340 req->offset = offset;
344 p = xseg_submit(xseg, req, srcport, X_ALLOC);
/* NOTE(review): no xseg_put_request() is visible next to this message, unlike cmd_delete() -- confirm the request is not leaked when submission fails. */
346 fprintf(stderr, "Cannot submit\n");
349 xseg_signal(xseg, p);
/*
 * Entry point of the "truncate" command.
 * NOTE(review): the usage text documents "truncate <target> <size>" while
 * the second parameter here is named offset -- confirm which meaning is intended.
 */
354 int cmd_truncate(char *target, uint64_t offset)
/*
 * Submit a request for `target` with no data area (presumably X_DELETE --
 * confirm the op that is set).
 *
 * Binds srcport first, then gets, prepares, fills and submits the request.
 * The request is given back with xseg_put_request() both when preparation
 * fails and when submission fails; on success the returned port is signalled.
 */
359 int cmd_delete(char *target)
361 uint32_t targetlen = strlen(target);
363 struct xseg_request *req;
365 xseg_bind_port(xseg, srcport, NULL);
367 req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
369 fprintf(stderr, "No request!\n");
373 r = xseg_prep_request(xseg, req, targetlen, 0);
375 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
/* The cast binds to req->bufferlen only; the subtraction happens afterwards. */
376 (unsigned long) targetlen, (unsigned long) req->bufferlen - targetlen);
377 xseg_put_request(xseg, req, srcport);
381 char *reqtarget = xseg_get_target(xseg, req);
382 strncpy(reqtarget, target, targetlen);
385 xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
387 fprintf(stderr, "Couldn't submit request\n");
388 xseg_put_request(xseg, req, srcport);
392 xseg_signal(xseg, p);
/*
 * Submit a data-less request for `target` (presumably an acquire/open
 * operation -- confirm the op that is set).
 *
 * Gets a request from srcport towards dstport, prepares it for targetlen
 * bytes of target and no data, copies the name without a terminator,
 * submits it and signals the port returned by xseg_submit().
 */
397 int cmd_acquire(char *target)
399 uint32_t targetlen = strlen(target);
403 struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
405 fprintf(stderr, "No request\n");
409 r = xseg_prep_request(xseg, req, targetlen, 0);
411 fprintf(stderr, "Cannot prepare request! (%lu, 0)\n",
412 (unsigned long)targetlen);
/* Preparation failed: hand the request back. */
413 xseg_put_request(xseg, req, srcport);
417 req_target = xseg_get_target(xseg, req);
418 strncpy(req_target, target, targetlen);
422 p = xseg_submit(xseg, req, srcport, X_ALLOC);
426 xseg_signal(xseg, p);
/*
 * Submit a data-less request for `target` (presumably a release/close
 * operation -- confirm the op that is set).
 *
 * Same shape as cmd_acquire(): get, prepare (targetlen, 0), copy the
 * target name, submit, signal the returned port.
 */
430 int cmd_release(char *target)
432 uint32_t targetlen = strlen(target);
436 struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
438 fprintf(stderr, "No request\n");
442 r = xseg_prep_request(xseg, req, targetlen, 0);
444 fprintf(stderr, "Cannot prepare request! (%lu, 0)\n",
445 (unsigned long)targetlen);
/* Preparation failed: hand the request back. */
446 xseg_put_request(xseg, req, srcport);
450 req_target = xseg_get_target(xseg, req);
451 strncpy(req_target, target, targetlen);
455 p = xseg_submit(xseg, req, srcport, X_ALLOC);
459 xseg_signal(xseg, p);
/*
 * Submit a copy request: `dst` becomes the request target and `src` is
 * stored, together with its length, in a struct xseg_request_copy placed
 * in the request's data area.
 */
464 int cmd_copy(char *src, char *dst)
466 uint32_t targetlen = strlen(dst);
467 uint32_t parentlen = strlen(src);
468 struct xseg_request *req;
469 struct xseg_request_copy *xcopy;
470 req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
472 fprintf(stderr, "No request\n");
476 int r = xseg_prep_request(xseg, req, targetlen,
477 sizeof(struct xseg_request_copy));
479 fprintf(stderr, "Cannot prepare request!\n");
480 xseg_put_request(xseg, req, srcport);
484 char *target = xseg_get_target(xseg, req);
485 char *data = xseg_get_data(xseg, req);
487 strncpy(target, dst, targetlen);
488 xcopy = (struct xseg_request_copy *) data;
/* NOTE(review): parentlen is not checked against the capacity of xcopy->target here -- confirm that a long <src> cannot overflow the field. */
489 strncpy(xcopy->target, src, parentlen);
490 xcopy->targetlen = parentlen;
492 req->size = sizeof(struct xseg_request_copy);
495 xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
497 fprintf(stderr, "Cannot submit request\n");
500 xseg_signal(xseg, p);
/*
 * Submit a clone request: `dst` becomes the request target and `src` is
 * stored, together with its length, in a struct xseg_request_clone placed
 * in the request's data area.  srcport is bound before the request is taken.
 */
506 int cmd_clone(char *src, char *dst)
509 uint32_t targetlen = strlen(dst);
510 uint32_t parentlen = strlen(src);
511 struct xseg_request *req;
512 struct xseg_request_clone *xclone;
513 xseg_bind_port(xseg, srcport, NULL);
514 req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
516 fprintf(stderr, "No request\n");
520 int r = xseg_prep_request(xseg, req, targetlen, sizeof(struct xseg_request_clone));
522 fprintf(stderr, "Cannot prepare request!\n");
523 xseg_put_request(xseg, req, srcport);
527 char *target = xseg_get_target(xseg, req);
528 char *data = xseg_get_data(xseg, req);
530 strncpy(target, dst, targetlen);
531 xclone = (struct xseg_request_clone *) data;
/* NOTE(review): parentlen is not checked against the capacity of xclone->target here -- confirm that a long <src> cannot overflow the field. */
532 strncpy(xclone->target, src, parentlen);
533 xclone->targetlen = parentlen;
536 req->size = sizeof(struct xseg_request_clone);
539 xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
541 fprintf(stderr, "Cannot submit request\n");
544 xseg_signal(xseg, p);
/*
 * Append a log record for req to the stream opened on logfd.
 *
 * Three record layouts are visible: a full one (ports, op, offset, size,
 * state, target and up to 63 bytes of data), a short one (ports and op)
 * and a running counter of requests; presumably `method` selects between
 * them -- confirm.
 *
 * Note the parameter order: portno2 comes first, and the records print
 * portno1 as "src port" and portno2 as "dst port".
 *
 * NOTE(review): fdopen() is called on every invocation -- confirm how the
 * resulting FILE is flushed and released, since closing it would also
 * close logfd.
 */
549 void log_req(int logfd, uint32_t portno2, uint32_t portno1, int op, int method,
550 struct xseg_request *req)
553 char target[64], data[64];
554 char *req_target, *req_data;
555 /* null terminate name in case of req->target is less than 63 characters,
556 * and next character after name (aka first byte of next buffer) is not
559 unsigned int end = (req->targetlen > 63) ? 63 : req->targetlen;
561 req_target = xseg_get_target(xseg, req);
562 req_data = xseg_get_data(xseg, req);
564 logfp = fdopen(logfd, "a");
570 strncpy(target, req_target, end);
572 strncpy(data, req_data, 63);
576 "src port: %u, dst port: %u, op:%u offset: %llu size: %lu, reqstate: %u\n"
577 "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
578 (unsigned int)portno1,
579 (unsigned int)portno2,
580 (unsigned int)req->op,
581 (unsigned long long)req->offset,
582 (unsigned long)req->size,
583 (unsigned int)req->state,
584 (unsigned int)req->targetlen, target,
585 (unsigned long long)req->datalen, data);
589 "src port: %u, dst port: %u, op: %u\n",
590 (unsigned int)portno1,
591 (unsigned int)portno2,
592 (unsigned int)req->op);
595 fprintf(logfp, "src port: %u, dst port: %u, reqs: %llu\n",
596 (unsigned int)portno1,
597 (unsigned int)portno2,
598 (unsigned long long)++reqs);
/* Operation tag passed to log_req() for traffic obtained with xseg_receive(). */
606 #define LOG_RECEIVE 1
/*
 * Relay traffic between portno1 and portno2 and log every relayed request.
 *
 * logfile  path opened for appending (created with mode 0600); "-" is
 *          handled separately, without open()
 * how      "full", "summary" or something else; selects the log method
 *
 * Each pass accepts requests on one port and submits them to the other,
 * receives replies on one port and responds on the other, in both
 * directions, and then waits for a signal.
 */
608 int cmd_bridge(uint32_t portno1, uint32_t portno2, char *logfile, char *how)
610 struct xseg_request *req;
612 if (!strcmp(logfile, "-"))
615 logfd = open(logfile, O_WRONLY|O_APPEND|O_CREAT, 0600);
622 if (!strcmp(how, "full"))
624 else if (!strcmp(how, "summary"))
630 int reloop = 0, active;
631 xseg_prepare_wait(xseg, portno1);
632 xseg_prepare_wait(xseg, portno2);
/* Requests arriving on portno1 are forwarded to portno2. */
639 req = xseg_accept(xseg, portno1, 0);
/* NOTE(review): the result of xseg_submit()/xseg_respond() is not used in this loop -- confirm failures are acceptable here. */
641 xseg_submit(xseg, req, portno2, X_ALLOC);
642 log_req(logfd, portno1, portno2, LOG_ACCEPT, method, req);
/* Requests arriving on portno2 are forwarded to portno1. */
646 req = xseg_accept(xseg, portno2, 0);
648 xseg_submit(xseg, req, portno1, X_ALLOC);
649 log_req(logfd, portno2, portno1, LOG_ACCEPT, method, req);
/* Replies arriving on portno1 are passed on via portno2. */
653 req = xseg_receive(xseg, portno1, 0);
655 xseg_respond(xseg, req, portno2, X_ALLOC);
656 log_req(logfd, portno1, portno2, LOG_RECEIVE, method, req);
/* Replies arriving on portno2 are passed on via portno1. */
660 req = xseg_receive(xseg, portno2, 0);
662 xseg_respond(xseg, req, portno1, X_ALLOC);
663 log_req(logfd, portno2, portno1, LOG_RECEIVE, method, req);
670 /* wait on multiple queues? */
671 xseg_wait_signal(xseg, 100000);
674 xseg_cancel_wait(xseg, portno1);
675 xseg_cancel_wait(xseg, portno2);
/*
 * Submit `loops` X_WRITE requests with generated target names and
 * mkchunk() payloads, collecting the replies as they arrive.
 *
 * loops      number of requests to submit / replies to wait for
 * seed       input to mkname() for the target names
 * targetlen  length of each generated name; must be < chunksize
 * chunksize  payload size of each request
 * size       only referenced by the disabled pick(size) call; the offset
 *            is currently always 0
 *
 * A progress line goes to stderr when nr_submitted is a multiple of 1000,
 * and a final summary is printed once nr_received reaches loops.
 */
686 int cmd_rndwrite(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
691 if (targetlen >= chunksize) {
692 fprintf(stderr, "targetlen >= chunksize\n");
/* One extra byte for the terminator written after mkname(). */
696 char *p = realloc(namebuf, targetlen+1);
698 fprintf(stderr, "Cannot allocate memory\n");
703 p = realloc(chunk, chunksize);
705 fprintf(stderr, "Cannot allocate memory\n");
709 memset(chunk, 0, chunksize);
713 struct xseg_request *submitted = NULL, *received;
714 long nr_submitted = 0, nr_received = 0, nr_failed = 0;
718 char *req_data, *req_target;
/* Arm the wait before polling; it is cancelled as soon as there is work to do. */
723 xseg_prepare_wait(xseg, srcport);
724 if (nr_submitted < loops &&
725 (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
726 xseg_cancel_wait(xseg, srcport);
727 r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
729 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
730 targetlen, chunksize);
731 xseg_put_request(xseg, submitted, srcport);
735 req_target = xseg_get_target(xseg, submitted);
736 req_data = xseg_get_data(xseg, submitted);
739 mkname(namebuf, targetlen, seed);
740 namebuf[targetlen] = 0;
741 //printf("%ld: %s\n", nr_submitted, namebuf);
742 strncpy(req_target, namebuf, targetlen);
743 offset = 0;// pick(size);
744 mkchunk(req_data, chunksize, namebuf, targetlen, offset);
746 submitted->offset = offset;
747 submitted->size = chunksize;
748 submitted->op = X_WRITE;
749 submitted->flags |= XF_NOSYNC;
751 port = xseg_submit(xseg, submitted, srcport, X_ALLOC);
/* Submission failed: give the request back. */
752 if (port == NoPort) {
753 xseg_put_request(xseg, submitted, srcport);
757 xseg_signal(xseg, port);
761 received = xseg_receive(xseg, srcport, 0);
763 xseg_cancel_wait(xseg, srcport);
/* A reply without XS_SERVED is reported as a failed request. */
765 if (!(received->state & XS_SERVED)) {
767 report_request(received);
769 if (xseg_put_request(xseg, received, srcport))
770 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
/* Nothing submitted and nothing received in this pass: wait for a signal. */
773 if (!submitted && !received)
774 xseg_wait_signal(xseg, 1000000);
776 if (nr_submitted % 1000 == 0 && !reported) {
778 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
779 nr_submitted, nr_received, nr_failed);
782 if (nr_received >= loops)
786 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
787 nr_submitted, nr_received, nr_failed);
/*
 * Submit `loops` X_DELETE requests for target names generated by mkname()
 * from `seed`, collecting the replies as they arrive.
 *
 * Requests carry no data area, offset 0 and cleared flags.  A progress
 * line goes to stderr when nr_submitted is a multiple of 1000, and a final
 * summary is printed once nr_received reaches loops.
 */
791 int cmd_rnddelete(long loops, int32_t seed, uint32_t targetlen)
/* One extra byte for the terminator written after mkname(). */
795 char *p = realloc(namebuf, targetlen+1);
797 fprintf(stderr, "Cannot allocate memory\n");
804 struct xseg_request *submitted = NULL, *received;
805 long nr_submitted = 0, nr_received = 0, nr_failed = 0;
/* Arm the wait before polling; it is cancelled as soon as there is work to do. */
813 xseg_prepare_wait(xseg, srcport);
814 if (nr_submitted < loops &&
815 (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
816 xseg_cancel_wait(xseg, srcport);
817 r = xseg_prep_request(xseg, submitted, targetlen, 0);
819 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
821 xseg_put_request(xseg, submitted, srcport);
825 req_target = xseg_get_target(xseg, submitted);
828 mkname(namebuf, targetlen, seed);
829 namebuf[targetlen] = 0;
830 //printf("%ld: %s\n", nr_submitted, namebuf);
831 strncpy(req_target, namebuf, targetlen);
832 submitted->offset = 0;
834 submitted->op = X_DELETE;
835 submitted->flags = 0;
837 port = xseg_submit(xseg, submitted, srcport, X_ALLOC);
/* Submission failed: give the request back. */
838 if (port == NoPort) {
839 xseg_put_request(xseg, submitted, srcport);
843 xseg_signal(xseg, port);
847 received = xseg_receive(xseg, srcport, 0);
849 xseg_cancel_wait(xseg, srcport);
/* A reply without XS_SERVED is reported as a failed request. */
851 if (!(received->state & XS_SERVED)) {
853 report_request(received);
855 if (xseg_put_request(xseg, received, srcport))
856 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
/* Nothing submitted and nothing received in this pass: wait for a signal. */
859 if (!submitted && !received)
860 xseg_wait_signal(xseg, 1000000);
862 if (nr_submitted % 1000 == 0 && !reported) {
864 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
865 nr_submitted, nr_received, nr_failed);
868 if (nr_received >= loops)
872 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
873 nr_submitted, nr_received, nr_failed);
877 * prepare/wait rhythm,
878 * files are converted to independent chunk access patterns,
/*
 * Submit `loops` X_READ requests for generated target names and verify
 * each served reply with chkchunk().
 *
 * loops      number of requests to submit / replies to wait for
 * seed       input to mkname() for the target names
 * targetlen  length of each generated name; must be < chunksize
 * chunksize  number of bytes requested per read
 * size       only referenced by the disabled pick(size) call; the offset
 *            is currently always 0
 *
 * Replies without XS_SERVED are reported as failures; served replies are
 * checked against the expected pattern.  Progress and the final summary
 * go to stderr.
 */
881 int cmd_rndread(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
886 if (targetlen >= chunksize) {
887 fprintf(stderr, "targetlen >= chunksize\n");
/* One extra byte for the terminator written after mkname(). */
891 char *p = realloc(namebuf, targetlen+1);
893 fprintf(stderr, "Cannot allocate memory\n");
898 p = realloc(chunk, chunksize);
900 fprintf(stderr, "Cannot allocate memory\n");
904 memset(chunk, 0, chunksize);
908 struct xseg_request *submitted = NULL, *received;
909 long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0;
913 char *req_data, *req_target;
/* Arm the wait before polling; it is cancelled as soon as there is work to do. */
919 xseg_prepare_wait(xseg, srcport);
920 if (nr_submitted < loops &&
921 (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
922 xseg_cancel_wait(xseg, srcport);
923 r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
925 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
926 targetlen, chunksize);
927 xseg_put_request(xseg, submitted, srcport);
931 req_target = xseg_get_target(xseg, submitted);
933 mkname(namebuf, targetlen, seed);
934 namebuf[targetlen] = 0;
935 //printf("%ld: %s\n", nr_submitted, namebuf);
936 offset = 0;//pick(size);
938 strncpy(req_target, namebuf, targetlen);
939 submitted->offset = offset;
940 submitted->size = chunksize;
941 submitted->op = X_READ;
942 port = xseg_submit(xseg, submitted, srcport, X_ALLOC);
/* Submission failed: give the request back. */
943 if (port == NoPort) {
944 xseg_put_request(xseg, submitted, srcport);
948 xseg_signal(xseg, port);
952 received = xseg_receive(xseg, srcport, 0);
954 xseg_cancel_wait(xseg, srcport);
956 req_target = xseg_get_target(xseg, received);
957 req_data = xseg_get_data(xseg, received);
958 if (!(received->state & XS_SERVED)) {
960 report_request(received);
/* NOTE(review): req_target comes straight from the request and was copied in without a terminator, while chkchunk() formats it with "%s" -- confirm it cannot read past targetlen. */
961 } else if (!chkchunk(req_data, received->datalen,
962 req_target, received->targetlen, received->offset)) {
963 // report_request(received);
967 if (xseg_put_request(xseg, received, srcport))
968 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
/* Nothing submitted and nothing received in this pass: wait for a signal. */
971 if (!submitted && !received)
972 xseg_wait_signal(xseg, 1000000);
974 if (nr_submitted % 1000 == 0 && !reported) {
976 fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
977 nr_submitted, nr_received, nr_failed, nr_mismatch);
980 if (nr_received >= loops)
984 fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
985 nr_submitted, nr_received, nr_failed, nr_mismatch);
/*
 * Throughput test: submit `loops` requests while keeping fewer than
 * `concurrent_reqs` in flight, then report elapsed time and request rate.
 *
 * op selects which of X_INFO, X_READ or X_WRITE is submitted; the mapping
 * from op values to operations should be confirmed at the call site.
 * Every request uses a fixed targetlen of 10 and a chunksize of 4096.
 * The `elapsed` field of the first reply is printed as the latency.
 */
989 int cmd_submit_reqs(long loops, long concurrent_reqs, int op)
994 struct xseg_request *submitted = NULL, *received;
995 long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0, nr_flying = 0;
998 uint32_t targetlen = 10, chunksize = 4096;
999 struct timeval tv1, tv2;
1001 char *req_data, *req_target;
1003 xseg_bind_port(xseg, srcport, NULL);
/* Start of the measured interval. */
1005 gettimeofday(&tv1, NULL);
1008 xseg_prepare_wait(xseg, srcport);
1009 if (nr_submitted < loops && nr_flying < concurrent_reqs &&
1010 (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
1011 xseg_cancel_wait(xseg, srcport);
1012 r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
1014 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
1015 targetlen, chunksize);
1016 xseg_put_request(xseg, submitted, srcport);
1024 offset = 0;//pick(size);
1026 submitted->offset = offset;
1027 submitted->size = chunksize;
1028 req_target = xseg_get_target(xseg, submitted);
1029 req_data = xseg_get_data(xseg, submitted);
1032 submitted->op = X_INFO;
1034 submitted->op = X_READ;
1036 submitted->op = X_WRITE;
/* NOTE(review): no write into req_target is visible before this point -- confirm the target name is filled in before mkchunk() reads it. */
1037 mkchunk(req_data, submitted->datalen, req_target, submitted->targetlen, submitted->offset);
1040 p = xseg_submit(xseg, submitted, srcport, X_ALLOC);
1042 if (xseg_signal(xseg, p) < 0)
1043 perror("Cannot signal peer");
1046 received = xseg_receive(xseg, srcport, 0);
1048 xseg_cancel_wait(xseg, srcport);
1050 if (nr_received == 0)
1051 fprintf(stderr, "latency (time for the first req to complete): %llu usecs\n",
1052 (unsigned long long)received->elapsed);
1054 if (!(received->state & XS_SERVED)) {
1056 //report_request(received);
1059 if (xseg_put_request(xseg, received, srcport))
1060 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
/* Nothing submitted and nothing received in this pass: wait for a signal. */
1063 if (!submitted && !received)
1064 xseg_wait_signal(xseg, 10000000L);
1066 if (nr_received >= loops)
/* End of the measured interval. */
1069 gettimeofday(&tv2, NULL);
1071 fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
1072 nr_submitted, nr_received, nr_failed, nr_mismatch);
/* Elapsed wall-clock time in microseconds. */
1073 long t = (tv2.tv_sec - tv1.tv_sec)*1000000 + (tv2.tv_usec - tv1.tv_usec);
/* NOTE(review): t can be 0 for very short runs, which makes the throughput a division by zero in floating point (inf/nan). The message also spells "elapsed" as "elpased". */
1074 fprintf(stderr, "elpased time: %lf secs, throughput: %lf reqs/sec\n", (double) t / 1000000.0, (double) nr_submitted / (t / 1000000.0));
/*
 * Format a human-readable description of `lock` into buf (at most len
 * bytes, via snprintf): "Locked: No" when the owner is Noone, otherwise
 * "Locked: Yes (Owner: <owner>)".
 *
 * NOTE(review): the owner is printed with %lu without a cast, whereas
 * cmd_verify() casts the same field to unsigned long long for %llu --
 * confirm the specifier matches the field's type.
 */
1079 static void lock_status(struct xlock *lock, char *buf, int len)
1082 if (lock->owner == Noone)
1083 r = snprintf(buf, len, "Locked: No");
1085 r = snprintf(buf, len, "Locked: Yes (Owner: %lu)", lock->owner);
/*
 * Print a status report for port `portno` to stderr: allocated/maximum
 * request counts, source and destination gateway, and for each of the
 * free, request and reply queues its address, element count and lock
 * status.  An unassigned port is reported on stdout instead.
 */
1090 int cmd_report(uint32_t portno)
1092 char fls[64], rls[64], pls[64]; // buffer to store lock status
1093 struct xseg_port *port = xseg_get_port(xseg, portno);
1095 printf("port %u is not assigned\n", portno);
1098 struct xq *fq, *rq, *pq;
1099 fq = xseg_get_queue(xseg, port, free_queue);
1100 rq = xseg_get_queue(xseg, port, request_queue);
1101 pq = xseg_get_queue(xseg, port, reply_queue);
/* The lock descriptions are read without taking the locks. */
1102 lock_status(&port->fq_lock, fls, 64);
1103 lock_status(&port->rq_lock, rls, 64);
1104 lock_status(&port->pq_lock, pls, 64);
1105 fprintf(stderr, "port %u:\n"
1106 " requests: %llu/%llu src gw: %u dst gw: %u\n"
1107 " free_queue [%p] count : %4llu | %s\n"
1108 " request_queue [%p] count : %4llu | %s\n"
1109 " reply_queue [%p] count : %4llu | %s\n",
1110 portno, (unsigned long long)port->alloc_reqs,
1111 (unsigned long long)port->max_alloc_reqs,
1112 xseg->src_gw[portno],
1113 xseg->dst_gw[portno],
1114 (void *)fq, (unsigned long long)xq_count(fq), fls,
1115 (void *)rq, (unsigned long long)xq_count(rq), rls,
1116 (void *)pq, (unsigned long long)xq_count(pq), pls);
/* Join the segment described by cfg using the "posix" signalling type and store the handle in the global xseg (presumably the body of cmd_join() -- confirm). */
1125 xseg = xseg_join(cfg.type, cfg.name, "posix", NULL);
1127 fprintf(stderr, "cannot join segment!\n");
/*
 * Print one summary line for an object handler to stderr: free and
 * allocated object counts, allocated space, object size and lock status.
 * `name` is presumably the label printed in the leading %20s column -- confirm.
 * (The identifier is spelled "hanlder"; it is kept because cmd_reportall()
 * calls it by this name.)
 */
1132 static void print_hanlder(char *name, struct xobject_h *obj_h)
1135 lock_status(&obj_h->lock, ls, 64);
1136 fprintf(stderr, "%20s: free: %4llu, allocated: %4llu, allocated space: %7llu (object size: %llu), Lock %s\n",
1138 (unsigned long long) obj_h->nr_free,
1139 (unsigned long long) obj_h->nr_allocated,
1140 (unsigned long long) obj_h->allocated_space,
1141 (unsigned long long) obj_h->obj_size, ls);
/*
 * Print heap usage to stderr as "<used>/<total>" scaled to a unit taken
 * from the UNIT/MULT tables, followed by the raw byte counts and the
 * heap lock status.
 */
1145 static void print_heap(struct xseg *xseg)
1155 MULT[2] = 1024*1024;
1156 MULT[3] = 1024*1024*1024;
1160 fprintf(stderr, "Heap usage: ");
/* Used part: xseg->heap->cur. */
1162 t = xseg->heap->cur;
1169 t = xseg->heap->cur / MULT[u];
1171 float tf = ((float)(xseg->heap->cur))/((float)MULT[u]);
1172 fprintf(stderr, "%2.1f %s/", tf, UNIT[u]);
1175 unsigned int tu = xseg->heap->cur / MULT[u];
1176 fprintf(stderr, "%3u %s/", tu, UNIT[u]);
/* Total part: xseg->config.heap_size, formatted the same way. */
1180 t = xseg->config.heap_size;
1187 t = xseg->config.heap_size/MULT[u];
1189 float tf = ((float)(xseg->config.heap_size))/(float)MULT[u];
1190 fprintf(stderr, "%2.1f %s ", tf, UNIT[u]);
1193 unsigned int tu = xseg->config.heap_size / MULT[u];
1194 fprintf(stderr, "%3u %s ", tu, UNIT[u]);
1197 lock_status(&xseg->heap->lock, ls, 64);
1198 fprintf(stderr, "(%llu / %llu), %s\n",
1199 (unsigned long long)xseg->heap->cur,
1200 (unsigned long long)xseg->config.heap_size,
/*
 * Print a segment-wide report to stderr: the segment lock flag, the
 * request, port and object handlers, and then a pass over all
 * config.nr_ports port numbers.
 */
1204 int cmd_reportall(void)
1211 fprintf(stderr, "Segment lock: %s\n",
1212 (xseg->shared->flags & XSEG_F_LOCK) ? "Locked" : "Unlocked");
1214 /* fprintf(stderr, "Heap usage: %llu / %llu\n", */
1215 /* (unsigned long long)xseg->heap->cur, */
1216 /* (unsigned long long)xseg->config.heap_size); */
1217 fprintf(stderr, "Handlers: \n");
1218 print_hanlder("Requests handler", xseg->request_h);
1219 print_hanlder("Ports handler", xseg->port_h);
1220 print_hanlder("Objects handler", xseg->object_handlers);
1221 fprintf(stderr, "\n");
1223 for (t = 0; t < xseg->config.nr_ports; t++)
/*
 * Mark req as completed or failed and send it back.
 *
 * For COMPLETE the state gets XS_SERVED set and XS_FAILED cleared; the
 * other visible branch sets XS_FAILED and clears XS_SERVED.  The request
 * is then answered with xseg_respond() on srcport and the returned port
 * is signalled.
 */
1230 int finish_req(struct xseg_request *req, enum req_action action)
1232 if (action == COMPLETE){
1233 req->state &= ~XS_FAILED;
1234 req->state |= XS_SERVED;
1236 req->state |= XS_FAILED;
1237 req->state &= ~XS_SERVED;
1240 xport p = xseg_respond(xseg, req, srcport, X_ALLOC);
/* Presumably the fallback when xseg_respond() fails -- confirm the guarding condition. */
1242 xseg_put_request(xseg, req, srcport);
1244 xseg_signal(xseg, p);
/*
 * Look for req in the free, request and reply queue of every port in the
 * segment.  Each queue is checked with __xq_check() while its lock is held
 * (taken on behalf of srcport) and the lock is released again on both
 * paths.  Presumably a request found in none of the queues is reported as
 * dangling -- confirm the return values.
 */
1248 //FIXME this should be in xseg lib?
1249 static int isDangling(struct xseg_request *req)
1252 struct xseg_port *port;
1253 for (i = 0; i < xseg->config.nr_ports; i++) {
1254 if (xseg->ports[i]){
1255 port = xseg_get_port(xseg, i);
1257 fprintf(stderr, "Inconsisten port <-> portno mapping %u", i);
1260 struct xq *fq, *rq, *pq;
1261 fq = xseg_get_queue(xseg, port, free_queue);
1262 rq = xseg_get_queue(xseg, port, request_queue);
1263 pq = xseg_get_queue(xseg, port, reply_queue);
/* Free queue. */
1264 xlock_acquire(&port->fq_lock, srcport);
1265 if (__xq_check(fq, XPTR_MAKE(req, xseg->segment))){
1266 xlock_release(&port->fq_lock);
1269 xlock_release(&port->fq_lock);
/* Request queue. */
1270 xlock_acquire(&port->rq_lock, srcport);
1271 if (__xq_check(rq, XPTR_MAKE(req, xseg->segment))){
1272 xlock_release(&port->rq_lock);
1275 xlock_release(&port->rq_lock);
/* Reply queue. */
1276 xlock_acquire(&port->pq_lock, srcport);
1277 if (__xq_check(pq, XPTR_MAKE(req, xseg->segment))){
1278 xlock_release(&port->pq_lock);
1281 xlock_release(&port->pq_lock);
/*
 * Ask a yes/no question on stdout ("<msg> [y/n]: ") and read the answer
 * one character at a time.  'y'/'Y' and 'n'/'N' are the accepted answers;
 * a newline causes the prompt to be printed again.  Callers treat a
 * non-zero result as "yes".
 */
1287 int prompt_user(char *msg)
1290 printf("%s [y/n]: ", msg);
1293 if (c == 'y' || c == 'Y')
1295 else if (c == 'n' || c == 'N')
1297 else if (c == '\n'){
1299 printf("%s [y/n]: ", msg);
/*
 * Consistency check of the segment, with optional interactive repair.
 *
 * Reports, on stderr, every lock that is currently held: the segment
 * lock flag, the heap lock, the request/port/object handler locks and the
 * three queue locks of every port.  It then walks all request objects and
 * reports those that isDangling() flags and that are not free.
 *
 * fix  when non-zero, the user is asked (prompt_user) before each lock is
 *      released and before each dangling request is failed via finish_req().
 */
1307 //FIXME this should be in xseg lib?
1308 int cmd_verify(int fix)
1313 if (xseg->shared->flags & XSEG_F_LOCK){
1314 fprintf(stderr, "Segment lock: Locked\n");
1315 if (fix && prompt_user("Unlock it ?"))
1316 xseg->shared->flags &= ~XSEG_F_LOCK;
1319 if (xseg->heap->lock.owner != Noone){
1320 fprintf(stderr, "Heap lock: Locked (Owner: %llu)\n",
1321 (unsigned long long)xseg->heap->lock.owner);
1322 if (fix && prompt_user("Unlock it ?"))
1323 xlock_release(&xseg->heap->lock);
1326 if (xseg->request_h->lock.owner != Noone){
1327 fprintf(stderr, "Requests handler lock: Locked (Owner: %llu)\n",
1328 (unsigned long long)xseg->request_h->lock.owner);
1329 if (fix && prompt_user("Unlock it ?"))
1330 xlock_release(&xseg->request_h->lock);
1332 if (xseg->port_h->lock.owner != Noone){
1333 fprintf(stderr, "Ports handler lock: Locked (Owner: %llu)\n",
1334 (unsigned long long)xseg->port_h->lock.owner);
1335 if (fix && prompt_user("Unlock it ?"))
1336 xlock_release(&xseg->port_h->lock);
1338 if (xseg->object_handlers->lock.owner != Noone){
1339 fprintf(stderr, "Objects handler lock: Locked (Owner: %llu)\n",
1340 (unsigned long long)xseg->object_handlers->lock.owner);
1341 if (fix && prompt_user("Unlock it ?"))
1342 xlock_release(&xseg->object_handlers->lock);
1344 //take segment lock?
/* Per-port queue locks. */
1346 struct xseg_port *port;
1347 for (i = 0; i < xseg->config.nr_ports; i++) {
1348 if (xseg->ports[i]){
1349 port = xseg_get_port(xseg, i);
1351 fprintf(stderr, "Inconsisten port <-> portno mapping %u", i);
1354 if (port->fq_lock.owner != Noone) {
1355 fprintf(stderr, "Free queue lock of port %u locked (Owner %llu)\n",
1356 i, (unsigned long long)port->fq_lock.owner);
1357 if (fix && prompt_user("Unlock it ?"))
1358 xlock_release(&port->fq_lock);
1360 if (port->rq_lock.owner != Noone) {
1361 fprintf(stderr, "Request queue lock of port %u locked (Owner %llu)\n",
1362 i, (unsigned long long)port->rq_lock.owner);
1363 if (fix && prompt_user("Unlock it ?"))
1364 xlock_release(&port->rq_lock);
1366 if (port->pq_lock.owner != Noone) {
1367 fprintf(stderr, "Reply queue lock of port %u locked (Owner %llu)\n",
1368 i, (unsigned long long)port->pq_lock.owner);
1369 if (fix && prompt_user("Unlock it ?"))
1370 xlock_release(&port->pq_lock);
/* Walk every request object while holding the request handler lock. */
1375 struct xobject_h *obj_h = xseg->request_h;
1376 struct xobject_iter it;
1378 struct xseg_request *req;
1379 xlock_acquire(&obj_h->lock, srcport);
1380 xobj_iter_init(obj_h, &it);
1381 while (xobj_iterate(obj_h, &it, (void **)&req)){
1382 //FIXME this will not work cause obj->magic - req->serial is not
1383 //touched when a request is get
1384 /* if (obj->magic != MAGIC_REQ && t->src_portno == portno){ */
1385 if (isDangling(req) && !__xobj_isFree(obj_h, req)){
1386 report_request(req);
1387 if (fix && prompt_user("Fail it ?")){
1388 printf("Finishing ...\n");
1389 finish_req(req, FAIL);
1393 xlock_release(&obj_h->lock);
/*
 * Report every request currently sitting in one queue of port `portno`.
 *
 * qt selects the free, request or reply queue.  With the lock `l` held
 * (presumably the selected queue's lock -- confirm), each of the xq_count()
 * entries is popped from the head, reported with report_request() and
 * appended to the tail again.  An empty queue is announced on stderr.
 */
1397 int cmd_inspectq(xport portno, enum queue qt)
1404 struct xseg_port *port = xseg_get_port(xseg, portno);
1407 if (qt == FREE_QUEUE){
1408 q = xseg_get_queue(xseg, port, free_queue);
1411 else if (qt == REQUEST_QUEUE){
1412 q = xseg_get_queue(xseg, port, request_queue);
1415 else if (qt == REPLY_QUEUE) {
1416 q = xseg_get_queue(xseg, port, reply_queue);
1421 xlock_acquire(l, srcport);
/* The count is sampled once, so the loop makes exactly one rotation. */
1422 xqindex i,c = xq_count(q);
1424 struct xseg_request *req;
1426 for (i = 0; i < c; i++) {
1427 xqi = __xq_pop_head(q);
1428 req = XPTR_TAKE(xqi, xseg->segment);
1429 report_request(req);
1430 __xq_append_tail(q, xqi);
1434 fprintf(stderr, "Queue is empty\n\n");
/*
 * Act on a single request given by address.
 *
 * The pointer is first validated against the request object handler with
 * xobj_check().  REPORT only prints the request; FAIL and COMPLETE print
 * it, ask the user for confirmation and then call finish_req() with the
 * same action.
 */
1441 int cmd_request(struct xseg_request *req, enum req_action action)
1446 struct xobject_h *obj_h = xseg->request_h;
1447 if (!xobj_check(obj_h, req))
1450 if (action == REPORT)
1451 report_request(req);
1452 else if (action == FAIL){
1453 report_request(req);
1454 if (prompt_user("fail it ?")){
1455 printf("Finishing ...\n");
1456 finish_req(req, FAIL);
1459 else if (action == COMPLETE){
1460 report_request(req);
1461 if (prompt_user("Complete it ?")){
1462 printf("Finishing ...\n");
1463 finish_req(req, COMPLETE);
/* Create the segment described by the global cfg via xseg_create() and report the outcome on stderr. */
1469 int cmd_create(void)
1471 int r = xseg_create(&cfg);
1473 fprintf(stderr, "cannot create segment!\n");
1477 fprintf(stderr, "Segment initialized.\n");
/* Destroy the segment, joining it first when no segment handle exists yet; success is reported on stderr. */
1481 int cmd_destroy(void)
/* cmd_join() is only attempted when xseg is NULL; a non-zero result from it takes this branch. */
1483 if (!xseg && cmd_join())
1488 fprintf(stderr, "Segment destroyed.\n");
/* Allocate nr requests to the source port; returns the result of xseg_alloc_requests() unchanged. */
1492 int cmd_alloc_requests(unsigned long nr)
1494 return xseg_alloc_requests(xseg, srcport, nr);
/* Free nr requests from the source port; returns the result of xseg_free_requests() unchanged. */
1497 int cmd_free_requests(unsigned long nr)
1499 return xseg_free_requests(xseg, srcport, nr);
/*
 * Accept pending requests on dstport and give each one back with
 * xseg_put_request() on srcport; a failed put is reported on stderr.
 */
1502 int cmd_put_requests(void)
1504 struct xseg_request *req;
1507 req = xseg_accept(xseg, dstport, 0);
1510 if (xseg_put_request(xseg, req, srcport))
1511 fprintf(stderr, "Cannot put request at port %u\n", req->src_portno);
/*
 * Act as a peer on srcport: accept incoming requests and answer them.
 *
 * nr    number of requests to handle (per the usage text of the
 *       "complete" / "fail" commands)
 * fail  presumably selects failing the requests instead of serving them --
 *       confirm the condition guarding the state updates
 *
 * Served requests get XS_SERVED set and serviced = size.  X_READ replies
 * are filled with the mkchunk() pattern, X_WRITE payloads are copied into
 * a scratch buffer, and X_INFO replies store the value 4294967296 (2^32).
 */
1517 int cmd_finish(unsigned long nr, int fail)
1519 struct xseg_request *req;
/* NOTE(review): no check of the malloc() result is visible -- confirm buf cannot be NULL when used below. */
1520 char *buf = malloc(sizeof(char) * 8128);
1521 char *req_target, *req_data;
1522 xseg_bind_port(xseg, srcport, NULL);
1526 xseg_prepare_wait(xseg, srcport);
1527 req = xseg_accept(xseg, srcport, 0);
1529 req_target = xseg_get_target(xseg, req);
1530 req_data = xseg_get_data(xseg, req);
1531 xseg_cancel_wait(xseg, srcport);
1533 req->state &= ~XS_SERVED;
/* NOTE(review): in the X_WRITE branch sizeof(*buf) is sizeof(char) == 1, so at most one byte is copied; the 8128-byte capacity of buf was probably intended. */
/* NOTE(review): the X_INFO branch dereferences req->data directly, while report_request() prints that field as an xptr and payload pointers are otherwise obtained with xseg_get_data() (req_data) -- confirm this store is valid. */
1535 if (req->op == X_READ)
1536 mkchunk(req_data, req->datalen, req_target, req->targetlen, req->offset);
1537 else if (req->op == X_WRITE)
1538 memcpy(buf, req_data, (sizeof(*buf) > req->datalen) ? req->datalen : sizeof(*buf));
1539 else if (req->op == X_INFO)
1540 *((uint64_t *) req->data) = 4294967296;
1542 req->state |= XS_SERVED;
1543 req->serviced = req->size;
1546 p = xseg_respond(xseg, req, srcport, X_ALLOC);
1547 xseg_signal(xseg, p);
1551 xseg_wait_signal(xseg, 10000000L);
/*
 * Process one reply received on the source port and give the request back.
 *
 * A reply without XS_SERVED is dumped with report_request().  Otherwise
 * an operation-specific message is produced: payload bytes are written to
 * stdout for data-carrying replies, and a short status line (deleted,
 * cloned, size, copied, Closed, Opened) goes to stderr for the others.
 * Finally the request is returned with xseg_put_request() on srcport.
 */
1559 void handle_reply(struct xseg_request *req)
1561 char *req_data = xseg_get_data(xseg, req);
1562 char *req_target = xseg_get_target(xseg, req);
1563 if (!(req->state & XS_SERVED)) {
1564 report_request(req);
1570 fwrite(req_data, 1, req->datalen, stdout);
1574 fprintf(stdout, "wrote: ");
1575 fwrite(req_data, 1, req->datalen, stdout);
/* NOTE(review): req_target is printed with %s here and below; the cmd_* functions copy target names into requests without a terminator -- confirm the buffer is NUL-terminated. */
1579 fprintf(stderr, "deleted %s\n", req_target);
1584 fprintf(stderr, "cloned %s\n", ((struct xseg_request_clone *)req_data)->target);
1587 fprintf(stderr, "size: %llu\n", (unsigned long long)*((uint64_t *)req_data));
1590 fprintf(stderr, "copied %s\n", ((struct xseg_request_copy *)req_data)->target);
1593 fprintf(stderr, "Closed %s\n", req_target);
1595 fprintf(stderr, "Opened %s\n", req_target);
1602 if (xseg_put_request(xseg, req, srcport))
1603 fprintf(stderr, "Cannot put reply at port %u\n", req->src_portno);
/*
 * cmd_wait - handler for the "wait <nr_replies>" command.
 *
 * Initialises the local signal, receives replies on srcport with
 * xseg_receive() and, around xseg_wait_signal(), calls xseg_prepare_wait()
 * and xseg_cancel_wait() on srcport.
 *
 * NOTE(review): the loop over nr, the declaration of ret, the handling of
 * each received reply and the return statement are not part of this excerpt.
 */
1606 int cmd_wait(uint32_t nr)
1608 struct xseg_request *req;
1610 init_local_signal();
1613 req = xseg_receive(xseg, srcport, 0);
1622 ret = xseg_prepare_wait(xseg, srcport);
/* NOTE(review): the result stored here is overwritten by the next visible
 * assignment; whether it is checked in between cannot be seen here. */
1626 ret = xseg_wait_signal(xseg, 1000000);
1627 ret = xseg_cancel_wait(xseg, srcport);
/*
 * cmd_put_replies - handler for the "put_replies (all from dest)" command.
 *
 * Receives a reply from dstport with xseg_receive(), prints a "request:"
 * line on stderr, passes the request to report_request() and then hands it
 * to xseg_put_request() for srcport.
 *
 * NOTE(review): the loop construct, the remaining fprintf() arguments and
 * the return statement are not part of this excerpt.
 */
1635 int cmd_put_replies(void)
1637 struct xseg_request *req;
1640 req = xseg_receive(xseg, dstport, 0);
1643 fprintf(stderr, "request: %08llx%08llx\n"
1646 0LL, (unsigned long long)req->serial,
1649 report_request(req);
1651 //fwrite(req->buffer, 1, req->bufferlen, stdout);
/* A failing put is reported on stderr. */
1653 if (xseg_put_request(xseg, req, srcport))
1654 fprintf(stderr, "Cannot put reply\n");
/*
 * cmd_bind - handler for the "bind" command; main() passes the port number
 * parsed from the following argument.
 *
 * Calls xseg_bind_port() with a NULL third argument, prints a failure
 * message or, on the other path, the port number reported by xseg_portno()
 * for the bound port.
 *
 * NOTE(review): the test on port and the return statements are not part of
 * this excerpt.
 */
1660 int cmd_bind(long portno)
1662 struct xseg_port *port = xseg_bind_port(xseg, portno, NULL);
1664 fprintf(stderr, "failed to bind port %ld\n", portno);
1668 fprintf(stderr, "bound port %u\n", xseg_portno(xseg, port));
/*
 * cmd_signal - handler for the "signal" command.
 *
 * Calls xseg_signal() on the given port number and returns its result
 * unchanged.
 */
1672 int cmd_signal(uint32_t portno)
1674 return xseg_signal(xseg, portno);
/*
 * parse_ports - parse a port argument and update srcport and/or dstport.
 *
 * The usage text gives the argument form as [<src_port>]:[<dst_port>].
 * main() treats a zero return value as "argument not recognised".
 *
 * NOTE(review): the declaration and computation of s, the splitting of the
 * string and the return statements are not part of this excerpt.
 */
1677 int parse_ports(char *str)
/* NOTE(review): str[0] is a plain char; <ctype.h> functions require a value
 * representable as unsigned char (or EOF), so a cast may be needed.
 * atol() reports no conversion errors and its long result is narrowed
 * into the uint32_t srcport. */
1688 if ((s > str) && isdigit(str[0])) {
1689 srcport = atol(str);
/* Same pattern for the destination port; str presumably points past the
 * separator by now -- TODO confirm against the lines not shown. */
1702 if ((s > str) && isdigit(str[0])) {
1703 dstport = atol(str);
/*
 * main - entry point of the xseg command-line tool.
 *
 * Parses the segment spec into cfg, initialises xseg, then walks argv from
 * index 2 and dispatches each recognised command word to its cmd_*()
 * handler, storing the handler's result in ret. An argument that matches no
 * command is handed to parse_ports().
 *
 * NOTE(review): local declarations, the assignment of spec, the per-command
 * index advance / continue statements and the end of the function are not
 * part of this excerpt. Numeric arguments are converted with atol()/atoi(),
 * which do not report malformed input.
 */
1714 int main(int argc, char **argv)
1726 if (xseg_parse_spec(spec, &cfg)) {
1727 fprintf(stderr, "Cannot parse spec\n");
1731 if (xseg_initialize()) {
1732 fprintf(stderr, "cannot initialize!\n");
/* Command loop; the first command word is argv[2]. */
1736 for (i = 2; i < argc; i++) {
/* Commands that are dispatched before any port check. */
1738 if (!strcmp(argv[i], "create")) {
1743 if (!strcmp(argv[i], "join")) {
1746 fprintf(stderr, "Segment joined.\n");
1750 if (!strcmp(argv[i], "destroy")) {
1751 ret = cmd_destroy();
1758 if (!strcmp(argv[i], "reportall")) {
1759 ret = cmd_reportall();
1763 if (!strcmp(argv[i], "bind") && (i + 1 < argc)) {
1764 ret = cmd_bind(atol(argv[i+1]));
1769 if (!strcmp(argv[i], "signal") && (i + 1 < argc)) {
1770 ret = cmd_signal(atol(argv[i+1]));
1775 if (!strcmp(argv[i], "bridge") && (i + 4 < argc)) {
1776 ret = cmd_bridge(atol(argv[i+1]),
/* srcport and dstport are uint32_t, so -1 is converted to the unsigned
 * all-ones value before the comparison (assuming uint32_t is not narrower
 * than int -- TODO confirm for the target platforms). */
1784 if (srcport == -1) {
1785 if (!parse_ports(argv[i]))
1786 fprintf(stderr, "source port undefined: %s\n", argv[i]);
1790 if (dstport == -1) {
1791 if (!parse_ports(argv[i]))
1792 fprintf(stderr, "destination port undefined: %s\n", argv[i]);
/* Commands dispatched after the port checks above. */
1796 if (!strcmp(argv[i], "verify")) {
1797 ret = cmd_verify(0);
1801 if (!strcmp(argv[i], "verify-fix")) {
1802 ret = cmd_verify(1);
/* NOTE(review): "%lx" expects an unsigned long *, but &req is a
 * struct xseg_request **; the request address is read straight from the
 * command line into the pointer object. */
1806 if (!strcmp(argv[i], "failreq") && (i + 1 < argc)) {
1807 struct xseg_request *req;
1808 sscanf(argv[i+1], "%lx", &req);
1809 ret = cmd_request(req, FAIL);
/* Queue inspection: the argument is a port number, the second parameter
 * selects the queue. */
1814 if (!strcmp(argv[i], "inspect-freeq") && (i + 1 < argc)) {
1815 ret = cmd_inspectq(atol(argv[i+1]), FREE_QUEUE);
1820 if (!strcmp(argv[i], "inspect-requestq") && (i + 1 < argc)) {
1821 ret = cmd_inspectq(atol(argv[i+1]), REQUEST_QUEUE);
1826 if (!strcmp(argv[i], "inspect-replyq") && (i + 1 < argc)) {
1827 ret = cmd_inspectq(atol(argv[i+1]), REPLY_QUEUE);
1832 if (!strcmp(argv[i], "report")) {
1833 ret = cmd_report(dstport);
1837 if (!strcmp(argv[i], "alloc_requests") && (i + 1 < argc)) {
1838 ret = cmd_alloc_requests(atol(argv[i+1]));
1843 if (!strcmp(argv[i], "free_requests") && (i + 1 < argc)) {
1844 ret = cmd_free_requests(atol(argv[i+1]));
1849 if (!strcmp(argv[i], "put_requests")) {
1850 ret = cmd_put_requests();
1854 if (!strcmp(argv[i], "put_replies")) {
1855 ret = cmd_put_replies();
/* "complete" and "fail" share cmd_finish(); the second argument is the
 * fail flag. */
1859 if (!strcmp(argv[i], "complete") && (i + 1 < argc)) {
1860 ret = cmd_finish(atol(argv[i+1]), 0);
1865 if (!strcmp(argv[i], "fail") && (i + 1 < argc)) {
1866 ret = cmd_finish(atol(argv[i+1]), 1);
1871 if (!strcmp(argv[i], "wait") && (i + 1 < argc)) {
1872 ret = cmd_wait(atol(argv[i+1]));
/* Random workload commands: <nr_loops> <seed> <targetlen> <datalen>
 * <objectsize> per the usage text; datalen is held in chunksize. */
1877 if (!strcmp(argv[i], "rndwrite") && (i + 5 < argc)) {
1878 long nr_loops = atol(argv[i+1]);
1879 unsigned int seed = atoi(argv[i+2]);
1880 unsigned int targetlen = atoi(argv[i+3]);
1881 unsigned int chunksize = atoi(argv[i+4]);
1882 unsigned long objectsize = atol(argv[i+5]);
1883 ret = cmd_rndwrite(nr_loops, seed, targetlen, chunksize, objectsize);
1888 if (!strcmp(argv[i], "rnddelete") && (i + 3 < argc)) {
1889 long nr_loops = atol(argv[i+1]);
1890 unsigned int seed = atoi(argv[i+2]);
1891 unsigned int targetlen = atoi(argv[i+3]);
1892 ret = cmd_rnddelete(nr_loops, seed, targetlen);
1897 if (!strcmp(argv[i], "rndread") && (i + 5 < argc)) {
1898 long nr_loops = atol(argv[i+1]);
1899 unsigned int seed = atoi(argv[i+2]);
1900 unsigned int targetlen = atoi(argv[i+3]);
1901 unsigned int chunksize = atoi(argv[i+4]);
1902 unsigned long objectsize = atol(argv[i+5]);
1903 ret = cmd_rndread(nr_loops, seed, targetlen, chunksize, objectsize);
/* NOTE(review): three arguments are consumed here (the third is an op
 * code), while the usage text lists only <nr_loops> <concurrent_reqs>. */
1908 if (!strcmp(argv[i], "submit_reqs") && (i + 3 < argc)) {
1909 long nr_loops = atol(argv[i+1]);
1910 long concurrent_reqs = atol(argv[i+2]);
1911 int op = atoi(argv[i+3]);
1912 ret = cmd_submit_reqs(nr_loops, concurrent_reqs, op);
/* Single-target commands. */
1917 if (!strcmp(argv[i], "read") && (i + 3 < argc)) {
1918 char *target = argv[i+1];
1919 uint64_t offset = atol(argv[i+2]);
1920 uint64_t size = atol(argv[i+3]);
1921 ret = cmd_read(target, offset, size);
1926 if (!strcmp(argv[i], "write") && (i + 2 < argc)) {
1927 char *target = argv[i+1];
1928 uint64_t offset = atol(argv[i+2]);
1929 ret = cmd_write(target, offset);
/* The usage text names the second argument <size>; it is held in the local
 * called offset. */
1934 if (!strcmp(argv[i], "truncate") && (i + 2 < argc)) {
1935 char *target = argv[i+1];
1936 uint64_t offset = atol(argv[i+2]);
1937 ret = cmd_truncate(target, offset);
1942 if (!strcmp(argv[i], "delete") && (i + 1 < argc)) {
1943 char *target = argv[i+1];
1944 ret = cmd_delete(target);
1949 if (!strcmp(argv[i], "acquire") && (i + 1 < argc)) {
1950 char *target = argv[i+1];
1951 ret = cmd_acquire(target);
1956 if (!strcmp(argv[i], "release") && (i + 1 < argc)) {
1957 char *target = argv[i+1];
1958 ret = cmd_release(target);
/* Two-name commands: <src> <dst>. */
1963 if (!strcmp(argv[i], "copy") && (i + 2) < argc) {
1964 char *src = argv[i+1];
1965 char *dst = argv[i+2];
1966 ret = cmd_copy(src, dst);
1971 if (!strcmp(argv[i], "clone") && (i + 2 < argc)) {
1972 char *src = argv[i+1];
1973 char *dst = argv[i+2];
1974 ret = cmd_clone(src, dst);
1979 if (!strcmp(argv[i], "info") && (i + 1 < argc)) {
1980 char *target = argv[i+1];
1981 ret = cmd_info(target);
/* Anything that matched no command is tried as a port specification. */
1987 if (!parse_ports(argv[i]))
1988 fprintf(stderr, "invalid argument: %s\n", argv[i]);