12 #include <xseg/xseg.h>
13 #include <xseg/protocol.h>
16 printf("xseg <spec> [[[<src_port>]:[<dst_port>]] [<command> <arg>*] ]*\n"
18 " <type:name:nr_ports:nr_requests:request_size:extra_size:page_shift>\n"
25 " bridge <portno1> <portno2> <logfile> {full|summary|stats}\n"
28 " alloc_requests (to source) <nr>\n"
29 " free_requests (from source) <nr>\n"
30 " put_requests (all from dest)\n"
31 " put_replies (all from dest)\n"
32 " wait <nr_replies>\n"
33 " complete <nr_requests>\n"
34 " fail <nr_requests>\n"
35 " rndwrite <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
36 " rndread <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
37 " submit_reqs <nr_loops> <concurrent_reqs>\n"
39 " read <target> <offset> <size>\n"
40 " write <target> <offset> < data\n"
41 " truncate <target> <size>\n"
46 " clone <src> <dst>\n"
53 struct xseg_config cfg;
55 uint32_t srcport, dstport;
57 #define mkname mkname_heavy
58 /* heavy distributes duplicates much more widely than light
59 * ./xseg-tool random 100000 | cut -d' ' -f2- | sort | uniq -d -c |wc -l
// Set up local signalling for the source port.
// The visible guard calls xseg_init_local_signal(xseg, srcport) only when a
// segment handle exists and `sport` differs from srcport.
// NOTE(review): `sport` is not declared in the lines shown here; presumably it
// remembers the port that was initialised last -- confirm.
63 static void init_local_signal()
65 if (xseg && sport != srcport){
66 xseg_init_local_signal(xseg, srcport);
// Generate a name of `namelen` characters derived from `seed`.
// Per iteration, c is seed plus seed shifted right by 8, 16 and 24 bits; c is
// then folded with its own upper nibble and masked to 4 bits, selecting one of
// the 16 consecutive characters starting at '0'.
// NOTE(review): the store into name[] is not among the visible lines.
71 void mkname_heavy(char *name, uint32_t namelen, uint32_t seed)
75 for (i = 0; i < namelen; i += 1) {
76 c = seed + (seed >> 8) + (seed >> 16) + (seed >> 24);
77 c = '0' + ((c + (c >> 4)) & 0xf);
// Scramble the seed; both factors ((seed % 137911) | 1 and 137911) are odd,
// so the multiplier is always odd.
81 seed *= ((seed % 137911) | 1) * 137911;
// Alternative name generator: every one of the `namelen` characters is
// 'A' + (c & 0xf), i.e. one of 16 consecutive characters starting at 'A'.
// NOTE(review): how c is derived from `seed` is not among the visible lines.
85 void mkname_light(char *name, uint32_t namelen, uint32_t seed)
89 for (i = 0; i < namelen; i += 1) {
91 name[i] = 'A' + (c & 0xf);
// Return RAND_MAX divided by a fresh random() value, truncated to uint64_t.
// NOTE(review): `size` does not appear in the visible expression, so the
// result is not bounded by it -- confirm whether a value in [0, size) was
// intended.
// NOTE(review): random() may return 0; the floating-point quotient is then
// not representable as uint64_t and the conversion is undefined.
96 uint64_t pick(uint64_t size)
98 return (uint64_t)((double)(RAND_MAX) / random());
// Fill chunk[0..datalen) with a repeating pattern built from `offset` and
// `target`.
//   chunk     - destination buffer, at least datalen bytes
//   datalen   - number of bytes to fill
//   target    - name string embedded in the pattern
//   targetlen - length used to size the pattern (pattern is targetlen + 16)
//   offset    - printed as 16 hexadecimal digits at the start of the pattern
101 void mkchunk( char *chunk, uint32_t datalen,
102 char *target, uint32_t targetlen, uint64_t offset)
104 long i, r, bufsize = targetlen + 16;
// r is the length of the partial pattern that fills the tail of the chunk.
106 r = datalen % bufsize;
// snprintf writes at most bufsize - 1 characters plus a terminating NUL, so
// when target holds targetlen characters the last pattern byte is NUL.
// NOTE(review): the declaration of buf is not among the visible lines.
107 snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);
// Copy as many whole patterns as fit.
109 for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
110 memcpy(chunk + i, buf, bufsize);
// Fill the remaining r bytes at the end with the start of the pattern.
112 memcpy(chunk + datalen - r, buf, r);
// Verify that chunk[0..datalen) carries the pattern built from `offset` and
// `target`: rebuild the pattern with the same snprintf format and memcmp it
// against each pattern-sized slice, then against the tail.
// NOTE(review): the return statements are not among the visible lines.
// NOTE(review): the tail length here is datalen % targetlen, whereas mkchunk
// uses datalen % bufsize (bufsize = targetlen + 16); the two disagree, and
// targetlen == 0 would divide by zero -- confirm which is intended.
115 int chkchunk( char *chunk, uint32_t datalen,
116 char *target, uint32_t targetlen, uint64_t offset)
119 int bufsize = targetlen + 16;
121 r = datalen % targetlen;
122 snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);
// Compare every whole pattern-sized slice.
124 for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
125 if (memcmp(chunk + i, buf, bufsize)) {
126 /*printf("mismatch: '%*s'* vs '%*s'\n",
127 bufsize, buf, datalen, chunk);
132 if (memcmp(chunk + datalen - r, buf, r))
139 #define ALLOC_MIN 4096
140 #define ALLOC_MAX 1048576
// Read the contents of `fp` into a heap buffer, returned through `retbuf` and
// `retsize` (presumably buffer and byte count -- the assignments are not
// among the visible lines).
142 void inputbuf(FILE *fp, char **retbuf, uint64_t *retsize)
// alloc_size is static, so the value persists between calls.
144 static uint64_t alloc_size;
// Clamp the initial allocation to [ALLOC_MIN, ALLOC_MAX].
150 if (alloc_size < ALLOC_MIN)
151 alloc_size = ALLOC_MIN;
153 if (alloc_size > ALLOC_MAX)
154 alloc_size = ALLOC_MAX;
156 p = realloc(buf, alloc_size);
// Read into the unused part of the buffer.
167 r = fread(buf + size, 1, alloc_size - size, fp);
// Buffer is full: ask for twice the space.
// NOTE(review): whether alloc_size itself is updated after this realloc is
// not visible here.
171 if (size >= alloc_size) {
172 p = realloc(buf, alloc_size * 2);
// Print a one-line summary of `req` (serial number and state) to stderr.
190 void report_request(struct xseg_request *req)
192 //uint32_t max = req->datalen;
193 //char *data = xseg_get_data(xseg, req);
197 fprintf(stderr, "request %llu state %u\n", (unsigned long long)req->serial, req->state);
// Issue a request for `target` from srcport to dstport with a data area of
// sizeof(uint64_t) bytes, then signal the port returned by xseg_submit().
// NOTE(review): the assignment of the request op and the return values are
// not among the visible lines.
200 int cmd_info(char *target)
202 uint32_t targetlen = strlen(target);
203 size_t size = sizeof(uint64_t);
206 struct xseg_request *req;
209 req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
211 fprintf(stderr, "No request!\n");
215 r = xseg_prep_request(xseg, req, targetlen, size);
217 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
218 (unsigned long) targetlen, (unsigned long) size);
// Preparation failed: hand the request back.
219 xseg_put_request(xseg, req, srcport);
223 req_target = xseg_get_target(xseg, req);
// Copies exactly strlen(target) bytes, so no NUL terminator is written.
224 strncpy(req_target, target, targetlen);
229 p = xseg_submit(xseg, req, srcport, X_ALLOC);
233 xseg_signal(xseg, p);
// Issue a request for `size` bytes of `target` at `offset`, from srcport to
// dstport, then signal the port returned by xseg_submit().
// NOTE(review): the assignments of req->size / req->op and the return values
// are not among the visible lines.
238 int cmd_read(char *target, uint64_t offset, uint64_t size)
240 uint32_t targetlen = strlen(target);
244 struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
247 fprintf(stderr, "No request\n");
// Reserve room for the target name and `size` bytes of data.
251 r = xseg_prep_request(xseg, req, targetlen, size);
253 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
254 (unsigned long)targetlen, (unsigned long long)size);
255 xseg_put_request(xseg, req, srcport);
259 req_target = xseg_get_target(xseg, req);
// Copies exactly strlen(target) bytes, so no NUL terminator is written.
260 strncpy(req_target, target, targetlen);
261 req->offset = offset;
265 p = xseg_submit(xseg, req, srcport, X_ALLOC);
269 xseg_signal(xseg, p);
// Read the payload from stdin with inputbuf() and issue a request carrying
// it for `target` at `offset`, from srcport to dstport.
// NOTE(review): the op assignment, the release of buf and the return values
// are not among the visible lines.
273 int cmd_write(char *target, uint64_t offset)
279 char *req_target, *req_data;
280 uint32_t targetlen = strlen(target);
281 struct xseg_request *req;
283 inputbuf(stdin, &buf, &size);
285 fprintf(stderr, "No input\n");
289 req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
291 fprintf(stderr, "No request\n");
// Reserve room for the target name and the whole stdin payload.
295 r = xseg_prep_request(xseg, req, targetlen, size);
297 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
298 (unsigned long)targetlen, (unsigned long long)size);
299 xseg_put_request(xseg, req, srcport);
303 req_target = xseg_get_target(xseg, req);
// Copies exactly strlen(target) bytes, so no NUL terminator is written.
304 strncpy(req_target, target, targetlen);
306 req_data = xseg_get_data(xseg, req);
307 memcpy(req_data, buf, size);
308 req->offset = offset;
312 p = xseg_submit(xseg, req, srcport, X_ALLOC);
314 fprintf(stderr, "Cannot submit\n");
317 xseg_signal(xseg, p);
322 int cmd_truncate(char *target, uint64_t offset)
// Bind srcport, then issue a data-less request for `target` to dstport and
// signal the port returned by xseg_submit().
// NOTE(review): the op assignment and the return values are not among the
// visible lines.
327 int cmd_delete(char *target)
329 uint32_t targetlen = strlen(target);
331 struct xseg_request *req;
333 xseg_bind_port(xseg, srcport, NULL);
335 req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
337 fprintf(stderr, "No request!\n");
// Only the target name is needed; the data length is 0.
341 r = xseg_prep_request(xseg, req, targetlen, 0);
343 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
344 (unsigned long) targetlen, (unsigned long) req->bufferlen - targetlen);
345 xseg_put_request(xseg, req, srcport);
349 char *reqtarget = xseg_get_target(xseg, req);
// Copies exactly strlen(target) bytes, so no NUL terminator is written.
350 strncpy(reqtarget, target, targetlen);
353 xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
355 fprintf(stderr, "Couldn't submit request\n");
// Submission failed: hand the request back.
356 xseg_put_request(xseg, req, srcport);
360 xseg_signal(xseg, p);
365 int cmd_acquire(char *target)
370 int cmd_release(char *target)
375 int cmd_copy(char *src, char *dst)
// Issue a clone request: the request target is `dst`, and the data area
// holds a struct xseg_request_clone naming `src` as the parent.
// NOTE(review): the op assignment and the return values are not among the
// visible lines.
380 int cmd_clone(char *src, char *dst)
383 uint32_t targetlen = strlen(dst);
384 uint32_t parentlen = strlen(src);
385 struct xseg_request *req;
386 struct xseg_request_clone *xclone;
387 xseg_bind_port(xseg, srcport, NULL);
388 req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
390 fprintf(stderr, "No request\n");
394 int r = xseg_prep_request(xseg, req, targetlen, sizeof(struct xseg_request_clone));
396 fprintf(stderr, "Cannot prepare request!\n");
397 xseg_put_request(xseg, req, srcport);
401 char *target = xseg_get_target(xseg, req);
402 char *data = xseg_get_data(xseg, req);
404 strncpy(target, dst, targetlen);
405 xclone = (struct xseg_request_clone *) data;
// NOTE(review): parentlen comes straight from strlen(src); no check against
// the capacity of xclone->target is visible here -- confirm.
406 strncpy(xclone->target, src, parentlen);
407 xclone->targetlen = parentlen;
410 req->size = sizeof(struct xseg_request_clone);
413 xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
415 fprintf(stderr, "Cannot submit request\n");
418 xseg_signal(xseg, p);
// Append a description of `req` to the log opened on descriptor `logfd`.
// Three output formats are visible: a full dump (ports, op, offset, size,
// state, target and data), a short ports-and-op line, and a running request
// count; presumably `method` selects between them -- the selection code is
// not among the visible lines.
// Note the parameter order: the second parameter is named portno2 and the
// third portno1, and "src port" prints portno1 (the third argument).
// At most 63 bytes of the target and of the data are copied into the local
// 64-byte buffers.
// NOTE(review): a stream is created with fdopen() on every call; how it is
// released is not among the visible lines.
423 void log_req(int logfd, uint32_t portno2, uint32_t portno1, int op, int method,
424 struct xseg_request *req)
427 char target[64], data[64];
428 char *req_target, *req_data;
429 /* null terminate name in case of req->target is less than 63 characters,
430 * and next character after name (aka first byte of next buffer) is not
433 unsigned int end = (req->targetlen > 63) ? 63 : req->targetlen;
435 req_target = xseg_get_target(xseg, req);
436 req_data = xseg_get_data(xseg, req);
438 logfp = fdopen(logfd, "a");
444 strncpy(target, req_target, end);
446 strncpy(data, req_data, 63);
450 "src port: %u, dst port: %u, op:%u offset: %llu size: %lu, reqstate: %u\n"
451 "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
452 (unsigned int)portno1,
453 (unsigned int)portno2,
454 (unsigned int)req->op,
455 (unsigned long long)req->offset,
456 (unsigned long)req->size,
457 (unsigned int)req->state,
458 (unsigned int)req->targetlen, target,
459 (unsigned long long)req->datalen, data);
463 "src port: %u, dst port: %u, op: %u\n",
464 (unsigned int)portno1,
465 (unsigned int)portno2,
466 (unsigned int)req->op);
469 fprintf(logfp, "src port: %u, dst port: %u, reqs: %llu\n",
470 (unsigned int)portno1,
471 (unsigned int)portno2,
472 (unsigned long long)++reqs);
480 #define LOG_RECEIVE 1
// Relay traffic between two ports and log every relayed request.
//   portno1, portno2 - the two ports being bridged
//   logfile          - "-" is treated specially; otherwise the file is
//                      opened for appending (created with mode 0600)
//   how              - "full" or "summary" are recognised by name; any other
//                      value takes the remaining branch
// NOTE(review): the return values of xseg_submit() and xseg_respond() are
// not checked in the visible lines.
482 int cmd_bridge(uint32_t portno1, uint32_t portno2, char *logfile, char *how)
484 struct xseg_request *req;
486 if (!strcmp(logfile, "-"))
489 logfd = open(logfile, O_WRONLY|O_APPEND|O_CREAT, 0600);
496 if (!strcmp(how, "full"))
498 else if (!strcmp(how, "summary"))
504 int reloop = 0, active;
505 xseg_prepare_wait(xseg, portno1);
506 xseg_prepare_wait(xseg, portno2);
// Requests accepted on port 1 are submitted to port 2 ...
513 req = xseg_accept(xseg, portno1, 0);
515 xseg_submit(xseg, req, portno2, X_ALLOC);
516 log_req(logfd, portno1, portno2, LOG_ACCEPT, method, req);
// ... and requests accepted on port 2 are submitted to port 1.
520 req = xseg_accept(xseg, portno2, 0);
522 xseg_submit(xseg, req, portno1, X_ALLOC);
523 log_req(logfd, portno2, portno1, LOG_ACCEPT, method, req);
// Replies received on port 1 are passed on through port 2 ...
527 req = xseg_receive(xseg, portno1, 0);
529 xseg_respond(xseg, req, portno2, X_ALLOC);
530 log_req(logfd, portno1, portno2, LOG_RECEIVE, method, req);
// ... and replies received on port 2 are passed on through port 1.
534 req = xseg_receive(xseg, portno2, 0);
536 xseg_respond(xseg, req, portno1, X_ALLOC);
537 log_req(logfd, portno2, portno1, LOG_RECEIVE, method, req);
544 /* wait on multiple queues? */
545 xseg_wait_signal(xseg, 100000);
548 xseg_cancel_wait(xseg, portno1);
549 xseg_cancel_wait(xseg, portno2);
// Submit up to `loops` X_WRITE requests with generated target names and
// pattern-filled data, collecting replies as they arrive.
//   loops     - number of requests to submit / replies to wait for
//   seed      - input to mkname() for the target name
//   targetlen - length of each generated name; must be < chunksize
//   chunksize - data bytes per request
//   size      - only referenced by the commented-out pick(size) call in the
//               visible lines; the offset is fixed at 0
// NOTE(review): counter updates, loop structure and return values are not
// among the visible lines.
560 int cmd_rndwrite(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
565 if (targetlen >= chunksize) {
566 fprintf(stderr, "targetlen >= chunksize\n");
// One extra byte for the NUL terminator written after mkname().
570 char *p = realloc(namebuf, targetlen+1);
572 fprintf(stderr, "Cannot allocate memory\n");
577 p = realloc(chunk, chunksize);
579 fprintf(stderr, "Cannot allocate memory\n");
583 memset(chunk, 0, chunksize);
587 struct xseg_request *submitted = NULL, *received;
588 long nr_submitted = 0, nr_received = 0, nr_failed = 0;
592 char *req_data, *req_target;
597 xseg_prepare_wait(xseg, srcport);
// Submit side: only while fewer than `loops` requests have been sent and a
// free request is available.
598 if (nr_submitted < loops &&
599 (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
600 xseg_cancel_wait(xseg, srcport);
601 r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
603 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
604 targetlen, chunksize);
605 xseg_put_request(xseg, submitted, srcport);
609 req_target = xseg_get_target(xseg, submitted);
610 req_data = xseg_get_data(xseg, submitted);
613 mkname(namebuf, targetlen, seed);
614 namebuf[targetlen] = 0;
615 //printf("%ld: %s\n", nr_submitted, namebuf);
616 strncpy(req_target, namebuf, targetlen);
617 offset = 0;// pick(size);
// The payload is generated directly inside the request's data area.
618 mkchunk(req_data, chunksize, namebuf, targetlen, offset);
620 submitted->offset = offset;
621 submitted->size = chunksize;
622 submitted->op = X_WRITE;
623 submitted->flags |= XF_NOSYNC;
625 port = xseg_submit(xseg, submitted, srcport, X_ALLOC);
626 if (port == NoPort) {
627 xseg_put_request(xseg, submitted, srcport);
631 xseg_signal(xseg, port);
// Receive side: a reply without XS_SERVED is reported as a failure.
635 received = xseg_receive(xseg, srcport, 0);
637 xseg_cancel_wait(xseg, srcport);
639 if (!(received->state & XS_SERVED)) {
641 report_request(received);
643 if (xseg_put_request(xseg, received, srcport))
644 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
// Nothing was submitted and nothing received: block until signalled or the
// timeout passes.
647 if (!submitted && !received)
648 xseg_wait_signal(xseg, 1000000);
650 if (nr_submitted % 1000 == 0 && !reported) {
652 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
653 nr_submitted, nr_received, nr_failed);
656 if (nr_received >= loops)
660 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
661 nr_submitted, nr_received, nr_failed);
// Submit up to `loops` X_DELETE requests with generated target names and
// collect the replies.
//   loops     - number of requests to submit / replies to wait for
//   seed      - input to mkname() for the target name
//   targetlen - length of each generated name
// NOTE(review): counter updates, loop structure and return values are not
// among the visible lines.
665 int cmd_rnddelete(long loops, int32_t seed, uint32_t targetlen)
// One extra byte for the NUL terminator written after mkname().
669 char *p = realloc(namebuf, targetlen+1);
671 fprintf(stderr, "Cannot allocate memory\n");
678 struct xseg_request *submitted = NULL, *received;
679 long nr_submitted = 0, nr_received = 0, nr_failed = 0;
687 xseg_prepare_wait(xseg, srcport);
688 if (nr_submitted < loops &&
689 (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
690 xseg_cancel_wait(xseg, srcport);
// Delete requests carry a target name only; the data length is 0.
691 r = xseg_prep_request(xseg, submitted, targetlen, 0);
693 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
695 xseg_put_request(xseg, submitted, srcport);
699 req_target = xseg_get_target(xseg, submitted);
702 mkname(namebuf, targetlen, seed);
703 namebuf[targetlen] = 0;
704 //printf("%ld: %s\n", nr_submitted, namebuf);
705 strncpy(req_target, namebuf, targetlen);
706 submitted->offset = 0;
708 submitted->op = X_DELETE;
709 submitted->flags = 0;
711 port = xseg_submit(xseg, submitted, srcport, X_ALLOC);
712 if (port == NoPort) {
713 xseg_put_request(xseg, submitted, srcport);
717 xseg_signal(xseg, port);
// Receive side: a reply without XS_SERVED is reported as a failure.
721 received = xseg_receive(xseg, srcport, 0);
723 xseg_cancel_wait(xseg, srcport);
725 if (!(received->state & XS_SERVED)) {
727 report_request(received);
729 if (xseg_put_request(xseg, received, srcport))
730 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
// Nothing was submitted and nothing received: block until signalled or the
// timeout passes.
733 if (!submitted && !received)
734 xseg_wait_signal(xseg, 1000000);
736 if (nr_submitted % 1000 == 0 && !reported) {
738 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
739 nr_submitted, nr_received, nr_failed);
742 if (nr_received >= loops)
746 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
747 nr_submitted, nr_received, nr_failed);
751 * prepare/wait rhythm,
752 * files are converted to independent chunk access patterns,
// Submit up to `loops` X_READ requests with generated target names and check
// each served reply against the expected pattern with chkchunk().
//   loops     - number of requests to submit / replies to wait for
//   seed      - input to mkname() for the target name
//   targetlen - length of each generated name; must be < chunksize
//   chunksize - bytes requested per read
//   size      - only referenced by the commented-out pick(size) call in the
//               visible lines; the offset is fixed at 0
// NOTE(review): counter updates, loop structure and return values are not
// among the visible lines.
755 int cmd_rndread(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
760 if (targetlen >= chunksize) {
761 fprintf(stderr, "targetlen >= chunksize\n");
// One extra byte for the NUL terminator written after mkname().
765 char *p = realloc(namebuf, targetlen+1);
767 fprintf(stderr, "Cannot allocate memory\n");
772 p = realloc(chunk, chunksize);
774 fprintf(stderr, "Cannot allocate memory\n");
778 memset(chunk, 0, chunksize);
782 struct xseg_request *submitted = NULL, *received;
783 long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0;
787 char *req_data, *req_target;
793 xseg_prepare_wait(xseg, srcport);
794 if (nr_submitted < loops &&
795 (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
796 xseg_cancel_wait(xseg, srcport);
797 r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
799 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
800 targetlen, chunksize);
801 xseg_put_request(xseg, submitted, srcport);
805 req_target = xseg_get_target(xseg, submitted);
807 mkname(namebuf, targetlen, seed);
808 namebuf[targetlen] = 0;
809 //printf("%ld: %s\n", nr_submitted, namebuf);
810 offset = 0;//pick(size);
812 strncpy(req_target, namebuf, targetlen);
813 submitted->offset = offset;
814 submitted->size = chunksize;
815 submitted->op = X_READ;
816 port = xseg_submit(xseg, submitted, srcport, X_ALLOC);
817 if (port == NoPort) {
818 xseg_put_request(xseg, submitted, srcport);
822 xseg_signal(xseg, port);
826 received = xseg_receive(xseg, srcport, 0);
828 xseg_cancel_wait(xseg, srcport);
830 req_target = xseg_get_target(xseg, received);
831 req_data = xseg_get_data(xseg, received);
// Unserved replies are reported; served ones are validated against the
// pattern implied by the reply's own target, length and offset.
832 if (!(received->state & XS_SERVED)) {
834 report_request(received);
835 } else if (!chkchunk(req_data, received->datalen,
836 req_target, received->targetlen, received->offset)) {
837 // report_request(received);
841 if (xseg_put_request(xseg, received, srcport))
842 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
// Nothing was submitted and nothing received: block until signalled or the
// timeout passes.
845 if (!submitted && !received)
846 xseg_wait_signal(xseg, 1000000);
848 if (nr_submitted % 1000 == 0 && !reported) {
850 fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
851 nr_submitted, nr_received, nr_failed, nr_mismatch);
854 if (nr_received >= loops)
858 fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
859 nr_submitted, nr_received, nr_failed, nr_mismatch);
// Throughput test: submit `loops` requests while keeping at most
// `concurrent_reqs` in flight, then print elapsed time and requests/second.
//   loops           - total number of requests
//   concurrent_reqs - upper bound on nr_flying when submitting
//   op              - selects X_INFO, X_READ or X_WRITE; the mapping from
//                     value to operation is not among the visible lines
// Target length (10) and chunk size (4096) are fixed.
// NOTE(review): the final message spells "elpased"; it is a runtime string
// and is left untouched here.
// NOTE(review): if the measured interval t is 0 the throughput expression
// divides by 0.0.
863 int cmd_submit_reqs(long loops, long concurrent_reqs, int op)
868 struct xseg_request *submitted = NULL, *received;
869 long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0, nr_flying = 0;
872 uint32_t targetlen = 10, chunksize = 4096;
873 struct timeval tv1, tv2;
875 char *req_data, *req_target;
877 xseg_bind_port(xseg, srcport, NULL);
// Start of the measured interval.
879 gettimeofday(&tv1, NULL);
882 xseg_prepare_wait(xseg, srcport);
883 if (nr_submitted < loops && nr_flying < concurrent_reqs &&
884 (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
885 xseg_cancel_wait(xseg, srcport);
886 r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
888 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
889 targetlen, chunksize);
890 xseg_put_request(xseg, submitted, srcport);
898 offset = 0;//pick(size);
900 submitted->offset = offset;
901 submitted->size = chunksize;
902 req_target = xseg_get_target(xseg, submitted);
903 req_data = xseg_get_data(xseg, submitted);
906 submitted->op = X_INFO;
908 submitted->op = X_READ;
910 submitted->op = X_WRITE;
// NOTE(review): the pattern is built from the request's own target buffer;
// where that buffer is filled is not among the visible lines.
911 mkchunk(req_data, submitted->datalen, req_target, submitted->targetlen, submitted->offset);
914 p = xseg_submit(xseg, submitted, srcport, X_ALLOC);
916 if (xseg_signal(xseg, p) < 0)
917 perror("Cannot signal peer");
920 received = xseg_receive(xseg, srcport, 0);
922 xseg_cancel_wait(xseg, srcport);
// The first completed request doubles as a latency sample.
924 if (nr_received == 0)
925 fprintf(stderr, "latency (time for the first req to complete): %llu usecs\n",
926 (unsigned long long)received->elapsed);
928 if (!(received->state & XS_SERVED)) {
930 //report_request(received);
933 if (xseg_put_request(xseg, received, srcport))
934 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
937 if (!submitted && !received)
938 xseg_wait_signal(xseg, 10000000L);
940 if (nr_received >= loops)
// End of the measured interval.
943 gettimeofday(&tv2, NULL);
945 fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
946 nr_submitted, nr_received, nr_failed, nr_mismatch);
// Elapsed time in microseconds.
947 long t = (tv2.tv_sec - tv1.tv_sec)*1000000 + (tv2.tv_usec - tv1.tv_usec);
948 fprintf(stderr, "elpased time: %lf secs, throughput: %lf reqs/sec\n", (double) t / 1000000.0, (double) nr_submitted / (t / 1000000.0));
// Print the state of port `portno`: allocated/maximum requests, source and
// destination gateways, and the address and element count of its free,
// request and reply queues.
// NOTE(review): port->alloc_reqs and port->max_alloc_reqs are passed to
// %llu without a cast; their declared types are not visible here -- confirm
// they are unsigned long long.
953 int cmd_report(uint32_t portno)
955 struct xseg_port *port = xseg_get_port(xseg, portno);
957 printf("port %u is not assigned\n", portno);
960 struct xq *fq, *rq, *pq;
961 fq = xseg_get_queue(xseg, port, free_queue);
962 rq = xseg_get_queue(xseg, port, request_queue);
963 pq = xseg_get_queue(xseg, port, reply_queue);
964 fprintf(stderr, "port %u:\n"
965 " requests: %llu/%llu src gw: %u dst gw: %u\n"
966 " free_queue [%p] count : %u\n"
967 " request_queue [%p] count : %u\n"
968 " reply_queue [%p] count : %u\n",
969 portno, port->alloc_reqs, port->max_alloc_reqs,
970 xseg->src_gw[portno], xseg->dst_gw[portno],
971 (void *)fq, xq_count(fq),
972 (void *)rq, xq_count(rq),
973 (void *)pq, xq_count(pq));
982 xseg = xseg_join(cfg.type, cfg.name, "posix", NULL);
984 fprintf(stderr, "cannot join segment!\n");
// Print segment heap usage, then iterate over every configured port.
// NOTE(review): the loop body is not among the visible lines; presumably it
// reports each port -- confirm.
990 int cmd_reportall(void)
998 fprintf(stderr, "Heap usage: %llu / %llu\n", xseg->heap->cur, xseg->config.heap_size);
999 for (t = 0; t < xseg->config.nr_ports; t++)
// Create the segment described by the global cfg via xseg_create() and
// report the outcome on stderr.
1005 int cmd_create(void)
1007 int r = xseg_create(&cfg);
1009 fprintf(stderr, "cannot create segment!\n");
1013 fprintf(stderr, "Segment initialized.\n");
// Destroy the segment. If no segment handle is held yet, cmd_join() is
// attempted first; a non-zero result from it takes the guarded branch.
// NOTE(review): the destroy call itself is not among the visible lines.
1017 int cmd_destroy(void)
1019 if (!xseg && cmd_join())
1024 fprintf(stderr, "Segment destroyed.\n");
// Allocate `nr` requests for srcport; returns xseg_alloc_requests()'s result.
1028 int cmd_alloc_requests(unsigned long nr)
1030 return xseg_alloc_requests(xseg, srcport, nr);
// Free `nr` requests from srcport; returns xseg_free_requests()'s result.
1033 int cmd_free_requests(unsigned long nr)
1035 return xseg_free_requests(xseg, srcport, nr);
// Accept pending requests on dstport and put each one back using srcport.
// NOTE(review): the loop structure and return value are not among the
// visible lines.
1038 int cmd_put_requests(void)
1040 struct xseg_request *req;
1043 req = xseg_accept(xseg, dstport, 0);
1046 if (xseg_put_request(xseg, req, srcport))
1047 fprintf(stderr, "Cannot put request at port %u\n", req->src_portno);
// Act as the serving side on srcport: accept requests, mark each as failed
// or served, and respond.
//   nr   - number of requests to finish (presumably; the loop bound is not
//          among the visible lines)
//   fail - presumably selects the branch that clears XS_SERVED instead of
//          serving the request -- confirm
// NOTE(review): neither a NULL check nor a free() for buf is visible here.
1053 int cmd_finish(unsigned long nr, int fail)
1055 struct xseg_request *req;
1056 char *buf = malloc(sizeof(char) * 8128);
1057 char *req_target, *req_data;
1058 xseg_bind_port(xseg, srcport, NULL);
1062 xseg_prepare_wait(xseg, srcport);
1063 req = xseg_accept(xseg, srcport, 0);
1065 req_target = xseg_get_target(xseg, req);
1066 req_data = xseg_get_data(xseg, req);
1067 xseg_cancel_wait(xseg, srcport);
1069 req->state &= ~XS_SERVED;
// Reads are answered with the pattern mkchunk() generates for this target
// and offset.
1071 if (req->op == X_READ)
1072 mkchunk(req_data, req->datalen, req_target, req->targetlen, req->offset);
// NOTE(review): buf is a char *, so sizeof(*buf) is 1 and at most one byte
// is copied; the 8128-byte capacity was probably intended -- confirm.
1073 else if (req->op == X_WRITE)
1074 memcpy(buf, req_data, (sizeof(*buf) > req->datalen) ? req->datalen : sizeof(*buf));
// Info requests are answered with a fixed value of 4294967296 (2^32).
// NOTE(review): this writes through req->data directly rather than through
// the req_data pointer obtained from xseg_get_data() -- confirm.
1075 else if (req->op == X_INFO)
1076 *((uint64_t *) req->data) = 4294967296;
1078 req->state |= XS_SERVED;
1079 req->serviced = req->size;
1082 p = xseg_respond(xseg, req, srcport, X_ALLOC);
1083 xseg_signal(xseg, p);
1087 xseg_wait_signal(xseg, 10000000L);
// Process one reply: report it if it was not served, otherwise emit
// operation-specific output, and finally return the request with
// xseg_put_request().
// NOTE(review): the dispatch on the operation is not among the visible
// lines.
1095 void handle_reply(struct xseg_request *req)
1097 char *req_data = xseg_get_data(xseg, req);
1098 char *req_target = xseg_get_target(xseg, req);
1099 if (!(req->state & XS_SERVED)) {
1100 report_request(req);
// Raw payload goes to stdout.
1106 fwrite(req_data, 1, req->datalen, stdout);
1110 fprintf(stdout, "wrote: ");
1111 fwrite(req_data, 1, req->datalen, stdout);
// NOTE(review): %s needs a NUL-terminated string; the submitters in this
// file copy the target with strncpy(..., targetlen) and write no terminator
// -- confirm the target buffer is terminated by other means.
1115 fprintf(stderr, "deleted %s\n", req_target);
1120 fprintf(stderr, "cloned %s\n", ((struct xseg_request_clone *)req_data)->target);
// The data area is read as a single uint64_t.
1123 fprintf(stderr, "size: %llu\n", (unsigned long long)*((uint64_t *)req_data));
1131 if (xseg_put_request(xseg, req, srcport))
1132 fprintf(stderr, "Cannot put reply at port %u\n", req->src_portno);
// Wait for replies on srcport: initialise local signalling, poll with
// xseg_receive(), and sleep via prepare_wait / wait_signal / cancel_wait.
//   nr - presumably the number of replies to wait for; the loop bound is
//        not among the visible lines
// NOTE(review): the value xseg_wait_signal() stores in ret is overwritten
// by xseg_cancel_wait() on the very next line.
1135 int cmd_wait(uint32_t nr)
1137 struct xseg_request *req;
1139 init_local_signal();
1142 req = xseg_receive(xseg, srcport, 0);
1151 ret = xseg_prepare_wait(xseg, srcport);
1155 ret = xseg_wait_signal(xseg, 1000000);
1156 ret = xseg_cancel_wait(xseg, srcport);
// Receive pending replies on dstport, print each one, and put it back using
// srcport.
// NOTE(review): the loop structure and return value are not among the
// visible lines.
1164 int cmd_put_replies(void)
1166 struct xseg_request *req;
1169 req = xseg_receive(xseg, dstport, 0);
1172 fprintf(stderr, "request: %08llx%08llx\n"
1175 0LL, (unsigned long long)req->serial,
1178 report_request(req);
1180 //fwrite(req->buffer, 1, req->bufferlen, stdout);
1182 if (xseg_put_request(xseg, req, srcport))
1183 fprintf(stderr, "Cannot put reply\n");
// Bind port `portno` with xseg_bind_port() and report on stderr either the
// failure or the port number that was actually bound.
1189 int cmd_bind(long portno)
1191 struct xseg_port *port = xseg_bind_port(xseg, portno, NULL);
1193 fprintf(stderr, "failed to bind port %ld\n", portno);
1197 fprintf(stderr, "bound port %u\n", xseg_portno(xseg, port));
// Signal port `portno`; returns xseg_signal()'s result.
1201 int cmd_signal(uint32_t portno)
1203 return xseg_signal(xseg, portno);
// Parse a port specification string and update the global srcport and
// dstport from its numeric parts.
// NOTE(review): how `s` is positioned and how `str` advances between the two
// halves is not among the visible lines.
// NOTE(review): atol() reports no conversion errors, and its long result is
// narrowed into the uint32_t port globals.
1206 int parse_ports(char *str)
// A source port is taken only when the field is non-empty and starts with a
// digit.
1217 if ((s > str) && isdigit(str[0])) {
1218 srcport = atol(str);
// Same rule for the destination port.
1231 if ((s > str) && isdigit(str[0])) {
1232 dstport = atol(str);
1243 int main(int argc, char **argv)
1255 if (xseg_parse_spec(spec, &cfg)) {
1256 fprintf(stderr, "Cannot parse spec\n");
1260 if (xseg_initialize()) {
1261 fprintf(stderr, "cannot initialize!\n");
1265 for (i = 2; i < argc; i++) {
1267 if (!strcmp(argv[i], "create")) {
1272 if (!strcmp(argv[i], "join")) {
1275 fprintf(stderr, "Segment joined.\n");
1279 if (!strcmp(argv[i], "destroy")) {
1280 ret = cmd_destroy();
1287 if (!strcmp(argv[i], "reportall")) {
1288 ret = cmd_reportall();
1292 if (!strcmp(argv[i], "bind") && (i + 1 < argc)) {
1293 ret = cmd_bind(atol(argv[i+1]));
1298 if (!strcmp(argv[i], "signal") && (i + 1 < argc)) {
1299 ret = cmd_signal(atol(argv[i+1]));
1304 if (!strcmp(argv[i], "bridge") && (i + 4 < argc)) {
1305 ret = cmd_bridge(atol(argv[i+1]),
1313 if (srcport == -1) {
1314 if (!parse_ports(argv[i]))
1315 fprintf(stderr, "source port undefined: %s\n", argv[i]);
1319 if (dstport == -1) {
1320 if (!parse_ports(argv[i]))
1321 fprintf(stderr, "destination port undefined: %s\n", argv[i]);
1325 if (!strcmp(argv[i], "report")) {
1326 ret = cmd_report(dstport);
1330 if (!strcmp(argv[i], "alloc_requests") && (i + 1 < argc)) {
1331 ret = cmd_alloc_requests(atol(argv[i+1]));
1336 if (!strcmp(argv[i], "free_requests") && (i + 1 < argc)) {
1337 ret = cmd_free_requests(atol(argv[i+1]));
1342 if (!strcmp(argv[i], "put_requests")) {
1343 ret = cmd_put_requests();
1347 if (!strcmp(argv[i], "put_replies")) {
1348 ret = cmd_put_replies();
1352 if (!strcmp(argv[i], "complete") && (i + 1 < argc)) {
1353 ret = cmd_finish(atol(argv[i+1]), 0);
1358 if (!strcmp(argv[i], "fail") && (i + 1 < argc)) {
1359 ret = cmd_finish(atol(argv[i+1]), 1);
1364 if (!strcmp(argv[i], "wait") && (i + 1 < argc)) {
1365 ret = cmd_wait(atol(argv[i+1]));
1370 if (!strcmp(argv[i], "rndwrite") && (i + 5 < argc)) {
1371 long nr_loops = atol(argv[i+1]);
1372 unsigned int seed = atoi(argv[i+2]);
1373 unsigned int targetlen = atoi(argv[i+3]);
1374 unsigned int chunksize = atoi(argv[i+4]);
1375 unsigned long objectsize = atol(argv[i+5]);
1376 ret = cmd_rndwrite(nr_loops, seed, targetlen, chunksize, objectsize);
1381 if (!strcmp(argv[i], "rnddelete") && (i + 3 < argc)) {
1382 long nr_loops = atol(argv[i+1]);
1383 unsigned int seed = atoi(argv[i+2]);
1384 unsigned int targetlen = atoi(argv[i+3]);
1385 ret = cmd_rnddelete(nr_loops, seed, targetlen);
1390 if (!strcmp(argv[i], "rndread") && (i + 5 < argc)) {
1391 long nr_loops = atol(argv[i+1]);
1392 unsigned int seed = atoi(argv[i+2]);
1393 unsigned int targetlen = atoi(argv[i+3]);
1394 unsigned int chunksize = atoi(argv[i+4]);
1395 unsigned long objectsize = atol(argv[i+5]);
1396 ret = cmd_rndread(nr_loops, seed, targetlen, chunksize, objectsize);
1401 if (!strcmp(argv[i], "submit_reqs") && (i + 3 < argc)) {
1402 long nr_loops = atol(argv[i+1]);
1403 long concurrent_reqs = atol(argv[i+2]);
1404 int op = atoi(argv[i+3]);
1405 ret = cmd_submit_reqs(nr_loops, concurrent_reqs, op);
1410 if (!strcmp(argv[i], "read") && (i + 3 < argc)) {
1411 char *target = argv[i+1];
1412 uint64_t offset = atol(argv[i+2]);
1413 uint64_t size = atol(argv[i+3]);
1414 ret = cmd_read(target, offset, size);
1419 if (!strcmp(argv[i], "write") && (i + 2 < argc)) {
1420 char *target = argv[i+1];
1421 uint64_t offset = atol(argv[i+2]);
1422 ret = cmd_write(target, offset);
1427 if (!strcmp(argv[i], "truncate") && (i + 2 < argc)) {
1428 char *target = argv[i+1];
1429 uint64_t offset = atol(argv[i+2]);
1430 ret = cmd_truncate(target, offset);
1435 if (!strcmp(argv[i], "delete") && (i + 1 < argc)) {
1436 char *target = argv[i+1];
1437 ret = cmd_delete(target);
1442 if (!strcmp(argv[i], "acquire") && (i + 1 < argc)) {
1443 char *target = argv[i+1];
1444 ret = cmd_acquire(target);
1449 if (!strcmp(argv[i], "release") && (i + 1 < argc)) {
1450 char *target = argv[i+1];
1451 ret = cmd_release(target);
1456 if (!strcmp(argv[i], "copy") && (i + 2) < argc) {
1457 char *src = argv[i+1];
1458 char *dst = argv[i+2];
1459 ret = cmd_copy(src, dst);
1464 if (!strcmp(argv[i], "clone") && (i + 2 < argc)) {
1465 char *src = argv[i+1];
1466 char *dst = argv[i+2];
1467 ret = cmd_clone(src, dst);
1472 if (!strcmp(argv[i], "info") && (i + 1 < argc)) {
1473 char *target = argv[i+1];
1474 ret = cmd_info(target);
1480 if (!parse_ports(argv[i]))
1481 fprintf(stderr, "invalid argument: %s\n", argv[i]);