fad71c334372db0b07babc7299f834410b0ad51f
[archipelago] / xseg / peers / user / xseg-tool.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <string.h>
5 #include <stdint.h>
6 #include <ctype.h>
7 #include <sys/stat.h>
8 #include <sys/types.h>
9 #include <fcntl.h>
10 #include <unistd.h>
11
12 #include <xseg/xseg.h>
13
14 int help(void)
15 {
16         printf("xseg <spec> [[[<src_port>]:[<dst_port>]] [<command> <arg>*] ]*\n"
17                 "spec:\n"
18                 "    <type:name:nr_ports:nr_requests:request_size:extra_size:page_shift>\n"
19                 "global commands:\n"
20                 "    reportall\n"
21                 "    create\n"
22                 "    destroy\n"
23                 "    bind <portno>\n"
24                 "    signal <portno>\n"
25                 "    bridge <portno1> <portno2> <logfile> {full|summary|stats}\n"
26                 "port commands:\n"
27                 "    report\n"
28                 "    alloc_requests (to source) <nr>\n"
29                 "    free_requests (from source) <nr>\n"
30                 "    put_requests (all from dest)\n"
31                 "    put_replies (all from dest)\n"
32                 "    wait        <nr_replies>\n"
33                 "    complete    <nr_requests>\n"
34                 "    fail        <nr_requests>\n"
35                 "    rndwrite    <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
36                 "    rndread     <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
37                 "    submit_reqs <nr_loops> <concurrent_reqs>\n"
38                 "    info        <target>\n"
39                 "    read        <target> <offset> <size>\n"
40                 "    write       <target> <offset> < data\n"
41                 "    truncate    <target> <size>\n"
42                 "    delete      <target>\n"
43                 "    acquire     <target>\n"
44                 "    release     <target>\n"
45                 "    copy        <src>  <dst>\n"
46                 "    clone       <src>  <dst>\n"
47         );
48         return 1;
49 }
50
51 char *namebuf;
52 char *chunk;
53 struct xseg_config cfg;
54 struct xseg *xseg;
55 uint32_t srcport, dstport;
56 uint64_t reqs;
57 #define mkname mkname_heavy
58 /* heavy distributes duplicates much more widely than light
59  * ./xseg-tool random 100000 | cut -d' ' -f2- | sort | uniq -d -c |wc -l
60  */
61
62 void mkname_heavy(char *name, uint32_t namelen, uint32_t seed)
63 {
64         int i;
65         char c;
66         for (i = 0; i < namelen; i += 1) {
67                 c = seed + (seed >> 8) + (seed >> 16) + (seed >> 24);
68                 c = '0' + ((c + (c >> 4)) & 0xf);
69                 if (c > '9')
70                         c += 'a'-'0'-10;
71                 name[i] = c;
72                 seed *= ((seed % 137911) | 1) * 137911;
73         }
74 }
75
76 void mkname_light(char *name, uint32_t namelen, uint32_t seed)
77 {
78         int i;
79         char c;
80         for (i = 0; i < namelen; i += 1) {
81                 c = seed;
82                 name[i] = 'A' + (c & 0xf);
83                 seed += 1;
84         }
85 }
86
87 uint64_t pick(uint64_t size)
88 {
89         return (uint64_t)((double)(RAND_MAX) / random());
90 }
91
92 void mkchunk(   char *chunk, uint32_t datalen,
93                 char *target, uint32_t targetlen, uint64_t offset)
94 {
95         long i, r, bufsize = targetlen + 16;
96         char buf[bufsize];
97         r = datalen % bufsize;
98         snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);
99
100         for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
101                 memcpy(chunk + i, buf, bufsize);
102
103         memcpy(chunk + datalen - r, buf, r);
104 }
105
106 int chkchunk(   char *chunk, uint32_t datalen,
107                 char *target, uint32_t targetlen, uint64_t offset)
108 {
109         long i, r;
110         int bufsize = targetlen + 16;
111         char buf[bufsize];
112         r = datalen % targetlen;
113         snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);
114
115         for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
116                 if (memcmp(chunk + i, buf, bufsize)) {
117                         /*printf("mismatch: '%*s'* vs '%*s'\n",
118                                 bufsize, buf, datalen, chunk);
119                         */
120                         return 0;
121                 }
122
123         if (memcmp(chunk + datalen - r, buf, r))
124                 return 0;
125
126         return 1;
127 }
128
129
130 #define ALLOC_MIN 4096
131 #define ALLOC_MAX 1048576
132
133 void inputbuf(FILE *fp, char **retbuf, uint64_t *retsize)
134 {
135         static uint64_t alloc_size;
136         static char *buf;
137         uint64_t size = 0;
138         char *p;
139         size_t r;
140
141         if (alloc_size < ALLOC_MIN)
142                 alloc_size = ALLOC_MIN;
143
144         if (alloc_size > ALLOC_MAX)
145                 alloc_size = ALLOC_MAX;
146
147         p = realloc(buf, alloc_size);
148         if (!p) {
149                 if (buf)
150                         free(buf);
151                 buf = NULL;
152                 goto out;
153         }
154
155         buf = p;
156
157         while (!feof(fp)) {
158                 r = fread(buf + size, 1, alloc_size - size, fp);
159                 if (!r)
160                         break;
161                 size += r;
162                 if (size >= alloc_size) {
163                         p = realloc(buf, alloc_size * 2);
164                         if (!p) {
165                                 if (buf)
166                                         free(buf);
167                                 buf = NULL;
168                                 size = 0;
169                                 goto out;
170                         }
171                         buf = p;
172                         alloc_size *= 2;
173                 }
174         }
175
176 out:
177         *retbuf = buf;
178         *retsize = size;
179 }
180
181 void report_request(struct xseg_request *req)
182 {
183         uint32_t max = req->datalen;
184         char *data = xseg_get_data(xseg, req);
185         if (max > 128)
186                 max = 128;
187         data[max-1] = 0;
188         fprintf(stderr, "request %llu state %u\n", (unsigned long long)req->serial, req->state);
189         fprintf(stderr, "data: %s\n", data);
190 }
191
192 int cmd_info(char *target)
193 {
194         uint32_t targetlen = strlen(target);
195         size_t size = sizeof(uint64_t);
196         int r;
197         xserial srl;
198         struct xseg_request *req;
199         char *req_target;
200
201         req = xseg_get_request(xseg, srcport);
202         if (!req) {
203                 fprintf(stderr, "No request!\n");
204                 return -1;
205         }
206
207         r = xseg_prep_request(xseg, req, targetlen, size);
208         if (r < 0) {
209                 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
210                         (unsigned long) targetlen, (unsigned long) size);
211                 xseg_put_request(xseg, srcport, req);
212                 return -1;
213         }
214
215         req_target = xseg_get_target(xseg, req);
216         strncpy(req_target, target, targetlen);
217         req->offset = 0;
218         req->size = size;
219         req->op = X_INFO;
220
221         srl = xseg_submit(xseg, dstport, req);
222         if (srl == Noneidx)
223                 return -1;
224
225         xseg_signal(xseg, dstport);
226
227         return 0;
228 }
229
230 int cmd_read(char *target, uint64_t offset, uint64_t size)
231 {
232         uint32_t targetlen = strlen(target);
233         int r;
234         xserial srl;
235         char *req_target;
236         struct xseg_request *req = xseg_get_request(xseg, srcport);
237         if (!req) {
238                 fprintf(stderr, "No request\n");
239                 return -1;
240         }
241
242         r = xseg_prep_request(xseg, req, targetlen, size);
243         if (r < 0) {
244                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
245                         (unsigned long)targetlen, (unsigned long long)size);
246                 xseg_put_request(xseg, srcport, req);
247                 return -1;
248         }
249
250         req_target = xseg_get_target(xseg, req);
251         strncpy(req_target, target, targetlen);
252         req->offset = offset;
253         req->size = size;
254         req->op = X_READ;
255
256         srl = xseg_submit(xseg, dstport, req);
257         if (srl == Noneidx)
258                 return -1;
259
260         xseg_signal(xseg, dstport);
261         return 0;
262 }
263
264 int cmd_write(char *target, uint64_t offset)
265 {
266         char *buf = NULL;
267         int r;
268         xserial srl;
269         uint64_t size = 0;
270         char *req_target, *req_data;
271         uint32_t targetlen = strlen(target);
272         struct xseg_request *req;
273
274         inputbuf(stdin, &buf, &size);
275         if (!size) {
276                 fprintf(stderr, "No input\n");
277                 return -1;
278         }
279
280         req = xseg_get_request(xseg, srcport);
281         if (!req) {
282                 fprintf(stderr, "No request\n");
283                 return -1;
284         }
285
286         r = xseg_prep_request(xseg, req, targetlen, size);
287         if (r < 0) {
288                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
289                         (unsigned long)targetlen, (unsigned long long)size);
290                 xseg_put_request(xseg, srcport, req);
291                 return -1;
292         }
293
294         req_target = xseg_get_target(xseg, req);
295         strncpy(req_target, target, targetlen);
296         
297         req_data = xseg_get_data(xseg, req);
298         memcpy(req_data, buf, size);
299         req->offset = offset;
300         req->size = size;
301         req->op = X_WRITE;
302
303         srl = xseg_submit(xseg, dstport, req);
304         if (srl == Noneidx) {
305                 fprintf(stderr, "Cannot submit\n");
306                 return -1;
307         }
308
309         return 0;
310 }
311
312 int cmd_truncate(char *target, uint64_t offset)
313 {
314         return 0;
315 }
316
317 int cmd_delete(char *target)
318 {
319         return 0;
320 }
321
322 int cmd_acquire(char *target)
323 {
324         return 0;
325 }
326
327 int cmd_release(char *target)
328 {
329         return 0;
330 }
331
332 int cmd_copy(char *src, char *dst)
333 {
334         return 0;
335 }
336
337 int cmd_clone(char *src, char *dst)
338 {
339         return 0;
340 }
341
342 void log_req(int logfd, uint32_t portno2, uint32_t portno1, int op, int method,
343                 struct xseg_request *req)
344 {
345         FILE *logfp;
346         char target[64], data[64];
347         char *req_target, *req_data;
348         /* null terminate name in case of req->target is less than 63 characters,
349          * and next character after name (aka first byte of next buffer) is not
350          * null
351          */
352         unsigned int end = (req->targetlen > 63) ? 63 : req->targetlen;
353         
354         req_target = xseg_get_target(xseg, req);
355         req_data = xseg_get_data(xseg, req);
356
357         logfp = fdopen(logfd, "a");
358         if (!logfp)
359                 return;
360
361         switch(method) {
362         case 0:
363                 strncpy(target, req_target, end);
364                 target[end] = 0;
365                 strncpy(data, req_data, 63);
366                 data[63] = 0;
367
368                 fprintf(logfp,
369                         "src port: %u, dst port: %u,  op:%u offset: %llu size: %lu, reqstate: %u\n"
370                         "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
371                         (unsigned int)portno1,
372                         (unsigned int)portno2,
373                         (unsigned int)req->op,
374                         (unsigned long long)req->offset,
375                         (unsigned long)req->size,
376                         (unsigned int)req->state,
377                         (unsigned int)req->targetlen, target,
378                         (unsigned long long)req->datalen, data);
379                 break;
380         case 1:
381                 fprintf(logfp,
382                         "src port: %u, dst port: %u, op: %u\n",
383                         (unsigned int)portno1,
384                         (unsigned int)portno2,
385                         (unsigned int)req->op);
386                 break;
387         case 2:
388                 fprintf(logfp, "src port: %u, dst port: %u, reqs: %llu\n",
389                         (unsigned int)portno1,
390                         (unsigned int)portno2,
391                         (unsigned long long)++reqs);
392         }
393
394         fclose(logfp);
395         return;
396 }
397
398 #define LOG_ACCEPT  0
399 #define LOG_RECEIVE 1
400
401 int cmd_bridge(uint32_t portno1, uint32_t portno2, char *logfile, char *how)
402 {
403         struct xseg_request *req;
404         int logfd, method;
405         if (!strcmp(logfile, "-"))
406                 logfd = 1;
407         else {
408                 logfd = open(logfile, O_WRONLY|O_APPEND|O_CREAT, 0600);
409                 if (logfd < 0) {
410                         perror(logfile);
411                         return -1;
412                 }
413         }
414
415         if (!strcmp(how, "full"))
416                 method = 0;
417         else if (!strcmp(how, "summary"))
418                 method = 1;
419         else
420                 method = 2;
421
422         for (;;) {
423                 int reloop = 0, active;
424                 xseg_prepare_wait(xseg, portno1);
425                 xseg_prepare_wait(xseg, portno2);
426                 req = NULL;
427
428                 for (;;) {
429                         active = 0;
430
431                         req = xseg_accept(xseg, portno1);
432                         if (req) {
433                                 xseg_submit(xseg, portno2, req);
434                                 log_req(logfd, portno1, portno2, LOG_ACCEPT, method, req);
435                                 active += 1;
436                         }
437
438                         req = xseg_accept(xseg, portno2);
439                         if (req) {
440                                 xseg_submit(xseg, portno1, req);
441                                 log_req(logfd, portno2, portno1, LOG_ACCEPT, method, req);
442                                 active += 1;
443                         }
444
445                         req = xseg_receive(xseg, portno1);
446                         if (req) {
447                                 xseg_respond(xseg, portno2, req);
448                                 log_req(logfd, portno1, portno2, LOG_RECEIVE, method, req);
449                                 active += 1;
450                         }
451
452                         req = xseg_receive(xseg, portno2);
453                         if (req) {
454                                 xseg_respond(xseg, portno1, req);
455                                 log_req(logfd, portno2, portno1, LOG_RECEIVE, method, req);
456                                 active += 1;
457                         }
458
459                         if (active == 0) {
460                                 if (reloop)
461                                         break;
462                                 /* wait on multiple queues? */
463                                 xseg_wait_signal(xseg, 100000);
464                                 break;
465                         } else {
466                                 xseg_cancel_wait(xseg, portno1);        
467                                 xseg_cancel_wait(xseg, portno2);        
468                                 reloop = 1;
469                         }
470                 }
471         }
472
473         close(logfd);
474
475         return 0;
476 }
477
478 int cmd_rndwrite(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
479 {
480         if (loops < 0)
481                 return help();
482
483         if (targetlen >= chunksize) {
484                 fprintf(stderr, "targetlen >= chunksize\n");
485                 return -1;
486         }
487
488         char *p = realloc(namebuf, targetlen+1);
489         if (!p) {
490                 fprintf(stderr, "Cannot allocate memory\n");
491                 return -1;
492         }
493         namebuf = p;
494
495         p = realloc(chunk, chunksize);
496         if (!p) {
497                 fprintf(stderr, "Cannot allocate memory\n");
498                 return -1;
499         }
500         chunk = p;
501         memset(chunk, 0, chunksize);
502
503         srandom(seed);
504
505         struct xseg_request *submitted = NULL, *received;
506         long nr_submitted = 0, nr_received = 0, nr_failed = 0;
507         int reported = 0, r;
508         uint64_t offset;
509         xserial srl;
510         char *req_data, *req_target;
511
512         for (;;) {
513                 xseg_prepare_wait(xseg, srcport);
514                 if (nr_submitted < loops &&
515                     (submitted = xseg_get_request(xseg, srcport))) {
516                         xseg_cancel_wait(xseg, srcport);
517                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
518                         if (r < 0) {
519                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
520                                         targetlen, chunksize);
521                                 xseg_put_request(xseg, submitted->portno, submitted);
522                                 return -1;
523                         }
524                         
525                         req_target = xseg_get_target(xseg, submitted);
526                         req_data = xseg_get_data(xseg, submitted);
527
528                         nr_submitted += 1;
529                         reported = 0;
530                         seed = random();
531                         mkname(namebuf, targetlen, seed);
532                         namebuf[targetlen] = 0;
533                         //printf("%ld: %s\n", nr_submitted, namebuf);
534                         strncpy(req_target, namebuf, targetlen);
535                         offset = 0;// pick(size);
536                         mkchunk(req_data, chunksize, namebuf, targetlen, offset);
537
538                         submitted->offset = offset;
539                         submitted->size = chunksize;
540                         submitted->op = X_WRITE;
541                         submitted->flags |= XF_NOSYNC;
542
543                         srl = xseg_submit(xseg, dstport, submitted);
544                         (void)srl;
545                         xseg_signal(xseg, dstport);
546                 }
547
548                 received = xseg_receive(xseg, srcport);
549                 if (received) {
550                         xseg_cancel_wait(xseg, srcport);
551                         nr_received += 1;
552                         if (!(received->state & XS_SERVED)) {
553                                 nr_failed += 1;
554                                 report_request(received);
555                         }
556                         if (xseg_put_request(xseg, received->portno, received))
557                                 fprintf(stderr, "Cannot put request at port %u\n", received->portno);
558                 }
559
560                 if (!submitted && !received)
561                         xseg_wait_signal(xseg, 1000000);
562
563                         if (nr_submitted % 1000 == 0 && !reported) {
564                                 reported = 1;
565                                 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
566                                         nr_submitted, nr_received, nr_failed);
567                         }
568
569                         if (nr_received >= loops)
570                                 break;
571         }
572
573         fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
574                 nr_submitted, nr_received, nr_failed);
575         return 0;
576 }
577
578 /* note:
579  * prepare/wait rhythm,
580  * files are converted to independent chunk access patterns,
581 */
582
583 int cmd_rndread(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
584 {
585         if (loops < 0)
586                 return help();
587
588         if (targetlen >= chunksize) {
589                 fprintf(stderr, "targetlen >= chunksize\n");
590                 return -1;
591         }
592
593         char *p = realloc(namebuf, targetlen+1);
594         if (!p) {
595                 fprintf(stderr, "Cannot allocate memory\n");
596                 return -1;
597         }
598         namebuf = p;
599
600         p = realloc(chunk, chunksize);
601         if (!p) {
602                 fprintf(stderr, "Cannot allocate memory\n");
603                 return -1;
604         }
605         chunk = p;
606         memset(chunk, 0, chunksize);
607
608         srandom(seed);
609
610         struct xseg_request *submitted = NULL, *received;
611         long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0;
612         int reported = 0, r;
613         uint64_t offset;
614         xserial srl;
615         char *req_data, *req_target;
616
617         for (;;) {
618                 submitted = NULL;
619                 xseg_prepare_wait(xseg, srcport);
620                 if (nr_submitted < loops &&
621                     (submitted = xseg_get_request(xseg, srcport))) {
622                         xseg_cancel_wait(xseg, srcport);
623                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
624                         if (r < 0) {
625                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
626                                         targetlen, chunksize);
627                                 xseg_put_request(xseg, submitted->portno, submitted);
628                                 return -1;
629                         }
630
631                         req_target = xseg_get_target(xseg, submitted);
632                         nr_submitted += 1;
633                         reported = 0;
634                         seed = random();
635                         mkname(namebuf, targetlen, seed);
636                         namebuf[targetlen] = 0;
637                         //printf("%ld: %s\n", nr_submitted, namebuf);
638                         offset = 0;//pick(size);
639
640                         strncpy(req_target, namebuf, targetlen);
641                         submitted->offset = offset;
642                         submitted->size = chunksize;
643                         submitted->op = X_READ;
644
645                         srl = xseg_submit(xseg, dstport, submitted);
646                         (void)srl;
647                         xseg_signal(xseg, dstport);
648                 }
649
650                 received = xseg_receive(xseg, srcport);
651                 if (received) {
652                         xseg_cancel_wait(xseg, srcport);
653                         nr_received += 1;
654                         req_target = xseg_get_target(xseg, received);
655                         req_data = xseg_get_data(xseg, received);
656                         if (!(received->state & XS_SERVED)) {
657                                 nr_failed += 1;
658                                 report_request(received);
659                         } else if (!chkchunk(req_data, received->datalen,
660                                         req_target, received->targetlen, received->offset)) {
661                                 nr_mismatch += 1;
662                         }
663
664                         if (xseg_put_request(xseg, received->portno, received))
665                                 fprintf(stderr, "Cannot put request at port %u\n", received->portno);
666                 }
667
668                 if (!submitted && !received)
669                         xseg_wait_signal(xseg, 1000000);
670
671                 if (nr_submitted % 1000 == 0 && !reported) {
672                         reported = 1;
673                         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
674                         nr_submitted, nr_received, nr_failed, nr_mismatch);
675                 }
676
677                 if (nr_received >= loops)
678                         break;
679         }
680
681         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
682                 nr_submitted, nr_received, nr_failed, nr_mismatch);
683         return 0;
684 }
685
686 int cmd_submit_reqs(long loops, long concurrent_reqs, int op)
687 {
688         if (loops < 0)
689                 return help();
690
691         struct xseg_request *submitted = NULL, *received;
692         long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0, nr_flying = 0;
693         int reported = 0, r;
694         uint64_t offset;
695         uint32_t targetlen = 10, chunksize = 4096;
696         struct timeval tv1, tv2;
697         xserial srl;
698         char *req_data, *req_target;
699
700         xseg_bind_port(xseg, srcport);
701
702         gettimeofday(&tv1, NULL);
703         for (;;) {
704                 submitted = NULL;
705                 xseg_prepare_wait(xseg, srcport);
706                 if (nr_submitted < loops &&  nr_flying < concurrent_reqs &&
707                     (submitted = xseg_get_request(xseg, srcport))) {
708                         xseg_cancel_wait(xseg, srcport);
709                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
710                         if (r < 0) {
711                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
712                                         targetlen, chunksize);
713                                 xseg_put_request(xseg, submitted->portno, submitted);
714                                 return -1;
715                         }
716
717                         ++nr_flying;
718                         nr_submitted += 1;
719                         reported = 0;
720                         offset = 0;//pick(size);
721
722                         submitted->offset = offset;
723                         submitted->size = chunksize;
724                         req_target = xseg_get_target(xseg, submitted);
725                         req_data = xseg_get_data(xseg, submitted);
726
727                         if (op == 0)
728                                 submitted->op = X_INFO;
729                         else if (op == 1)
730                                 submitted->op = X_READ;
731                         else if (op == 2) {
732                                 submitted->op = X_WRITE;
733                                 mkchunk(req_data, submitted->datalen, req_target, submitted->targetlen, submitted->offset);
734                         }
735
736                         srl = xseg_submit(xseg, dstport, submitted);
737                         (void)srl;
738                         if (xseg_signal(xseg, dstport) < 0)
739                                 perror("Cannot signal peer");
740                 }
741                 received = xseg_receive(xseg, srcport);
742                 if (received) {
743                         xseg_cancel_wait(xseg, srcport);
744                         --nr_flying;
745                         if (nr_received == 0)
746                                 fprintf(stderr, "latency (time for the first req to complete): %llu usecs\n",
747                                         (unsigned long long)received->elapsed);
748                         nr_received += 1;
749                         if (!(received->state & XS_SERVED)) {
750                                 nr_failed += 1;
751                                 //report_request(received);
752                         }
753
754                         if (xseg_put_request(xseg, received->portno, received))
755                                 fprintf(stderr, "Cannot put request at port %u\n", received->portno);
756                 }
757
758                 if (!submitted && !received)
759                         xseg_wait_signal(xseg, 10000000L);
760
761                 if (nr_received >= loops)
762                         break;
763         }
764         gettimeofday(&tv2, NULL);
765
766         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
767                 nr_submitted, nr_received, nr_failed, nr_mismatch);
768         long t = (tv2.tv_sec - tv1.tv_sec)*1000000 + (tv2.tv_usec - tv1.tv_usec);
769         fprintf(stderr, "elpased time: %lf secs, throughput: %lf reqs/sec\n", (double) t / 1000000.0, (double) nr_submitted / (t / 1000000.0));
770
771         return 0;
772 }
773
774 int cmd_report(uint32_t portno)
775 {
776         struct xseg_port *port = xseg_get_port(xseg, portno);
777         struct xq *fq, *rq, *pq;
778         fq = xseg_get_queue(xseg, port, free_queue);
779         rq = xseg_get_queue(xseg, port, request_queue);
780         pq = xseg_get_queue(xseg, port, reply_queue);
781         fprintf(stderr, "port %u:\n"
782                 "       free_queue [%p] count : %u\n"
783                 "    request_queue [%p] count : %u\n"
784                 "      reply_queue [%p] count : %u\n",
785                 portno,
786                 (void *)fq, xq_count(fq),
787                 (void *)rq, xq_count(rq),
788                 (void *)pq, xq_count(pq));
789         return 0;
790 }
791
792 int cmd_join(void)
793 {
794         if (xseg)
795                 return 0;
796
797         xseg = xseg_join(cfg.type, cfg.name, "posix", NULL);
798         if (!xseg) {
799                 fprintf(stderr, "cannot join segment!\n");
800                 return -1;
801         }
802         return 0;
803 }
804
805 int cmd_reportall(void)
806 {
807         uint32_t t;
808
809         if (cmd_join())
810                 return -1;
811
812         //fprintf(stderr, "global free requests: %u\n", xq_count(xseg->free_requests));
813         for (t = 0; t < xseg->config.nr_ports; t++)
814                 cmd_report(t);
815
816         return 0;
817 }
818
819 int cmd_create(void)
820 {
821         int r = xseg_create(&cfg);
822         if (r) {
823                 fprintf(stderr, "cannot create segment!\n");
824                 return -1;
825         }
826
827         fprintf(stderr, "Segment initialized.\n");
828         return 0;
829 }
830
831 int cmd_destroy(void)
832 {
833         if (!xseg && cmd_join())
834                 return -1;
835         xseg_leave(xseg);
836         xseg_destroy(xseg);
837         xseg = NULL;
838         fprintf(stderr, "Segment destroyed.\n");
839         return 0;
840 }
841
842 int cmd_alloc_requests(unsigned long nr)
843 {
844         return xseg_alloc_requests(xseg, srcport, nr);
845 }
846
847 int cmd_free_requests(unsigned long nr)
848 {
849         return xseg_free_requests(xseg, srcport, nr);
850 }
851
852 int cmd_put_requests(void)
853 {
854         struct xseg_request *req;
855
856         for (;;) {
857                 req = xseg_accept(xseg, dstport);
858                 if (!req)
859                         break;
860                 if (xseg_put_request(xseg, req->portno, req))
861                         fprintf(stderr, "Cannot put request at port %u\n", req->portno);
862         }
863
864         return 0;
865 }
866
867 int cmd_finish(unsigned long nr, int fail)
868 {
869         struct xseg_request *req;
870         char *buf = malloc(sizeof(char) * 8128);
871         char *req_target, *req_data;
872         xseg_bind_port(xseg, srcport);
873
874         for (; nr--;) {
875                 xseg_prepare_wait(xseg, srcport);
876                 req = xseg_accept(xseg, srcport);
877                 if (req) {
878                         req_target = xseg_get_target(xseg, req);
879                         req_data = xseg_get_data(xseg, req);
880                         xseg_cancel_wait(xseg, srcport);
881                         if (fail == 1)
882                                 req->state &= ~XS_SERVED;
883                         else {
884                                 if (req->op == X_READ)
885                                         mkchunk(req_data, req->datalen, req_target, req->targetlen, req->offset);
886                                 else if (req->op == X_WRITE) 
887                                         memcpy(buf, req_data, (sizeof(*buf) > req->datalen) ? req->datalen : sizeof(*buf));
888                                 else if (req->op == X_INFO)
889                                         *((uint64_t *) req->data) = 4294967296;
890                                 
891                                 req->state |= XS_SERVED;
892                                 req->serviced = req->size;
893                         }
894
895                         xseg_respond(xseg, dstport, req);
896                         xseg_signal(xseg, dstport);
897                         continue;
898                 }
899                 ++nr;
900                 xseg_wait_signal(xseg, 10000000L);
901         }
902
903         free(buf);
904
905         return 0;
906 }
907
908 void handle_reply(struct xseg_request *req)
909 {
910         char *req_data = xseg_get_data(xseg, req);
911         if (!(req->state & XS_SERVED)) {
912                 report_request(req);
913                 goto put;
914         }
915
916         switch (req->op) {
917         case X_READ:
918                 fwrite(req_data, 1, req->datalen, stdout);
919                 break;
920
921         case X_WRITE:
922         case X_SYNC:
923         case X_DELETE:
924         case X_TRUNCATE:
925         case X_COMMIT:
926         case X_CLONE:
927         case X_INFO:
928                 fprintf(stderr, "size: %llu\n", (unsigned long long)*((uint64_t *)req_data));
929                 break;
930
931         default:
932                 break;
933         }
934
935 put:
936         if (xseg_put_request(xseg, req->portno, req))
937                 fprintf(stderr, "Cannot put reply at port %u\n", req->portno);
938 }
939
940 int cmd_wait(uint32_t nr)
941 {
942         struct xseg_request *req;
943         long ret;
944
945         for (;;) {
946                 req = xseg_receive(xseg, srcport);
947                 if (req) {
948                         handle_reply(req);
949                         nr--;
950                         if (nr == 0)
951                                 break;
952                         continue;
953                 }
954
955                 ret = xseg_prepare_wait(xseg, srcport);
956                 if (ret)
957                         return -1;
958
959                 ret = xseg_wait_signal(xseg, 1000000);
960                 ret = xseg_cancel_wait(xseg, srcport);
961                 if (ret)
962                         return -1;
963         }
964
965         return 0;
966 }
967
968 int cmd_put_replies(void)
969 {
970         struct xseg_request *req;
971
972         for (;;) {
973                 req = xseg_receive(xseg, dstport);
974                 if (!req)
975                         break;
976                 fprintf(stderr, "request: %08llx%08llx\n"
977                         "     op: %u\n"
978                         "  state: %u\n",
979                         0LL, (unsigned long long)req->serial,
980                         req->op,
981                         req->state);
982                 report_request(req);
983
984                 //fwrite(req->buffer, 1, req->bufferlen, stdout);
985
986                 if (xseg_put_request(xseg, req->portno, req))
987                         fprintf(stderr, "Cannot put reply\n");
988         }
989
990         return 0;
991 }
992
993 int cmd_bind(long portno)
994 {
995         struct xseg_port *port = xseg_bind_port(xseg, portno);
996         if (!port) {
997                 fprintf(stderr, "failed to bind port %ld\n", portno);
998                 return 1;
999         }
1000
1001         fprintf(stderr, "bound port %u\n", xseg_portno(xseg, port));
1002         return 0;
1003 }
1004
1005 int cmd_signal(uint32_t portno)
1006 {
1007         return xseg_signal(xseg, portno);
1008 }
1009
1010 int parse_ports(char *str)
1011 {
1012         int ret = 0;
1013         char *s = str;
1014
1015         for (;;) {
1016                 if (*s == 0)
1017                         return 0;
1018
1019                 if (*s == ':') {
1020                         *s = 0;
1021                         if ((s > str) && isdigit(str[0])) {
1022                                 srcport = atol(str);
1023                                 ret ++;
1024                         }
1025                         break;
1026                 }
1027                 s ++;
1028         }
1029
1030         s += 1;
1031         str = s;
1032
1033         for (;;) {
1034                 if (*s == 0) {
1035                         if ((s > str) && isdigit(str[0])) {
1036                                 dstport = atol(str);
1037                                 ret ++;
1038                         }
1039                         break;
1040                 }
1041                 s ++;
1042         }
1043
1044         return ret;
1045 }
1046
1047 int main(int argc, char **argv)
1048 {
1049         int i, ret = 0;
1050         char *spec;
1051
1052         if (argc < 3)
1053                 return help();
1054
1055         srcport = -1;
1056         dstport = -1;
1057         spec = argv[1];
1058
1059         if (xseg_parse_spec(spec, &cfg)) {
1060                 fprintf(stderr, "Cannot parse spec\n");
1061                 return -1;
1062         }
1063
1064         if (xseg_initialize()) {
1065                 fprintf(stderr, "cannot initialize!\n");
1066                 return -1;
1067         }
1068
1069         for (i = 2; i < argc; i++) {
1070
1071                 if (!strcmp(argv[i], "create")) {
1072                         ret = cmd_create();
1073                         continue;
1074                 }
1075
1076                 if (!strcmp(argv[i], "join")) {
1077                         ret = cmd_join();
1078                         if (!ret)
1079                                 fprintf(stderr, "Segment joined.\n");
1080                         continue;
1081                 }
1082
1083                 if (!strcmp(argv[i], "destroy")) {
1084                         ret = cmd_destroy();
1085                         continue;
1086                 }
1087
1088                 if (cmd_join())
1089                         return -1;
1090
1091                 if (!strcmp(argv[i], "reportall")) {
1092                         ret = cmd_reportall();
1093                         continue;
1094                 }
1095
1096                 if (!strcmp(argv[i], "bind") && (i + 1 < argc)) {
1097                         ret = cmd_bind(atol(argv[i+1]));
1098                         i += 1;
1099                         continue;
1100                 }
1101
1102                 if (!strcmp(argv[i], "signal") && (i + 1 < argc)) {
1103                         ret = cmd_signal(atol(argv[i+1]));
1104                         i += 1;
1105                         continue;
1106                 }
1107
1108                 if (!strcmp(argv[i], "bridge") && (i + 4 < argc)) {
1109                         ret = cmd_bridge(atol(argv[i+1]),
1110                                          atol(argv[i+2]),
1111                                          argv[i+3],
1112                                          argv[i+4]);
1113                         i += 4;
1114                         continue;
1115                 }
1116
1117                 if (srcport == -1) {
1118                         if (!parse_ports(argv[i]))
1119                                 fprintf(stderr, "source port undefined: %s\n", argv[i]);
1120                         continue;
1121                 }
1122
1123                 if (dstport == -1) {
1124                         if (!parse_ports(argv[i]))
1125                                 fprintf(stderr, "destination port undefined: %s\n", argv[i]);
1126                         continue;
1127                 }
1128
1129                 if (!strcmp(argv[i], "report")) {
1130                         ret = cmd_report(dstport);
1131                         continue;
1132                 }
1133
1134                 if (!strcmp(argv[i], "alloc_requests") && (i + 1 < argc)) {
1135                         ret = cmd_alloc_requests(atol(argv[i+1]));
1136                         i += 1;
1137                         continue;
1138                 }
1139
1140                 if (!strcmp(argv[i], "free_requests") && (i + 1 < argc)) {
1141                         ret = cmd_free_requests(atol(argv[i+1]));
1142                         i += 1;
1143                         continue;
1144                 }
1145
1146                 if (!strcmp(argv[i], "put_requests")) {
1147                         ret = cmd_put_requests();
1148                         continue;
1149                 }
1150
1151                 if (!strcmp(argv[i], "put_replies")) {
1152                         ret = cmd_put_replies();
1153                         continue;
1154                 }
1155
1156                 if (!strcmp(argv[i], "complete") && (i + 1 < argc)) {
1157                         ret = cmd_finish(atol(argv[i+1]), 0);
1158                         i += 1;
1159                         continue;
1160                 }
1161
1162                 if (!strcmp(argv[i], "fail") && (i + 1 < argc)) {
1163                         ret = cmd_finish(atol(argv[i+1]), 1);
1164                         i += 1;
1165                         continue;
1166                 }
1167
1168                 if (!strcmp(argv[i], "wait") && (i + 1 < argc)) {
1169                         ret = cmd_wait(atol(argv[i+1]));
1170                         i += 1;
1171                         continue;
1172                 }
1173
1174                 if (!strcmp(argv[i], "rndwrite") && (i + 5 < argc)) {
1175                         long nr_loops = atol(argv[i+1]);
1176                         unsigned int seed = atoi(argv[i+2]);
1177                         unsigned int targetlen = atoi(argv[i+3]);
1178                         unsigned int chunksize = atoi(argv[i+4]);
1179                         unsigned long objectsize = atol(argv[i+5]);
1180                         ret = cmd_rndwrite(nr_loops, seed, targetlen, chunksize, objectsize);
1181                         i += 5;
1182                         continue;
1183                 }
1184
1185                 if (!strcmp(argv[i], "rndread") && (i + 5 < argc)) {
1186                         long nr_loops = atol(argv[i+1]);
1187                         unsigned int seed = atoi(argv[i+2]);
1188                         unsigned int targetlen = atoi(argv[i+3]);
1189                         unsigned int chunksize = atoi(argv[i+4]);
1190                         unsigned long objectsize = atol(argv[i+5]);
1191                         ret = cmd_rndread(nr_loops, seed, targetlen, chunksize, objectsize);
1192                         i += 5;
1193                         continue;
1194                 }
1195
1196                 if (!strcmp(argv[i], "submit_reqs") && (i + 3 < argc)) {
1197                         long nr_loops = atol(argv[i+1]);
1198                         long concurrent_reqs = atol(argv[i+2]);
1199                         int op = atoi(argv[i+3]);
1200                         ret = cmd_submit_reqs(nr_loops, concurrent_reqs, op);
1201                         i += 3;
1202                         continue;
1203                 }
1204
1205                 if (!strcmp(argv[i], "read") && (i + 3 < argc)) {
1206                         char *target = argv[i+1];
1207                         uint64_t offset = atol(argv[i+2]);
1208                         uint64_t size   = atol(argv[i+3]);
1209                         ret = cmd_read(target, offset, size);
1210                         i += 3;
1211                         continue;
1212                 }
1213
1214                 if (!strcmp(argv[i], "write") && (i + 2 < argc)) {
1215                         char *target = argv[i+1];
1216                         uint64_t offset = atol(argv[i+2]);
1217                         ret = cmd_write(target, offset);
1218                         i += 2;
1219                         continue;
1220                 }
1221
1222                 if (!strcmp(argv[i], "truncate") && (i + 2 < argc)) {
1223                         char *target = argv[i+1];
1224                         uint64_t offset = atol(argv[i+2]);
1225                         ret = cmd_truncate(target, offset);
1226                         i += 2;
1227                         continue;
1228                 }
1229
1230                 if (!strcmp(argv[i], "delete") && (i + 1 < argc)) {
1231                         char *target = argv[i+1];
1232                         ret = cmd_delete(target);
1233                         i += 1;
1234                         continue;
1235                 }
1236
1237                 if (!strcmp(argv[i], "acquire") && (i + 1 < argc)) {
1238                         char *target = argv[i+1];
1239                         ret = cmd_acquire(target);
1240                         i += 1;
1241                         continue;
1242                 }
1243
1244                 if (!strcmp(argv[i], "release") && (i + 1 < argc)) {
1245                         char *target = argv[i+1];
1246                         ret = cmd_release(target);
1247                         i += 1;
1248                         continue;
1249                 }
1250
1251                 if (!strcmp(argv[i], "copy") && (i + 2) < argc) {
1252                         char *src = argv[i+1];
1253                         char *dst = argv[i+2];
1254                         ret = cmd_copy(src, dst);
1255                         i += 2;
1256                         continue;
1257                 }
1258
1259                 if (!strcmp(argv[i], "clone") && (i + 2 < argc)) {
1260                         char *src = argv[i+1];
1261                         char *dst = argv[i+2];
1262                         ret = cmd_clone(src, dst);
1263                         i += 2;
1264                         continue;
1265                 }
1266
1267                 if (!strcmp(argv[i], "info") && (i + 1 < argc)) {
1268                         char *target = argv[i+1];
1269                         ret = cmd_info(target);
1270                         i += 1;
1271                         continue;
1272                 }
1273
1274
1275                 if (!parse_ports(argv[i]))
1276                         fprintf(stderr, "invalid argument: %s\n", argv[i]);
1277         }
1278
1279         /* xseg_leave(); */
1280         return ret;
1281 }