9e5e378847292b55061a4349c8dc4291f50c744e
[archipelago] / xseg / peers / user / xseg-tool.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <string.h>
5 #include <stdint.h>
6 #include <ctype.h>
7 #include <sys/stat.h>
8 #include <sys/types.h>
9 #include <fcntl.h>
10 #include <unistd.h>
11
12 #include <xseg/xseg.h>
13
14 int help(void)
15 {
16         printf("xseg <spec> [[[<src_port>]:[<dst_port>]] [<command> <arg>*] ]*\n"
17                 "spec:\n"
18                 "    <type:name:nr_ports:nr_requests:request_size:extra_size:page_shift>\n"
19                 "global commands:\n"
20                 "    reportall\n"
21                 "    create\n"
22                 "    destroy\n"
23                 "    bind <portno>\n"
24                 "    signal <portno>\n"
25                 "    bridge <portno1> <portno2> <logfile> {full|summary|stats}\n"
26                 "port commands:\n"
27                 "    report\n"
28                 "    alloc_requests (to source) <nr>\n"
29                 "    free_requests (from source) <nr>\n"
30                 "    put_requests (all from dest)\n"
31                 "    put_replies (all from dest)\n"
32                 "    wait        <nr_replies>\n"
33                 "    complete    <nr_requests>\n"
34                 "    fail        <nr_requests>\n"
35                 "    rndwrite    <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
36                 "    rndread     <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
37                 "    submit_reqs <nr_loops> <concurrent_reqs>\n"
38                 "    info        <target>\n"
39                 "    read        <target> <offset> <size>\n"
40                 "    write       <target> <offset> < data\n"
41                 "    truncate    <target> <size>\n"
42                 "    delete      <target>\n"
43                 "    acquire     <target>\n"
44                 "    release     <target>\n"
45                 "    copy        <src>  <dst>\n"
46                 "    clone       <src>  <dst>\n"
47         );
48         return 1;
49 }
50
51 char *namebuf;
52 char *chunk;
53 struct xseg_config cfg;
54 struct xseg *xseg;
55 uint32_t srcport, dstport;
56 uint64_t reqs;
57 #define mkname mkname_heavy
58 /* heavy distributes duplicates much more widely than light
59  * ./xseg-tool random 100000 | cut -d' ' -f2- | sort | uniq -d -c |wc -l
60  */
61
62 void mkname_heavy(char *name, uint32_t namelen, uint32_t seed)
63 {
64         int i;
65         char c;
66         for (i = 0; i < namelen; i += 1) {
67                 c = seed + (seed >> 8) + (seed >> 16) + (seed >> 24);
68                 c = '0' + ((c + (c >> 4)) & 0xf);
69                 if (c > '9')
70                         c += 'a'-'0'-10;
71                 name[i] = c;
72                 seed *= ((seed % 137911) | 1) * 137911;
73         }
74 }
75
76 void mkname_light(char *name, uint32_t namelen, uint32_t seed)
77 {
78         int i;
79         char c;
80         for (i = 0; i < namelen; i += 1) {
81                 c = seed;
82                 name[i] = 'A' + (c & 0xf);
83                 seed += 1;
84         }
85 }
86
87 uint64_t pick(uint64_t size)
88 {
89         return (uint64_t)((double)(RAND_MAX) / random());
90 }
91
92 void mkchunk(   char *chunk, uint32_t datalen,
93                 char *target, uint32_t targetlen, uint64_t offset)
94 {
95         long i, r, bufsize = targetlen + 16;
96         char buf[bufsize];
97         r = datalen % bufsize;
98         snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);
99
100         for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
101                 memcpy(chunk + i, buf, bufsize);
102
103         memcpy(chunk + datalen - r, buf, r);
104 }
105
106 int chkchunk(   char *chunk, uint32_t datalen,
107                 char *target, uint32_t targetlen, uint64_t offset)
108 {
109         long i, r;
110         int bufsize = targetlen + 16;
111         char buf[bufsize];
112         r = datalen % targetlen;
113         snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);
114
115         for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
116                 if (memcmp(chunk + i, buf, bufsize)) {
117                         /*printf("mismatch: '%*s'* vs '%*s'\n",
118                                 bufsize, buf, datalen, chunk);
119                         */
120                         return 0;
121                 }
122
123         if (memcmp(chunk + datalen - r, buf, r))
124                 return 0;
125
126         return 1;
127 }
128
129
130 #define ALLOC_MIN 4096
131 #define ALLOC_MAX 1048576
132
133 void inputbuf(FILE *fp, char **retbuf, uint64_t *retsize)
134 {
135         static uint64_t alloc_size;
136         static char *buf;
137         uint64_t size = 0;
138         char *p;
139         size_t r;
140
141         if (alloc_size < ALLOC_MIN)
142                 alloc_size = ALLOC_MIN;
143
144         if (alloc_size > ALLOC_MAX)
145                 alloc_size = ALLOC_MAX;
146
147         p = realloc(buf, alloc_size);
148         if (!p) {
149                 if (buf)
150                         free(buf);
151                 buf = NULL;
152                 goto out;
153         }
154
155         buf = p;
156
157         while (!feof(fp)) {
158                 r = fread(buf + size, 1, alloc_size - size, fp);
159                 if (!r)
160                         break;
161                 size += r;
162                 if (size >= alloc_size) {
163                         p = realloc(buf, alloc_size * 2);
164                         if (!p) {
165                                 if (buf)
166                                         free(buf);
167                                 buf = NULL;
168                                 size = 0;
169                                 goto out;
170                         }
171                         buf = p;
172                         alloc_size *= 2;
173                 }
174         }
175
176 out:
177         *retbuf = buf;
178         *retsize = size;
179 }
180
181 void report_request(struct xseg_request *req)
182 {
183         uint32_t max = req->datalen;
184         char *data = xseg_get_data(xseg, req);
185         if (max > 128)
186                 max = 128;
187         data[max-1] = 0;
188         fprintf(stderr, "request %llu state %u\n", (unsigned long long)req->serial, req->state);
189         fprintf(stderr, "data: %s\n", data);
190 }
191
192 int cmd_info(char *target)
193 {
194         uint32_t targetlen = strlen(target);
195         size_t size = sizeof(uint64_t);
196         int r;
197         xserial srl;
198         struct xseg_request *req;
199         char *req_target;
200
201         req = xseg_get_request(xseg, srcport);
202         if (!req) {
203                 fprintf(stderr, "No request!\n");
204                 return -1;
205         }
206
207         r = xseg_prep_request(xseg, req, targetlen, size);
208         if (r < 0) {
209                 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
210                         (unsigned long) targetlen, (unsigned long) size);
211                 xseg_put_request(xseg, srcport, req);
212                 return -1;
213         }
214
215         req_target = xseg_get_target(xseg, req);
216         strncpy(req_target, target, targetlen);
217         req->offset = 0;
218         req->size = size;
219         req->op = X_INFO;
220
221         srl = xseg_submit(xseg, dstport, req);
222         if (srl == Noneidx)
223                 return -1;
224
225         xseg_signal(xseg, dstport);
226
227         return 0;
228 }
229
230 int cmd_read(char *target, uint64_t offset, uint64_t size)
231 {
232         uint32_t targetlen = strlen(target);
233         int r;
234         xserial srl;
235         char *req_target;
236         struct xseg_request *req = xseg_get_request(xseg, srcport);
237         printf("%x\n", req);
238         if (!req) {
239                 fprintf(stderr, "No request\n");
240                 return -1;
241         }
242
243         r = xseg_prep_request(xseg, req, targetlen, size);
244         if (r < 0) {
245                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
246                         (unsigned long)targetlen, (unsigned long long)size);
247                 xseg_put_request(xseg, srcport, req);
248                 return -1;
249         }
250
251         req_target = xseg_get_target(xseg, req);
252         strncpy(req_target, target, targetlen);
253         req->offset = offset;
254         req->size = size;
255         req->op = X_READ;
256         report_request(req);
257         srl = xseg_submit(xseg, dstport, req);
258         if (srl == Noneidx)
259                 return -1;
260
261         xseg_signal(xseg, dstport);
262         return 0;
263 }
264
265 int cmd_write(char *target, uint64_t offset)
266 {
267         char *buf = NULL;
268         int r;
269         xserial srl;
270         uint64_t size = 0;
271         char *req_target, *req_data;
272         uint32_t targetlen = strlen(target);
273         struct xseg_request *req;
274
275         inputbuf(stdin, &buf, &size);
276         if (!size) {
277                 fprintf(stderr, "No input\n");
278                 return -1;
279         }
280
281         req = xseg_get_request(xseg, srcport);
282         if (!req) {
283                 fprintf(stderr, "No request\n");
284                 return -1;
285         }
286
287         r = xseg_prep_request(xseg, req, targetlen, size);
288         if (r < 0) {
289                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
290                         (unsigned long)targetlen, (unsigned long long)size);
291                 xseg_put_request(xseg, srcport, req);
292                 return -1;
293         }
294
295         req_target = xseg_get_target(xseg, req);
296         strncpy(req_target, target, targetlen);
297         
298         req_data = xseg_get_data(xseg, req);
299         memcpy(req_data, buf, size);
300         req->offset = offset;
301         req->size = size;
302         req->op = X_WRITE;
303
304         srl = xseg_submit(xseg, dstport, req);
305         if (srl == Noneidx) {
306                 fprintf(stderr, "Cannot submit\n");
307                 return -1;
308         }
309
310         return 0;
311 }
312
313 int cmd_truncate(char *target, uint64_t offset)
314 {
315         return 0;
316 }
317
318 int cmd_delete(char *target)
319 {
320         return 0;
321 }
322
323 int cmd_acquire(char *target)
324 {
325         return 0;
326 }
327
328 int cmd_release(char *target)
329 {
330         return 0;
331 }
332
333 int cmd_copy(char *src, char *dst)
334 {
335         return 0;
336 }
337
338 int cmd_clone(char *src, char *dst)
339 {
340         return 0;
341 }
342
343 void log_req(int logfd, uint32_t portno2, uint32_t portno1, int op, int method,
344                 struct xseg_request *req)
345 {
346         FILE *logfp;
347         char target[64], data[64];
348         char *req_target, *req_data;
349         /* null terminate name in case of req->target is less than 63 characters,
350          * and next character after name (aka first byte of next buffer) is not
351          * null
352          */
353         unsigned int end = (req->targetlen > 63) ? 63 : req->targetlen;
354         
355         req_target = xseg_get_target(xseg, req);
356         req_data = xseg_get_data(xseg, req);
357
358         logfp = fdopen(logfd, "a");
359         if (!logfp)
360                 return;
361
362         switch(method) {
363         case 0:
364                 strncpy(target, req_target, end);
365                 target[end] = 0;
366                 strncpy(data, req_data, 63);
367                 data[63] = 0;
368
369                 fprintf(logfp,
370                         "src port: %u, dst port: %u,  op:%u offset: %llu size: %lu, reqstate: %u\n"
371                         "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
372                         (unsigned int)portno1,
373                         (unsigned int)portno2,
374                         (unsigned int)req->op,
375                         (unsigned long long)req->offset,
376                         (unsigned long)req->size,
377                         (unsigned int)req->state,
378                         (unsigned int)req->targetlen, target,
379                         (unsigned long long)req->datalen, data);
380                 break;
381         case 1:
382                 fprintf(logfp,
383                         "src port: %u, dst port: %u, op: %u\n",
384                         (unsigned int)portno1,
385                         (unsigned int)portno2,
386                         (unsigned int)req->op);
387                 break;
388         case 2:
389                 fprintf(logfp, "src port: %u, dst port: %u, reqs: %llu\n",
390                         (unsigned int)portno1,
391                         (unsigned int)portno2,
392                         (unsigned long long)++reqs);
393         }
394
395         fclose(logfp);
396         return;
397 }
398
399 #define LOG_ACCEPT  0
400 #define LOG_RECEIVE 1
401
402 int cmd_bridge(uint32_t portno1, uint32_t portno2, char *logfile, char *how)
403 {
404         struct xseg_request *req;
405         int logfd, method;
406         if (!strcmp(logfile, "-"))
407                 logfd = 1;
408         else {
409                 logfd = open(logfile, O_WRONLY|O_APPEND|O_CREAT, 0600);
410                 if (logfd < 0) {
411                         perror(logfile);
412                         return -1;
413                 }
414         }
415
416         if (!strcmp(how, "full"))
417                 method = 0;
418         else if (!strcmp(how, "summary"))
419                 method = 1;
420         else
421                 method = 2;
422
423         for (;;) {
424                 int reloop = 0, active;
425                 xseg_prepare_wait(xseg, portno1);
426                 xseg_prepare_wait(xseg, portno2);
427                 req = NULL;
428
429                 for (;;) {
430                         active = 0;
431
432                         req = xseg_accept(xseg, portno1);
433                         if (req) {
434                                 xseg_submit(xseg, portno2, req);
435                                 log_req(logfd, portno1, portno2, LOG_ACCEPT, method, req);
436                                 active += 1;
437                         }
438
439                         req = xseg_accept(xseg, portno2);
440                         if (req) {
441                                 xseg_submit(xseg, portno1, req);
442                                 log_req(logfd, portno2, portno1, LOG_ACCEPT, method, req);
443                                 active += 1;
444                         }
445
446                         req = xseg_receive(xseg, portno1);
447                         if (req) {
448                                 xseg_respond(xseg, portno2, req);
449                                 log_req(logfd, portno1, portno2, LOG_RECEIVE, method, req);
450                                 active += 1;
451                         }
452
453                         req = xseg_receive(xseg, portno2);
454                         if (req) {
455                                 xseg_respond(xseg, portno1, req);
456                                 log_req(logfd, portno2, portno1, LOG_RECEIVE, method, req);
457                                 active += 1;
458                         }
459
460                         if (active == 0) {
461                                 if (reloop)
462                                         break;
463                                 /* wait on multiple queues? */
464                                 xseg_wait_signal(xseg, 100000);
465                                 break;
466                         } else {
467                                 xseg_cancel_wait(xseg, portno1);        
468                                 xseg_cancel_wait(xseg, portno2);        
469                                 reloop = 1;
470                         }
471                 }
472         }
473
474         close(logfd);
475
476         return 0;
477 }
478
479 int cmd_rndwrite(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
480 {
481         if (loops < 0)
482                 return help();
483
484         if (targetlen >= chunksize) {
485                 fprintf(stderr, "targetlen >= chunksize\n");
486                 return -1;
487         }
488
489         char *p = realloc(namebuf, targetlen+1);
490         if (!p) {
491                 fprintf(stderr, "Cannot allocate memory\n");
492                 return -1;
493         }
494         namebuf = p;
495
496         p = realloc(chunk, chunksize);
497         if (!p) {
498                 fprintf(stderr, "Cannot allocate memory\n");
499                 return -1;
500         }
501         chunk = p;
502         memset(chunk, 0, chunksize);
503
504         srandom(seed);
505
506         struct xseg_request *submitted = NULL, *received;
507         long nr_submitted = 0, nr_received = 0, nr_failed = 0;
508         int reported = 0, r;
509         uint64_t offset;
510         xserial srl;
511         char *req_data, *req_target;
512
513         for (;;) {
514                 xseg_prepare_wait(xseg, srcport);
515                 if (nr_submitted < loops &&
516                     (submitted = xseg_get_request(xseg, srcport))) {
517                         xseg_cancel_wait(xseg, srcport);
518                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
519                         if (r < 0) {
520                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
521                                         targetlen, chunksize);
522                                 xseg_put_request(xseg, submitted->portno, submitted);
523                                 return -1;
524                         }
525                         
526                         req_target = xseg_get_target(xseg, submitted);
527                         req_data = xseg_get_data(xseg, submitted);
528
529                         nr_submitted += 1;
530                         reported = 0;
531                         seed = random();
532                         mkname(namebuf, targetlen, seed);
533                         namebuf[targetlen] = 0;
534                         //printf("%ld: %s\n", nr_submitted, namebuf);
535                         strncpy(req_target, namebuf, targetlen);
536                         offset = 0;// pick(size);
537                         mkchunk(req_data, chunksize, namebuf, targetlen, offset);
538
539                         submitted->offset = offset;
540                         submitted->size = chunksize;
541                         submitted->op = X_WRITE;
542                         submitted->flags |= XF_NOSYNC;
543
544                         srl = xseg_submit(xseg, dstport, submitted);
545                         (void)srl;
546                         xseg_signal(xseg, dstport);
547                 }
548
549                 received = xseg_receive(xseg, srcport);
550                 if (received) {
551                         xseg_cancel_wait(xseg, srcport);
552                         nr_received += 1;
553                         if (!(received->state & XS_SERVED)) {
554                                 nr_failed += 1;
555                                 report_request(received);
556                         }
557                         if (xseg_put_request(xseg, received->portno, received))
558                                 fprintf(stderr, "Cannot put request at port %u\n", received->portno);
559                 }
560
561                 if (!submitted && !received)
562                         xseg_wait_signal(xseg, 1000000);
563
564                         if (nr_submitted % 1000 == 0 && !reported) {
565                                 reported = 1;
566                                 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
567                                         nr_submitted, nr_received, nr_failed);
568                         }
569
570                         if (nr_received >= loops)
571                                 break;
572         }
573
574         fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
575                 nr_submitted, nr_received, nr_failed);
576         return 0;
577 }
578
579 /* note:
580  * prepare/wait rhythm,
581  * files are converted to independent chunk access patterns,
582 */
583
584 int cmd_rndread(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
585 {
586         if (loops < 0)
587                 return help();
588
589         if (targetlen >= chunksize) {
590                 fprintf(stderr, "targetlen >= chunksize\n");
591                 return -1;
592         }
593
594         char *p = realloc(namebuf, targetlen+1);
595         if (!p) {
596                 fprintf(stderr, "Cannot allocate memory\n");
597                 return -1;
598         }
599         namebuf = p;
600
601         p = realloc(chunk, chunksize);
602         if (!p) {
603                 fprintf(stderr, "Cannot allocate memory\n");
604                 return -1;
605         }
606         chunk = p;
607         memset(chunk, 0, chunksize);
608
609         srandom(seed);
610
611         struct xseg_request *submitted = NULL, *received;
612         long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0;
613         int reported = 0, r;
614         uint64_t offset;
615         xserial srl;
616         char *req_data, *req_target;
617
618         for (;;) {
619                 submitted = NULL;
620                 xseg_prepare_wait(xseg, srcport);
621                 if (nr_submitted < loops &&
622                     (submitted = xseg_get_request(xseg, srcport))) {
623                         xseg_cancel_wait(xseg, srcport);
624                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
625                         if (r < 0) {
626                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
627                                         targetlen, chunksize);
628                                 xseg_put_request(xseg, submitted->portno, submitted);
629                                 return -1;
630                         }
631
632                         req_target = xseg_get_target(xseg, submitted);
633                         nr_submitted += 1;
634                         reported = 0;
635                         seed = random();
636                         mkname(namebuf, targetlen, seed);
637                         namebuf[targetlen] = 0;
638                         //printf("%ld: %s\n", nr_submitted, namebuf);
639                         offset = 0;//pick(size);
640
641                         strncpy(req_target, namebuf, targetlen);
642                         submitted->offset = offset;
643                         submitted->size = chunksize;
644                         submitted->op = X_READ;
645
646                         srl = xseg_submit(xseg, dstport, submitted);
647                         (void)srl;
648                         xseg_signal(xseg, dstport);
649                 }
650
651                 received = xseg_receive(xseg, srcport);
652                 if (received) {
653                         xseg_cancel_wait(xseg, srcport);
654                         nr_received += 1;
655                         req_target = xseg_get_target(xseg, received);
656                         req_data = xseg_get_data(xseg, received);
657                         if (!(received->state & XS_SERVED)) {
658                                 nr_failed += 1;
659                                 report_request(received);
660                         } else if (!chkchunk(req_data, received->datalen,
661                                         req_target, received->targetlen, received->offset)) {
662                                 nr_mismatch += 1;
663                         }
664
665                         if (xseg_put_request(xseg, received->portno, received))
666                                 fprintf(stderr, "Cannot put request at port %u\n", received->portno);
667                 }
668
669                 if (!submitted && !received)
670                         xseg_wait_signal(xseg, 1000000);
671
672                 if (nr_submitted % 1000 == 0 && !reported) {
673                         reported = 1;
674                         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
675                         nr_submitted, nr_received, nr_failed, nr_mismatch);
676                 }
677
678                 if (nr_received >= loops)
679                         break;
680         }
681
682         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
683                 nr_submitted, nr_received, nr_failed, nr_mismatch);
684         return 0;
685 }
686
687 int cmd_submit_reqs(long loops, long concurrent_reqs, int op)
688 {
689         if (loops < 0)
690                 return help();
691
692         struct xseg_request *submitted = NULL, *received;
693         long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0, nr_flying = 0;
694         int reported = 0, r;
695         uint64_t offset;
696         uint32_t targetlen = 10, chunksize = 4096;
697         struct timeval tv1, tv2;
698         xserial srl;
699         char *req_data, *req_target;
700
701         xseg_bind_port(xseg, srcport);
702
703         gettimeofday(&tv1, NULL);
704         for (;;) {
705                 submitted = NULL;
706                 xseg_prepare_wait(xseg, srcport);
707                 if (nr_submitted < loops &&  nr_flying < concurrent_reqs &&
708                     (submitted = xseg_get_request(xseg, srcport))) {
709                         xseg_cancel_wait(xseg, srcport);
710                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
711                         if (r < 0) {
712                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
713                                         targetlen, chunksize);
714                                 xseg_put_request(xseg, submitted->portno, submitted);
715                                 return -1;
716                         }
717
718                         ++nr_flying;
719                         nr_submitted += 1;
720                         reported = 0;
721                         offset = 0;//pick(size);
722
723                         submitted->offset = offset;
724                         submitted->size = chunksize;
725                         req_target = xseg_get_target(xseg, submitted);
726                         req_data = xseg_get_data(xseg, submitted);
727
728                         if (op == 0)
729                                 submitted->op = X_INFO;
730                         else if (op == 1)
731                                 submitted->op = X_READ;
732                         else if (op == 2) {
733                                 submitted->op = X_WRITE;
734                                 mkchunk(req_data, submitted->datalen, req_target, submitted->targetlen, submitted->offset);
735                         }
736
737                         srl = xseg_submit(xseg, dstport, submitted);
738                         (void)srl;
739                         if (xseg_signal(xseg, dstport) < 0)
740                                 perror("Cannot signal peer");
741                 }
742                 received = xseg_receive(xseg, srcport);
743                 if (received) {
744                         xseg_cancel_wait(xseg, srcport);
745                         --nr_flying;
746                         if (nr_received == 0)
747                                 fprintf(stderr, "latency (time for the first req to complete): %llu usecs\n",
748                                         (unsigned long long)received->elapsed);
749                         nr_received += 1;
750                         if (!(received->state & XS_SERVED)) {
751                                 nr_failed += 1;
752                                 //report_request(received);
753                         }
754
755                         if (xseg_put_request(xseg, received->portno, received))
756                                 fprintf(stderr, "Cannot put request at port %u\n", received->portno);
757                 }
758
759                 if (!submitted && !received)
760                         xseg_wait_signal(xseg, 10000000L);
761
762                 if (nr_received >= loops)
763                         break;
764         }
765         gettimeofday(&tv2, NULL);
766
767         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
768                 nr_submitted, nr_received, nr_failed, nr_mismatch);
769         long t = (tv2.tv_sec - tv1.tv_sec)*1000000 + (tv2.tv_usec - tv1.tv_usec);
770         fprintf(stderr, "elpased time: %lf secs, throughput: %lf reqs/sec\n", (double) t / 1000000.0, (double) nr_submitted / (t / 1000000.0));
771
772         return 0;
773 }
774
775 int cmd_report(uint32_t portno)
776 {
777         struct xseg_port *port = xseg_get_port(xseg, portno);
778         if (!port) {
779                 printf("port %u is not assigned\n", portno);
780                 return 0;
781         }
782         struct xq *fq, *rq, *pq;
783         fq = xseg_get_queue(xseg, port, free_queue);
784         rq = xseg_get_queue(xseg, port, request_queue);
785         pq = xseg_get_queue(xseg, port, reply_queue);
786         fprintf(stderr, "port %u:\n"
787                 "       free_queue [%p] count : %u\n"
788                 "    request_queue [%p] count : %u\n"
789                 "      reply_queue [%p] count : %u\n",
790                 portno,
791                 (void *)fq, xq_count(fq),
792                 (void *)rq, xq_count(rq),
793                 (void *)pq, xq_count(pq));
794         return 0;
795 }
796
797 int cmd_join(void)
798 {
799         if (xseg)
800                 return 0;
801
802         xseg = xseg_join(cfg.type, cfg.name, "posix", NULL);
803         if (!xseg) {
804                 fprintf(stderr, "cannot join segment!\n");
805                 return -1;
806         }
807         return 0;
808 }
809
810 int cmd_reportall(void)
811 {
812         uint32_t t;
813
814         if (cmd_join())
815                 return -1;
816
817         //fprintf(stderr, "global free requests: %u\n", xq_count(xseg->free_requests));
818         for (t = 0; t < xseg->config.nr_ports; t++)
819                 cmd_report(t);
820
821         return 0;
822 }
823
824 int cmd_create(void)
825 {
826         int r = xseg_create(&cfg);
827         if (r) {
828                 fprintf(stderr, "cannot create segment!\n");
829                 return -1;
830         }
831
832         fprintf(stderr, "Segment initialized.\n");
833         return 0;
834 }
835
836 int cmd_destroy(void)
837 {
838         if (!xseg && cmd_join())
839                 return -1;
840         xseg_leave(xseg);
841         xseg_destroy(xseg);
842         xseg = NULL;
843         fprintf(stderr, "Segment destroyed.\n");
844         return 0;
845 }
846
847 int cmd_alloc_requests(unsigned long nr)
848 {
849         return xseg_alloc_requests(xseg, srcport, nr);
850 }
851
852 int cmd_free_requests(unsigned long nr)
853 {
854         return xseg_free_requests(xseg, srcport, nr);
855 }
856
857 int cmd_put_requests(void)
858 {
859         struct xseg_request *req;
860
861         for (;;) {
862                 req = xseg_accept(xseg, dstport);
863                 if (!req)
864                         break;
865                 if (xseg_put_request(xseg, req->portno, req))
866                         fprintf(stderr, "Cannot put request at port %u\n", req->portno);
867         }
868
869         return 0;
870 }
871
872 int cmd_finish(unsigned long nr, int fail)
873 {
874         struct xseg_request *req;
875         char *buf = malloc(sizeof(char) * 8128);
876         char *req_target, *req_data;
877         xseg_bind_port(xseg, srcport);
878
879         for (; nr--;) {
880                 xseg_prepare_wait(xseg, srcport);
881                 req = xseg_accept(xseg, srcport);
882                 if (req) {
883                         req_target = xseg_get_target(xseg, req);
884                         req_data = xseg_get_data(xseg, req);
885                         xseg_cancel_wait(xseg, srcport);
886                         if (fail == 1)
887                                 req->state &= ~XS_SERVED;
888                         else {
889                                 if (req->op == X_READ)
890                                         mkchunk(req_data, req->datalen, req_target, req->targetlen, req->offset);
891                                 else if (req->op == X_WRITE) 
892                                         memcpy(buf, req_data, (sizeof(*buf) > req->datalen) ? req->datalen : sizeof(*buf));
893                                 else if (req->op == X_INFO)
894                                         *((uint64_t *) req->data) = 4294967296;
895                                 
896                                 req->state |= XS_SERVED;
897                                 req->serviced = req->size;
898                         }
899
900                         xseg_respond(xseg, dstport, req);
901                         xseg_signal(xseg, dstport);
902                         continue;
903                 }
904                 ++nr;
905                 xseg_wait_signal(xseg, 10000000L);
906         }
907
908         free(buf);
909
910         return 0;
911 }
912
913 void handle_reply(struct xseg_request *req)
914 {
915         char *req_data = xseg_get_data(xseg, req);
916         if (!(req->state & XS_SERVED)) {
917                 report_request(req);
918                 goto put;
919         }
920
921         switch (req->op) {
922         case X_READ:
923                 fwrite(req_data, 1, req->datalen, stdout);
924                 break;
925
926         case X_WRITE:
927         case X_SYNC:
928         case X_DELETE:
929         case X_TRUNCATE:
930         case X_COMMIT:
931         case X_CLONE:
932         case X_INFO:
933                 fprintf(stderr, "size: %llu\n", (unsigned long long)*((uint64_t *)req_data));
934                 break;
935
936         default:
937                 break;
938         }
939
940 put:
941         if (xseg_put_request(xseg, req->portno, req))
942                 fprintf(stderr, "Cannot put reply at port %u\n", req->portno);
943 }
944
945 int cmd_wait(uint32_t nr)
946 {
947         struct xseg_request *req;
948         long ret;
949
950         for (;;) {
951                 req = xseg_receive(xseg, srcport);
952                 if (req) {
953                         handle_reply(req);
954                         nr--;
955                         if (nr == 0)
956                                 break;
957                         continue;
958                 }
959
960                 ret = xseg_prepare_wait(xseg, srcport);
961                 if (ret)
962                         return -1;
963
964                 ret = xseg_wait_signal(xseg, 1000000);
965                 ret = xseg_cancel_wait(xseg, srcport);
966                 if (ret)
967                         return -1;
968         }
969
970         return 0;
971 }
972
973 int cmd_put_replies(void)
974 {
975         struct xseg_request *req;
976
977         for (;;) {
978                 req = xseg_receive(xseg, dstport);
979                 if (!req)
980                         break;
981                 fprintf(stderr, "request: %08llx%08llx\n"
982                         "     op: %u\n"
983                         "  state: %u\n",
984                         0LL, (unsigned long long)req->serial,
985                         req->op,
986                         req->state);
987                 report_request(req);
988
989                 //fwrite(req->buffer, 1, req->bufferlen, stdout);
990
991                 if (xseg_put_request(xseg, req->portno, req))
992                         fprintf(stderr, "Cannot put reply\n");
993         }
994
995         return 0;
996 }
997
998 int cmd_bind(long portno)
999 {
1000         struct xseg_port *port = xseg_bind_port(xseg, portno);
1001         if (!port) {
1002                 fprintf(stderr, "failed to bind port %ld\n", portno);
1003                 return 1;
1004         }
1005
1006         fprintf(stderr, "bound port %u\n", xseg_portno(xseg, port));
1007         return 0;
1008 }
1009
1010 int cmd_signal(uint32_t portno)
1011 {
1012         return xseg_signal(xseg, portno);
1013 }
1014
1015 int parse_ports(char *str)
1016 {
1017         int ret = 0;
1018         char *s = str;
1019
1020         for (;;) {
1021                 if (*s == 0)
1022                         return 0;
1023
1024                 if (*s == ':') {
1025                         *s = 0;
1026                         if ((s > str) && isdigit(str[0])) {
1027                                 srcport = atol(str);
1028                                 ret ++;
1029                         }
1030                         break;
1031                 }
1032                 s ++;
1033         }
1034
1035         s += 1;
1036         str = s;
1037
1038         for (;;) {
1039                 if (*s == 0) {
1040                         if ((s > str) && isdigit(str[0])) {
1041                                 dstport = atol(str);
1042                                 ret ++;
1043                         }
1044                         break;
1045                 }
1046                 s ++;
1047         }
1048
1049         return ret;
1050 }
1051
1052 int main(int argc, char **argv)
1053 {
1054         int i, ret = 0;
1055         char *spec;
1056
1057         if (argc < 3)
1058                 return help();
1059
1060         srcport = -1;
1061         dstport = -1;
1062         spec = argv[1];
1063
1064         if (xseg_parse_spec(spec, &cfg)) {
1065                 fprintf(stderr, "Cannot parse spec\n");
1066                 return -1;
1067         }
1068
1069         if (xseg_initialize()) {
1070                 fprintf(stderr, "cannot initialize!\n");
1071                 return -1;
1072         }
1073
1074         for (i = 2; i < argc; i++) {
1075
1076                 if (!strcmp(argv[i], "create")) {
1077                         ret = cmd_create();
1078                         continue;
1079                 }
1080
1081                 if (!strcmp(argv[i], "join")) {
1082                         ret = cmd_join();
1083                         if (!ret)
1084                                 fprintf(stderr, "Segment joined.\n");
1085                         continue;
1086                 }
1087
1088                 if (!strcmp(argv[i], "destroy")) {
1089                         ret = cmd_destroy();
1090                         continue;
1091                 }
1092
1093                 if (cmd_join())
1094                         return -1;
1095
1096                 if (!strcmp(argv[i], "reportall")) {
1097                         ret = cmd_reportall();
1098                         continue;
1099                 }
1100
1101                 if (!strcmp(argv[i], "bind") && (i + 1 < argc)) {
1102                         ret = cmd_bind(atol(argv[i+1]));
1103                         i += 1;
1104                         continue;
1105                 }
1106
1107                 if (!strcmp(argv[i], "signal") && (i + 1 < argc)) {
1108                         ret = cmd_signal(atol(argv[i+1]));
1109                         i += 1;
1110                         continue;
1111                 }
1112
1113                 if (!strcmp(argv[i], "bridge") && (i + 4 < argc)) {
1114                         ret = cmd_bridge(atol(argv[i+1]),
1115                                          atol(argv[i+2]),
1116                                          argv[i+3],
1117                                          argv[i+4]);
1118                         i += 4;
1119                         continue;
1120                 }
1121
1122                 if (srcport == -1) {
1123                         if (!parse_ports(argv[i]))
1124                                 fprintf(stderr, "source port undefined: %s\n", argv[i]);
1125                         continue;
1126                 }
1127
1128                 if (dstport == -1) {
1129                         if (!parse_ports(argv[i]))
1130                                 fprintf(stderr, "destination port undefined: %s\n", argv[i]);
1131                         continue;
1132                 }
1133
1134                 if (!strcmp(argv[i], "report")) {
1135                         ret = cmd_report(dstport);
1136                         continue;
1137                 }
1138
1139                 if (!strcmp(argv[i], "alloc_requests") && (i + 1 < argc)) {
1140                         ret = cmd_alloc_requests(atol(argv[i+1]));
1141                         i += 1;
1142                         continue;
1143                 }
1144
1145                 if (!strcmp(argv[i], "free_requests") && (i + 1 < argc)) {
1146                         ret = cmd_free_requests(atol(argv[i+1]));
1147                         i += 1;
1148                         continue;
1149                 }
1150
1151                 if (!strcmp(argv[i], "put_requests")) {
1152                         ret = cmd_put_requests();
1153                         continue;
1154                 }
1155
1156                 if (!strcmp(argv[i], "put_replies")) {
1157                         ret = cmd_put_replies();
1158                         continue;
1159                 }
1160
1161                 if (!strcmp(argv[i], "complete") && (i + 1 < argc)) {
1162                         ret = cmd_finish(atol(argv[i+1]), 0);
1163                         i += 1;
1164                         continue;
1165                 }
1166
1167                 if (!strcmp(argv[i], "fail") && (i + 1 < argc)) {
1168                         ret = cmd_finish(atol(argv[i+1]), 1);
1169                         i += 1;
1170                         continue;
1171                 }
1172
1173                 if (!strcmp(argv[i], "wait") && (i + 1 < argc)) {
1174                         ret = cmd_wait(atol(argv[i+1]));
1175                         i += 1;
1176                         continue;
1177                 }
1178
1179                 if (!strcmp(argv[i], "rndwrite") && (i + 5 < argc)) {
1180                         long nr_loops = atol(argv[i+1]);
1181                         unsigned int seed = atoi(argv[i+2]);
1182                         unsigned int targetlen = atoi(argv[i+3]);
1183                         unsigned int chunksize = atoi(argv[i+4]);
1184                         unsigned long objectsize = atol(argv[i+5]);
1185                         ret = cmd_rndwrite(nr_loops, seed, targetlen, chunksize, objectsize);
1186                         i += 5;
1187                         continue;
1188                 }
1189
1190                 if (!strcmp(argv[i], "rndread") && (i + 5 < argc)) {
1191                         long nr_loops = atol(argv[i+1]);
1192                         unsigned int seed = atoi(argv[i+2]);
1193                         unsigned int targetlen = atoi(argv[i+3]);
1194                         unsigned int chunksize = atoi(argv[i+4]);
1195                         unsigned long objectsize = atol(argv[i+5]);
1196                         ret = cmd_rndread(nr_loops, seed, targetlen, chunksize, objectsize);
1197                         i += 5;
1198                         continue;
1199                 }
1200
1201                 if (!strcmp(argv[i], "submit_reqs") && (i + 3 < argc)) {
1202                         long nr_loops = atol(argv[i+1]);
1203                         long concurrent_reqs = atol(argv[i+2]);
1204                         int op = atoi(argv[i+3]);
1205                         ret = cmd_submit_reqs(nr_loops, concurrent_reqs, op);
1206                         i += 3;
1207                         continue;
1208                 }
1209
1210                 if (!strcmp(argv[i], "read") && (i + 3 < argc)) {
1211                         char *target = argv[i+1];
1212                         uint64_t offset = atol(argv[i+2]);
1213                         uint64_t size   = atol(argv[i+3]);
1214                         ret = cmd_read(target, offset, size);
1215                         i += 3;
1216                         continue;
1217                 }
1218
1219                 if (!strcmp(argv[i], "write") && (i + 2 < argc)) {
1220                         char *target = argv[i+1];
1221                         uint64_t offset = atol(argv[i+2]);
1222                         ret = cmd_write(target, offset);
1223                         i += 2;
1224                         continue;
1225                 }
1226
1227                 if (!strcmp(argv[i], "truncate") && (i + 2 < argc)) {
1228                         char *target = argv[i+1];
1229                         uint64_t offset = atol(argv[i+2]);
1230                         ret = cmd_truncate(target, offset);
1231                         i += 2;
1232                         continue;
1233                 }
1234
1235                 if (!strcmp(argv[i], "delete") && (i + 1 < argc)) {
1236                         char *target = argv[i+1];
1237                         ret = cmd_delete(target);
1238                         i += 1;
1239                         continue;
1240                 }
1241
1242                 if (!strcmp(argv[i], "acquire") && (i + 1 < argc)) {
1243                         char *target = argv[i+1];
1244                         ret = cmd_acquire(target);
1245                         i += 1;
1246                         continue;
1247                 }
1248
1249                 if (!strcmp(argv[i], "release") && (i + 1 < argc)) {
1250                         char *target = argv[i+1];
1251                         ret = cmd_release(target);
1252                         i += 1;
1253                         continue;
1254                 }
1255
1256                 if (!strcmp(argv[i], "copy") && (i + 2) < argc) {
1257                         char *src = argv[i+1];
1258                         char *dst = argv[i+2];
1259                         ret = cmd_copy(src, dst);
1260                         i += 2;
1261                         continue;
1262                 }
1263
1264                 if (!strcmp(argv[i], "clone") && (i + 2 < argc)) {
1265                         char *src = argv[i+1];
1266                         char *dst = argv[i+2];
1267                         ret = cmd_clone(src, dst);
1268                         i += 2;
1269                         continue;
1270                 }
1271
1272                 if (!strcmp(argv[i], "info") && (i + 1 < argc)) {
1273                         char *target = argv[i+1];
1274                         ret = cmd_info(target);
1275                         i += 1;
1276                         continue;
1277                 }
1278
1279
1280                 if (!parse_ports(argv[i]))
1281                         fprintf(stderr, "invalid argument: %s\n", argv[i]);
1282         }
1283
1284         /* xseg_leave(); */
1285         return ret;
1286 }