12 #include <xtypes/xhash.h>
13 #include <xtypes/xobj.h>
14 #include <xseg/xseg.h>
15 #include <xseg/protocol.h>
18 printf("xseg <spec> [[[<src_port>]:[<dst_port>]] [<command> <arg>*] ]*\n"
20 " <type:name:nr_ports:nr_requests:request_size:extra_size:page_shift>\n"
27 " bridge <portno1> <portno2> <logfile> {full|summary|stats}\n"
30 " alloc_requests (to source) <nr>\n"
31 " free_requests (from source) <nr>\n"
32 " put_requests (all from dest)\n"
33 " put_replies (all from dest)\n"
34 " wait <nr_replies>\n"
35 " complete <nr_requests>\n"
36 " fail <nr_requests>\n"
37 " rndwrite <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
38 " rndread <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
39 " submit_reqs <nr_loops> <concurrent_reqs>\n"
41 " read <target> <offset> <size>\n"
42 " write <target> <offset> < data\n"
43 " truncate <target> <size>\n"
48 " clone <src> <dst>\n"
62 struct xseg_config cfg;
64 uint32_t srcport, dstport;
66 #define mkname mkname_heavy
67 /* heavy distributes duplicates much more widely than light
68 * ./xseg-tool random 100000 | cut -d' ' -f2- | sort | uniq -d -c |wc -l
72 static void init_local_signal()
74 if (xseg && sport != srcport){
75 xseg_init_local_signal(xseg, srcport);
80 void mkname_heavy(char *name, uint32_t namelen, uint32_t seed)
84 for (i = 0; i < namelen; i += 1) {
85 c = seed + (seed >> 8) + (seed >> 16) + (seed >> 24);
86 c = '0' + ((c + (c >> 4)) & 0xf);
90 seed *= ((seed % 137911) | 1) * 137911;
94 void mkname_light(char *name, uint32_t namelen, uint32_t seed)
98 for (i = 0; i < namelen; i += 1) {
100 name[i] = 'A' + (c & 0xf);
/*
 * pick - choose a pseudo-random offset inside an object of @size bytes.
 *
 * @size: exclusive upper bound of the result.
 *
 * Returns a value in [0, size), or 0 when @size is 0.
 *
 * The previous body computed RAND_MAX / random(): it never consulted @size
 * and divided by zero whenever random() returned 0, after which converting
 * the infinite quotient to uint64_t is undefined.
 *
 * random() yields 31 bits per call, so two draws are combined to cover
 * sizes beyond 2 GiB. The modulo bias is irrelevant for a test tool.
 */
uint64_t pick(uint64_t size)
{
	uint64_t hi, lo;

	if (size == 0)
		return 0;

	hi = (uint64_t)random();
	lo = (uint64_t)random();

	return ((hi << 31) | lo) % size;
}
/*
 * mkchunk - fill @chunk with a repeating, self-describing test pattern.
 *
 * @chunk:     destination buffer, at least @datalen bytes.
 * @datalen:   number of bytes to fill.
 * @target:    object name; only the first @targetlen bytes are examined,
 *             so it does not have to be NUL-terminated.
 * @targetlen: length of @target.
 * @offset:    offset encoded at the start of every pattern unit.
 *
 * One pattern unit is (@targetlen + 16) bytes: the offset as 16 hex digits,
 * then as much of the name as fits, then a NUL (snprintf's terminator is
 * part of the unit, which is why the last name byte is dropped). The unit
 * is repeated across @chunk and the remainder is filled with its prefix.
 */
void mkchunk(	char *chunk, uint32_t datalen,
		char *target, uint32_t targetlen, uint64_t offset)
{
	long i, r, bufsize = (long)targetlen + 16;
	char buf[bufsize];

	/*
	 * Zero the unit first: when the name is shorter than @targetlen the
	 * bytes after the NUL would otherwise be stack garbage and end up in
	 * the chunk, making the pattern irreproducible.
	 */
	memset(buf, 0, bufsize);

	/*
	 * "%.*s" bounds the read of @target. A plain "%s" makes snprintf scan
	 * to the terminating NUL even though the output is truncated, which
	 * over-reads names that live unterminated inside a request buffer.
	 */
	snprintf(buf, bufsize, "%016llx%.*s",
		 (unsigned long long)offset, (int)targetlen, target);

	r = datalen % bufsize;

	for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
		memcpy(chunk + i, buf, bufsize);

	/* trailing partial unit */
	memcpy(chunk + datalen - r, buf, r);
}
/*
 * chkchunk - verify that @chunk holds the pattern mkchunk() generates.
 *
 * @chunk:     data to verify, @datalen bytes.
 * @datalen:   number of bytes to verify.
 * @target:    object name; only the first @targetlen bytes are examined,
 *             so it does not have to be NUL-terminated.
 * @targetlen: length of @target.
 * @offset:    offset expected at the start of every pattern unit.
 *
 * Returns 1 when every byte matches, 0 on the first mismatch.
 *
 * The expected unit is rebuilt exactly as mkchunk() builds it:
 * (@targetlen + 16) bytes, zero-filled, holding the offset as 16 hex
 * digits followed by the (truncated) name and a NUL.
 */
int chkchunk(	char *chunk, uint32_t datalen,
		char *target, uint32_t targetlen, uint64_t offset)
{
	long i, r;
	long bufsize = (long)targetlen + 16;
	char buf[bufsize];

	memset(buf, 0, bufsize);
	/* bounded read of @target, see mkchunk() */
	snprintf(buf, bufsize, "%016llx%.*s",
		 (unsigned long long)offset, (int)targetlen, target);

	/*
	 * The trailing partial unit is datalen % bufsize bytes long, as in
	 * mkchunk(). This used to be datalen % targetlen, which compared the
	 * wrong span (or nothing at all) and divided by zero for an empty
	 * target name.
	 */
	r = datalen % bufsize;

	for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
		if (memcmp(chunk + i, buf, bufsize))
			return 0;

	if (memcmp(chunk + datalen - r, buf, r))
		return 0;

	return 1;
}
148 #define ALLOC_MIN 4096
149 #define ALLOC_MAX 1048576
151 void inputbuf(FILE *fp, char **retbuf, uint64_t *retsize)
153 static uint64_t alloc_size;
159 if (alloc_size < ALLOC_MIN)
160 alloc_size = ALLOC_MIN;
162 if (alloc_size > ALLOC_MAX)
163 alloc_size = ALLOC_MAX;
165 p = realloc(buf, alloc_size);
176 r = fread(buf + size, 1, alloc_size - size, fp);
180 if (size >= alloc_size) {
181 p = realloc(buf, alloc_size * 2);
199 void report_request(struct xseg_request *req)
201 char target[64], data[64];
202 char *req_target, *req_data;
203 unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
204 req_target = xseg_get_target(xseg, req);
205 req_data = xseg_get_data(xseg, req);
207 strncpy(target, req_target, end);
209 strncpy(data, req_data, 63);
212 "Request %lx: target[%u](xptr: %llu): %s, data[%llu](xptr: %llu): %s \n\t"
213 "offset: %llu, size: %llu, serviced; %llu, op: %u, state: %u, flags: %u \n\t"
214 "src: %u, src_transit: %u, dst: %u, dst_transit: %u\n",
215 (unsigned long) req, req->targetlen, (unsigned long long)req->target,
216 xseg_get_target(xseg, req),
217 (unsigned long long) req->datalen, (unsigned long long) req->data,
218 xseg_get_data(xseg, req),
219 (unsigned long long) req->offset, (unsigned long long) req->size,
220 (unsigned long long) req->serviced, req->op, req->state, req->flags,
221 (unsigned int) req->src_portno, (unsigned int) req->src_transit_portno,
222 (unsigned int) req->dst_portno, (unsigned int) req->dst_transit_portno);
227 int cmd_info(char *target)
229 uint32_t targetlen = strlen(target);
230 size_t size = sizeof(uint64_t);
233 struct xseg_request *req;
236 req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
238 fprintf(stderr, "No request!\n");
242 r = xseg_prep_request(xseg, req, targetlen, size);
244 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
245 (unsigned long) targetlen, (unsigned long) size);
246 xseg_put_request(xseg, req, srcport);
250 req_target = xseg_get_target(xseg, req);
251 strncpy(req_target, target, targetlen);
256 p = xseg_submit(xseg, req, srcport, X_ALLOC);
260 xseg_signal(xseg, p);
265 int cmd_read(char *target, uint64_t offset, uint64_t size)
267 uint32_t targetlen = strlen(target);
271 struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
273 fprintf(stderr, "No request\n");
277 r = xseg_prep_request(xseg, req, targetlen, size);
279 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
280 (unsigned long)targetlen, (unsigned long long)size);
281 xseg_put_request(xseg, req, srcport);
285 req_target = xseg_get_target(xseg, req);
286 strncpy(req_target, target, targetlen);
287 req->offset = offset;
291 p = xseg_submit(xseg, req, srcport, X_ALLOC);
295 xseg_signal(xseg, p);
299 int cmd_write(char *target, uint64_t offset)
305 char *req_target, *req_data;
306 uint32_t targetlen = strlen(target);
307 struct xseg_request *req;
309 inputbuf(stdin, &buf, &size);
311 fprintf(stderr, "No input\n");
315 req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
317 fprintf(stderr, "No request\n");
321 r = xseg_prep_request(xseg, req, targetlen, size);
323 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
324 (unsigned long)targetlen, (unsigned long long)size);
325 xseg_put_request(xseg, req, srcport);
329 req_target = xseg_get_target(xseg, req);
330 strncpy(req_target, target, targetlen);
332 req_data = xseg_get_data(xseg, req);
333 memcpy(req_data, buf, size);
334 req->offset = offset;
338 p = xseg_submit(xseg, req, srcport, X_ALLOC);
340 fprintf(stderr, "Cannot submit\n");
343 xseg_signal(xseg, p);
348 int cmd_truncate(char *target, uint64_t offset)
353 int cmd_delete(char *target)
355 uint32_t targetlen = strlen(target);
357 struct xseg_request *req;
359 xseg_bind_port(xseg, srcport, NULL);
361 req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
363 fprintf(stderr, "No request!\n");
367 r = xseg_prep_request(xseg, req, targetlen, 0);
369 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
370 (unsigned long) targetlen, (unsigned long) req->bufferlen - targetlen);
371 xseg_put_request(xseg, req, srcport);
375 char *reqtarget = xseg_get_target(xseg, req);
376 strncpy(reqtarget, target, targetlen);
379 xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
381 fprintf(stderr, "Couldn't submit request\n");
382 xseg_put_request(xseg, req, srcport);
386 xseg_signal(xseg, p);
391 int cmd_acquire(char *target)
396 int cmd_release(char *target)
401 int cmd_copy(char *src, char *dst)
406 int cmd_clone(char *src, char *dst)
409 uint32_t targetlen = strlen(dst);
410 uint32_t parentlen = strlen(src);
411 struct xseg_request *req;
412 struct xseg_request_clone *xclone;
413 xseg_bind_port(xseg, srcport, NULL);
414 req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
416 fprintf(stderr, "No request\n");
420 int r = xseg_prep_request(xseg, req, targetlen, sizeof(struct xseg_request_clone));
422 fprintf(stderr, "Cannot prepare request!\n");
423 xseg_put_request(xseg, req, srcport);
427 char *target = xseg_get_target(xseg, req);
428 char *data = xseg_get_data(xseg, req);
430 strncpy(target, dst, targetlen);
431 xclone = (struct xseg_request_clone *) data;
432 strncpy(xclone->target, src, parentlen);
433 xclone->targetlen = parentlen;
436 req->size = sizeof(struct xseg_request_clone);
439 xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
441 fprintf(stderr, "Cannot submit request\n");
444 xseg_signal(xseg, p);
449 void log_req(int logfd, uint32_t portno2, uint32_t portno1, int op, int method,
450 struct xseg_request *req)
453 char target[64], data[64];
454 char *req_target, *req_data;
455 /* null terminate name in case of req->target is less than 63 characters,
456 * and next character after name (aka first byte of next buffer) is not
459 unsigned int end = (req->targetlen > 63) ? 63 : req->targetlen;
461 req_target = xseg_get_target(xseg, req);
462 req_data = xseg_get_data(xseg, req);
464 logfp = fdopen(logfd, "a");
470 strncpy(target, req_target, end);
472 strncpy(data, req_data, 63);
476 "src port: %u, dst port: %u, op:%u offset: %llu size: %lu, reqstate: %u\n"
477 "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
478 (unsigned int)portno1,
479 (unsigned int)portno2,
480 (unsigned int)req->op,
481 (unsigned long long)req->offset,
482 (unsigned long)req->size,
483 (unsigned int)req->state,
484 (unsigned int)req->targetlen, target,
485 (unsigned long long)req->datalen, data);
489 "src port: %u, dst port: %u, op: %u\n",
490 (unsigned int)portno1,
491 (unsigned int)portno2,
492 (unsigned int)req->op);
495 fprintf(logfp, "src port: %u, dst port: %u, reqs: %llu\n",
496 (unsigned int)portno1,
497 (unsigned int)portno2,
498 (unsigned long long)++reqs);
506 #define LOG_RECEIVE 1
508 int cmd_bridge(uint32_t portno1, uint32_t portno2, char *logfile, char *how)
510 struct xseg_request *req;
512 if (!strcmp(logfile, "-"))
515 logfd = open(logfile, O_WRONLY|O_APPEND|O_CREAT, 0600);
522 if (!strcmp(how, "full"))
524 else if (!strcmp(how, "summary"))
530 int reloop = 0, active;
531 xseg_prepare_wait(xseg, portno1);
532 xseg_prepare_wait(xseg, portno2);
539 req = xseg_accept(xseg, portno1, 0);
541 xseg_submit(xseg, req, portno2, X_ALLOC);
542 log_req(logfd, portno1, portno2, LOG_ACCEPT, method, req);
546 req = xseg_accept(xseg, portno2, 0);
548 xseg_submit(xseg, req, portno1, X_ALLOC);
549 log_req(logfd, portno2, portno1, LOG_ACCEPT, method, req);
553 req = xseg_receive(xseg, portno1, 0);
555 xseg_respond(xseg, req, portno2, X_ALLOC);
556 log_req(logfd, portno1, portno2, LOG_RECEIVE, method, req);
560 req = xseg_receive(xseg, portno2, 0);
562 xseg_respond(xseg, req, portno1, X_ALLOC);
563 log_req(logfd, portno2, portno1, LOG_RECEIVE, method, req);
570 /* wait on multiple queues? */
571 xseg_wait_signal(xseg, 100000);
574 xseg_cancel_wait(xseg, portno1);
575 xseg_cancel_wait(xseg, portno2);
586 int cmd_rndwrite(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
591 if (targetlen >= chunksize) {
592 fprintf(stderr, "targetlen >= chunksize\n");
596 char *p = realloc(namebuf, targetlen+1);
598 fprintf(stderr, "Cannot allocate memory\n");
603 p = realloc(chunk, chunksize);
605 fprintf(stderr, "Cannot allocate memory\n");
609 memset(chunk, 0, chunksize);
613 struct xseg_request *submitted = NULL, *received;
614 long nr_submitted = 0, nr_received = 0, nr_failed = 0;
618 char *req_data, *req_target;
623 xseg_prepare_wait(xseg, srcport);
624 if (nr_submitted < loops &&
625 (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
626 xseg_cancel_wait(xseg, srcport);
627 r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
629 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
630 targetlen, chunksize);
631 xseg_put_request(xseg, submitted, srcport);
635 req_target = xseg_get_target(xseg, submitted);
636 req_data = xseg_get_data(xseg, submitted);
639 mkname(namebuf, targetlen, seed);
640 namebuf[targetlen] = 0;
641 //printf("%ld: %s\n", nr_submitted, namebuf);
642 strncpy(req_target, namebuf, targetlen);
643 offset = 0;// pick(size);
644 mkchunk(req_data, chunksize, namebuf, targetlen, offset);
646 submitted->offset = offset;
647 submitted->size = chunksize;
648 submitted->op = X_WRITE;
649 submitted->flags |= XF_NOSYNC;
651 port = xseg_submit(xseg, submitted, srcport, X_ALLOC);
652 if (port == NoPort) {
653 xseg_put_request(xseg, submitted, srcport);
657 xseg_signal(xseg, port);
661 received = xseg_receive(xseg, srcport, 0);
663 xseg_cancel_wait(xseg, srcport);
665 if (!(received->state & XS_SERVED)) {
667 report_request(received);
669 if (xseg_put_request(xseg, received, srcport))
670 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
673 if (!submitted && !received)
674 xseg_wait_signal(xseg, 1000000);
676 if (nr_submitted % 1000 == 0 && !reported) {
678 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
679 nr_submitted, nr_received, nr_failed);
682 if (nr_received >= loops)
686 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
687 nr_submitted, nr_received, nr_failed);
691 int cmd_rnddelete(long loops, int32_t seed, uint32_t targetlen)
695 char *p = realloc(namebuf, targetlen+1);
697 fprintf(stderr, "Cannot allocate memory\n");
704 struct xseg_request *submitted = NULL, *received;
705 long nr_submitted = 0, nr_received = 0, nr_failed = 0;
713 xseg_prepare_wait(xseg, srcport);
714 if (nr_submitted < loops &&
715 (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
716 xseg_cancel_wait(xseg, srcport);
717 r = xseg_prep_request(xseg, submitted, targetlen, 0);
719 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
721 xseg_put_request(xseg, submitted, srcport);
725 req_target = xseg_get_target(xseg, submitted);
728 mkname(namebuf, targetlen, seed);
729 namebuf[targetlen] = 0;
730 //printf("%ld: %s\n", nr_submitted, namebuf);
731 strncpy(req_target, namebuf, targetlen);
732 submitted->offset = 0;
734 submitted->op = X_DELETE;
735 submitted->flags = 0;
737 port = xseg_submit(xseg, submitted, srcport, X_ALLOC);
738 if (port == NoPort) {
739 xseg_put_request(xseg, submitted, srcport);
743 xseg_signal(xseg, port);
747 received = xseg_receive(xseg, srcport, 0);
749 xseg_cancel_wait(xseg, srcport);
751 if (!(received->state & XS_SERVED)) {
753 report_request(received);
755 if (xseg_put_request(xseg, received, srcport))
756 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
759 if (!submitted && !received)
760 xseg_wait_signal(xseg, 1000000);
762 if (nr_submitted % 1000 == 0 && !reported) {
764 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
765 nr_submitted, nr_received, nr_failed);
768 if (nr_received >= loops)
772 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
773 nr_submitted, nr_received, nr_failed);
777 * prepare/wait rhythm,
778 * files are converted to independent chunk access patterns,
781 int cmd_rndread(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
786 if (targetlen >= chunksize) {
787 fprintf(stderr, "targetlen >= chunksize\n");
791 char *p = realloc(namebuf, targetlen+1);
793 fprintf(stderr, "Cannot allocate memory\n");
798 p = realloc(chunk, chunksize);
800 fprintf(stderr, "Cannot allocate memory\n");
804 memset(chunk, 0, chunksize);
808 struct xseg_request *submitted = NULL, *received;
809 long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0;
813 char *req_data, *req_target;
819 xseg_prepare_wait(xseg, srcport);
820 if (nr_submitted < loops &&
821 (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
822 xseg_cancel_wait(xseg, srcport);
823 r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
825 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
826 targetlen, chunksize);
827 xseg_put_request(xseg, submitted, srcport);
831 req_target = xseg_get_target(xseg, submitted);
833 mkname(namebuf, targetlen, seed);
834 namebuf[targetlen] = 0;
835 //printf("%ld: %s\n", nr_submitted, namebuf);
836 offset = 0;//pick(size);
838 strncpy(req_target, namebuf, targetlen);
839 submitted->offset = offset;
840 submitted->size = chunksize;
841 submitted->op = X_READ;
842 port = xseg_submit(xseg, submitted, srcport, X_ALLOC);
843 if (port == NoPort) {
844 xseg_put_request(xseg, submitted, srcport);
848 xseg_signal(xseg, port);
852 received = xseg_receive(xseg, srcport, 0);
854 xseg_cancel_wait(xseg, srcport);
856 req_target = xseg_get_target(xseg, received);
857 req_data = xseg_get_data(xseg, received);
858 if (!(received->state & XS_SERVED)) {
860 report_request(received);
861 } else if (!chkchunk(req_data, received->datalen,
862 req_target, received->targetlen, received->offset)) {
863 // report_request(received);
867 if (xseg_put_request(xseg, received, srcport))
868 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
871 if (!submitted && !received)
872 xseg_wait_signal(xseg, 1000000);
874 if (nr_submitted % 1000 == 0 && !reported) {
876 fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
877 nr_submitted, nr_received, nr_failed, nr_mismatch);
880 if (nr_received >= loops)
884 fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
885 nr_submitted, nr_received, nr_failed, nr_mismatch);
889 int cmd_submit_reqs(long loops, long concurrent_reqs, int op)
894 struct xseg_request *submitted = NULL, *received;
895 long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0, nr_flying = 0;
898 uint32_t targetlen = 10, chunksize = 4096;
899 struct timeval tv1, tv2;
901 char *req_data, *req_target;
903 xseg_bind_port(xseg, srcport, NULL);
905 gettimeofday(&tv1, NULL);
908 xseg_prepare_wait(xseg, srcport);
909 if (nr_submitted < loops && nr_flying < concurrent_reqs &&
910 (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
911 xseg_cancel_wait(xseg, srcport);
912 r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
914 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
915 targetlen, chunksize);
916 xseg_put_request(xseg, submitted, srcport);
924 offset = 0;//pick(size);
926 submitted->offset = offset;
927 submitted->size = chunksize;
928 req_target = xseg_get_target(xseg, submitted);
929 req_data = xseg_get_data(xseg, submitted);
932 submitted->op = X_INFO;
934 submitted->op = X_READ;
936 submitted->op = X_WRITE;
937 mkchunk(req_data, submitted->datalen, req_target, submitted->targetlen, submitted->offset);
940 p = xseg_submit(xseg, submitted, srcport, X_ALLOC);
942 if (xseg_signal(xseg, p) < 0)
943 perror("Cannot signal peer");
946 received = xseg_receive(xseg, srcport, 0);
948 xseg_cancel_wait(xseg, srcport);
950 if (nr_received == 0)
951 fprintf(stderr, "latency (time for the first req to complete): %llu usecs\n",
952 (unsigned long long)received->elapsed);
954 if (!(received->state & XS_SERVED)) {
956 //report_request(received);
959 if (xseg_put_request(xseg, received, srcport))
960 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
963 if (!submitted && !received)
964 xseg_wait_signal(xseg, 10000000L);
966 if (nr_received >= loops)
969 gettimeofday(&tv2, NULL);
971 fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
972 nr_submitted, nr_received, nr_failed, nr_mismatch);
973 long t = (tv2.tv_sec - tv1.tv_sec)*1000000 + (tv2.tv_usec - tv1.tv_usec);
974 fprintf(stderr, "elpased time: %lf secs, throughput: %lf reqs/sec\n", (double) t / 1000000.0, (double) nr_submitted / (t / 1000000.0));
979 int cmd_report(uint32_t portno)
981 struct xseg_port *port = xseg_get_port(xseg, portno);
983 printf("port %u is not assigned\n", portno);
986 struct xq *fq, *rq, *pq;
987 fq = xseg_get_queue(xseg, port, free_queue);
988 rq = xseg_get_queue(xseg, port, request_queue);
989 pq = xseg_get_queue(xseg, port, reply_queue);
990 fprintf(stderr, "port %u:\n"
991 " requests: %llu/%llu src gw: %u dst gw: %u\n"
992 " free_queue [%p] count : %llu\n"
993 " request_queue [%p] count : %llu\n"
994 " reply_queue [%p] count : %llu\n",
995 portno, (unsigned long long)port->alloc_reqs,
996 (unsigned long long)port->max_alloc_reqs,
997 xseg->src_gw[portno],
998 xseg->dst_gw[portno],
999 (void *)fq, (unsigned long long)xq_count(fq),
1000 (void *)rq, (unsigned long long)xq_count(rq),
1001 (void *)pq, (unsigned long long)xq_count(pq));
1010 xseg = xseg_join(cfg.type, cfg.name, "posix", NULL);
1012 fprintf(stderr, "cannot join segment!\n");
1017 static void print_hanlder(char *name, struct xobject_h *obj_h)
1019 fprintf(stderr, "%20s: free: %4llu, allocated: %4llu, allocated space: %7llu (object size: %llu)\n",
1021 (unsigned long long) obj_h->nr_free,
1022 (unsigned long long) obj_h->nr_allocated,
1023 (unsigned long long) obj_h->allocated_space,
1024 (unsigned long long) obj_h->obj_size);
1027 int cmd_reportall(void)
1035 fprintf(stderr, "Heap usage: %llu / %llu\n",
1036 (unsigned long long)xseg->heap->cur,
1037 (unsigned long long)xseg->config.heap_size);
1038 fprintf(stderr, "Handlers: \n");
1039 print_hanlder("Requests handler", xseg->request_h);
1040 print_hanlder("Ports handler", xseg->port_h);
1041 print_hanlder("Objects handler", xseg->object_handlers);
1042 fprintf(stderr, "\n");
1044 for (t = 0; t < xseg->config.nr_ports; t++)
1051 int finish_req(struct xseg_request *req, enum req_action action)
1053 if (action == COMPLETE){
1054 req->state &= ~XS_FAILED;
1055 req->state |= XS_SERVED;
1057 req->state |= XS_FAILED;
1058 req->state &= ~XS_SERVED;
1060 xport p = xseg_respond(xseg, req, srcport, X_ALLOC);
1061 xseg_signal(xseg, p);
1065 int cmd_requests(xport portno, enum req_action action )
1070 struct xobject_h *obj_h = xseg->request_h;
1071 void *container = XPTR(&obj_h->container);
1072 xhash_t *allocated = XPTR_TAKE(obj_h->allocated, container);
1074 xhash_iter_init(allocated, &it);
1077 xlock_acquire(&obj_h->lock, 1);
1078 while (xhash_iterate(allocated, &it, &key, &val)){
1079 void *mem = XPTR_TAKE(val, container);
1080 struct xseg_request *req = mem, *t;
1081 for (i = 0; i < xheap_get_chunk_size(mem)/obj_h->obj_size; i++) {
1083 if (t->src_portno == portno){
1084 if (action == REPORT)
1086 else if (action == FAIL)
1087 finish_req(t, action);
1088 else if (action == COMPLETE)
1089 finish_req(t, COMPLETE);
1093 xlock_release(&obj_h->lock);
1095 fprintf(stderr, "\n");
1099 int cmd_create(void)
1101 int r = xseg_create(&cfg);
1103 fprintf(stderr, "cannot create segment!\n");
1107 fprintf(stderr, "Segment initialized.\n");
1111 int cmd_destroy(void)
1113 if (!xseg && cmd_join())
1118 fprintf(stderr, "Segment destroyed.\n");
1122 int cmd_alloc_requests(unsigned long nr)
1124 return xseg_alloc_requests(xseg, srcport, nr);
1127 int cmd_free_requests(unsigned long nr)
1129 return xseg_free_requests(xseg, srcport, nr);
1132 int cmd_put_requests(void)
1134 struct xseg_request *req;
1137 req = xseg_accept(xseg, dstport, 0);
1140 if (xseg_put_request(xseg, req, srcport))
1141 fprintf(stderr, "Cannot put request at port %u\n", req->src_portno);
1147 int cmd_finish(unsigned long nr, int fail)
1149 struct xseg_request *req;
1150 char *buf = malloc(sizeof(char) * 8128);
1151 char *req_target, *req_data;
1152 xseg_bind_port(xseg, srcport, NULL);
1156 xseg_prepare_wait(xseg, srcport);
1157 req = xseg_accept(xseg, srcport, 0);
1159 req_target = xseg_get_target(xseg, req);
1160 req_data = xseg_get_data(xseg, req);
1161 xseg_cancel_wait(xseg, srcport);
1163 req->state &= ~XS_SERVED;
1165 if (req->op == X_READ)
1166 mkchunk(req_data, req->datalen, req_target, req->targetlen, req->offset);
1167 else if (req->op == X_WRITE)
1168 memcpy(buf, req_data, (sizeof(*buf) > req->datalen) ? req->datalen : sizeof(*buf));
1169 else if (req->op == X_INFO)
1170 *((uint64_t *) req->data) = 4294967296;
1172 req->state |= XS_SERVED;
1173 req->serviced = req->size;
1176 p = xseg_respond(xseg, req, srcport, X_ALLOC);
1177 xseg_signal(xseg, p);
1181 xseg_wait_signal(xseg, 10000000L);
1189 void handle_reply(struct xseg_request *req)
1191 char *req_data = xseg_get_data(xseg, req);
1192 char *req_target = xseg_get_target(xseg, req);
1193 if (!(req->state & XS_SERVED)) {
1194 report_request(req);
1200 fwrite(req_data, 1, req->datalen, stdout);
1204 fprintf(stdout, "wrote: ");
1205 fwrite(req_data, 1, req->datalen, stdout);
1209 fprintf(stderr, "deleted %s\n", req_target);
1214 fprintf(stderr, "cloned %s\n", ((struct xseg_request_clone *)req_data)->target);
1217 fprintf(stderr, "size: %llu\n", (unsigned long long)*((uint64_t *)req_data));
1225 if (xseg_put_request(xseg, req, srcport))
1226 fprintf(stderr, "Cannot put reply at port %u\n", req->src_portno);
1229 int cmd_wait(uint32_t nr)
1231 struct xseg_request *req;
1233 init_local_signal();
1236 req = xseg_receive(xseg, srcport, 0);
1245 ret = xseg_prepare_wait(xseg, srcport);
1249 ret = xseg_wait_signal(xseg, 1000000);
1250 ret = xseg_cancel_wait(xseg, srcport);
1258 int cmd_put_replies(void)
1260 struct xseg_request *req;
1263 req = xseg_receive(xseg, dstport, 0);
1266 fprintf(stderr, "request: %08llx%08llx\n"
1269 0LL, (unsigned long long)req->serial,
1272 report_request(req);
1274 //fwrite(req->buffer, 1, req->bufferlen, stdout);
1276 if (xseg_put_request(xseg, req, srcport))
1277 fprintf(stderr, "Cannot put reply\n");
1283 int cmd_bind(long portno)
1285 struct xseg_port *port = xseg_bind_port(xseg, portno, NULL);
1287 fprintf(stderr, "failed to bind port %ld\n", portno);
1291 fprintf(stderr, "bound port %u\n", xseg_portno(xseg, port));
1295 int cmd_signal(uint32_t portno)
1297 return xseg_signal(xseg, portno);
1300 int parse_ports(char *str)
1311 if ((s > str) && isdigit(str[0])) {
1312 srcport = atol(str);
1325 if ((s > str) && isdigit(str[0])) {
1326 dstport = atol(str);
1337 int main(int argc, char **argv)
1349 if (xseg_parse_spec(spec, &cfg)) {
1350 fprintf(stderr, "Cannot parse spec\n");
1354 if (xseg_initialize()) {
1355 fprintf(stderr, "cannot initialize!\n");
1359 for (i = 2; i < argc; i++) {
1361 if (!strcmp(argv[i], "create")) {
1366 if (!strcmp(argv[i], "join")) {
1369 fprintf(stderr, "Segment joined.\n");
1373 if (!strcmp(argv[i], "destroy")) {
1374 ret = cmd_destroy();
1381 if (!strcmp(argv[i], "reportall")) {
1382 ret = cmd_reportall();
1386 if (!strcmp(argv[i], "bind") && (i + 1 < argc)) {
1387 ret = cmd_bind(atol(argv[i+1]));
1392 if (!strcmp(argv[i], "showreqs") && (i + 1 < argc)) {
1393 ret = cmd_requests(atol(argv[i+1]), REPORT);
1398 if (!strcmp(argv[i], "signal") && (i + 1 < argc)) {
1399 ret = cmd_signal(atol(argv[i+1]));
1404 if (!strcmp(argv[i], "bridge") && (i + 4 < argc)) {
1405 ret = cmd_bridge(atol(argv[i+1]),
1413 if (srcport == -1) {
1414 if (!parse_ports(argv[i]))
1415 fprintf(stderr, "source port undefined: %s\n", argv[i]);
1419 if (dstport == -1) {
1420 if (!parse_ports(argv[i]))
1421 fprintf(stderr, "destination port undefined: %s\n", argv[i]);
1425 if (!strcmp(argv[i], "failreqs") && (i + 1 < argc)) {
1426 ret = cmd_requests(atol(argv[i+1]), FAIL);
1432 if (!strcmp(argv[i], "report")) {
1433 ret = cmd_report(dstport);
1437 if (!strcmp(argv[i], "alloc_requests") && (i + 1 < argc)) {
1438 ret = cmd_alloc_requests(atol(argv[i+1]));
1443 if (!strcmp(argv[i], "free_requests") && (i + 1 < argc)) {
1444 ret = cmd_free_requests(atol(argv[i+1]));
1449 if (!strcmp(argv[i], "put_requests")) {
1450 ret = cmd_put_requests();
1454 if (!strcmp(argv[i], "put_replies")) {
1455 ret = cmd_put_replies();
1459 if (!strcmp(argv[i], "complete") && (i + 1 < argc)) {
1460 ret = cmd_finish(atol(argv[i+1]), 0);
1465 if (!strcmp(argv[i], "fail") && (i + 1 < argc)) {
1466 ret = cmd_finish(atol(argv[i+1]), 1);
1471 if (!strcmp(argv[i], "wait") && (i + 1 < argc)) {
1472 ret = cmd_wait(atol(argv[i+1]));
1477 if (!strcmp(argv[i], "rndwrite") && (i + 5 < argc)) {
1478 long nr_loops = atol(argv[i+1]);
1479 unsigned int seed = atoi(argv[i+2]);
1480 unsigned int targetlen = atoi(argv[i+3]);
1481 unsigned int chunksize = atoi(argv[i+4]);
1482 unsigned long objectsize = atol(argv[i+5]);
1483 ret = cmd_rndwrite(nr_loops, seed, targetlen, chunksize, objectsize);
1488 if (!strcmp(argv[i], "rnddelete") && (i + 3 < argc)) {
1489 long nr_loops = atol(argv[i+1]);
1490 unsigned int seed = atoi(argv[i+2]);
1491 unsigned int targetlen = atoi(argv[i+3]);
1492 ret = cmd_rnddelete(nr_loops, seed, targetlen);
1497 if (!strcmp(argv[i], "rndread") && (i + 5 < argc)) {
1498 long nr_loops = atol(argv[i+1]);
1499 unsigned int seed = atoi(argv[i+2]);
1500 unsigned int targetlen = atoi(argv[i+3]);
1501 unsigned int chunksize = atoi(argv[i+4]);
1502 unsigned long objectsize = atol(argv[i+5]);
1503 ret = cmd_rndread(nr_loops, seed, targetlen, chunksize, objectsize);
1508 if (!strcmp(argv[i], "submit_reqs") && (i + 3 < argc)) {
1509 long nr_loops = atol(argv[i+1]);
1510 long concurrent_reqs = atol(argv[i+2]);
1511 int op = atoi(argv[i+3]);
1512 ret = cmd_submit_reqs(nr_loops, concurrent_reqs, op);
1517 if (!strcmp(argv[i], "read") && (i + 3 < argc)) {
1518 char *target = argv[i+1];
1519 uint64_t offset = atol(argv[i+2]);
1520 uint64_t size = atol(argv[i+3]);
1521 ret = cmd_read(target, offset, size);
1526 if (!strcmp(argv[i], "write") && (i + 2 < argc)) {
1527 char *target = argv[i+1];
1528 uint64_t offset = atol(argv[i+2]);
1529 ret = cmd_write(target, offset);
1534 if (!strcmp(argv[i], "truncate") && (i + 2 < argc)) {
1535 char *target = argv[i+1];
1536 uint64_t offset = atol(argv[i+2]);
1537 ret = cmd_truncate(target, offset);
1542 if (!strcmp(argv[i], "delete") && (i + 1 < argc)) {
1543 char *target = argv[i+1];
1544 ret = cmd_delete(target);
1549 if (!strcmp(argv[i], "acquire") && (i + 1 < argc)) {
1550 char *target = argv[i+1];
1551 ret = cmd_acquire(target);
1556 if (!strcmp(argv[i], "release") && (i + 1 < argc)) {
1557 char *target = argv[i+1];
1558 ret = cmd_release(target);
1563 if (!strcmp(argv[i], "copy") && (i + 2) < argc) {
1564 char *src = argv[i+1];
1565 char *dst = argv[i+2];
1566 ret = cmd_copy(src, dst);
1571 if (!strcmp(argv[i], "clone") && (i + 2 < argc)) {
1572 char *src = argv[i+1];
1573 char *dst = argv[i+2];
1574 ret = cmd_clone(src, dst);
1579 if (!strcmp(argv[i], "info") && (i + 1 < argc)) {
1580 char *target = argv[i+1];
1581 ret = cmd_info(target);
1587 if (!parse_ports(argv[i]))
1588 fprintf(stderr, "invalid argument: %s\n", argv[i]);