split signal initialization into local and remote part
[archipelago] / xseg / peers / user / xseg-tool.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <string.h>
5 #include <stdint.h>
6 #include <ctype.h>
7 #include <sys/stat.h>
8 #include <sys/types.h>
9 #include <fcntl.h>
10 #include <unistd.h>
11
12 #include <xseg/xseg.h>
13
14 int help(void)
15 {
16         printf("xseg <spec> [[[<src_port>]:[<dst_port>]] [<command> <arg>*] ]*\n"
17                 "spec:\n"
18                 "    <type:name:nr_ports:nr_requests:request_size:extra_size:page_shift>\n"
19                 "global commands:\n"
20                 "    reportall\n"
21                 "    create\n"
22                 "    destroy\n"
23                 "    bind <portno>\n"
24                 "    signal <portno>\n"
25                 "    bridge <portno1> <portno2> <logfile> {full|summary|stats}\n"
26                 "port commands:\n"
27                 "    report\n"
28                 "    alloc_requests (to source) <nr>\n"
29                 "    free_requests (from source) <nr>\n"
30                 "    put_requests (all from dest)\n"
31                 "    put_replies (all from dest)\n"
32                 "    wait        <nr_replies>\n"
33                 "    complete    <nr_requests>\n"
34                 "    fail        <nr_requests>\n"
35                 "    rndwrite    <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
36                 "    rndread     <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
37                 "    submit_reqs <nr_loops> <concurrent_reqs>\n"
38                 "    info        <target>\n"
39                 "    read        <target> <offset> <size>\n"
40                 "    write       <target> <offset> < data\n"
41                 "    truncate    <target> <size>\n"
42                 "    delete      <target>\n"
43                 "    acquire     <target>\n"
44                 "    release     <target>\n"
45                 "    copy        <src>  <dst>\n"
46                 "    clone       <src>  <dst>\n"
47         );
48         return 1;
49 }
50
51 char *namebuf;
52 char *chunk;
53 struct xseg_config cfg;
54 struct xseg *xseg;
55 uint32_t srcport, dstport;
56 uint64_t reqs;
57 #define mkname mkname_heavy
58 /* heavy distributes duplicates much more widely than light
59  * ./xseg-tool random 100000 | cut -d' ' -f2- | sort | uniq -d -c |wc -l
60  */
61
62 xport sport = NoPort;
63 static void init_local_signal() 
64 {
65         if (xseg && sport != srcport){
66                 xseg_init_local_signal(xseg, srcport);
67                 sport = srcport;
68         }
69 }
70
71 void mkname_heavy(char *name, uint32_t namelen, uint32_t seed)
72 {
73         int i;
74         char c;
75         for (i = 0; i < namelen; i += 1) {
76                 c = seed + (seed >> 8) + (seed >> 16) + (seed >> 24);
77                 c = '0' + ((c + (c >> 4)) & 0xf);
78                 if (c > '9')
79                         c += 'a'-'0'-10;
80                 name[i] = c;
81                 seed *= ((seed % 137911) | 1) * 137911;
82         }
83 }
84
85 void mkname_light(char *name, uint32_t namelen, uint32_t seed)
86 {
87         int i;
88         char c;
89         for (i = 0; i < namelen; i += 1) {
90                 c = seed;
91                 name[i] = 'A' + (c & 0xf);
92                 seed += 1;
93         }
94 }
95
96 uint64_t pick(uint64_t size)
97 {
98         return (uint64_t)((double)(RAND_MAX) / random());
99 }
100
101 void mkchunk(   char *chunk, uint32_t datalen,
102                 char *target, uint32_t targetlen, uint64_t offset)
103 {
104         long i, r, bufsize = targetlen + 16;
105         char buf[bufsize];
106         r = datalen % bufsize;
107         snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);
108
109         for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
110                 memcpy(chunk + i, buf, bufsize);
111
112         memcpy(chunk + datalen - r, buf, r);
113 }
114
115 int chkchunk(   char *chunk, uint32_t datalen,
116                 char *target, uint32_t targetlen, uint64_t offset)
117 {
118         long i, r;
119         int bufsize = targetlen + 16;
120         char buf[bufsize];
121         r = datalen % targetlen;
122         snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);
123
124         for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
125                 if (memcmp(chunk + i, buf, bufsize)) {
126                         /*printf("mismatch: '%*s'* vs '%*s'\n",
127                                 bufsize, buf, datalen, chunk);
128                         */
129                         return 0;
130                 }
131
132         if (memcmp(chunk + datalen - r, buf, r))
133                 return 0;
134
135         return 1;
136 }
137
138
139 #define ALLOC_MIN 4096
140 #define ALLOC_MAX 1048576
141
142 void inputbuf(FILE *fp, char **retbuf, uint64_t *retsize)
143 {
144         static uint64_t alloc_size;
145         static char *buf;
146         uint64_t size = 0;
147         char *p;
148         size_t r;
149
150         if (alloc_size < ALLOC_MIN)
151                 alloc_size = ALLOC_MIN;
152
153         if (alloc_size > ALLOC_MAX)
154                 alloc_size = ALLOC_MAX;
155
156         p = realloc(buf, alloc_size);
157         if (!p) {
158                 if (buf)
159                         free(buf);
160                 buf = NULL;
161                 goto out;
162         }
163
164         buf = p;
165
166         while (!feof(fp)) {
167                 r = fread(buf + size, 1, alloc_size - size, fp);
168                 if (!r)
169                         break;
170                 size += r;
171                 if (size >= alloc_size) {
172                         p = realloc(buf, alloc_size * 2);
173                         if (!p) {
174                                 if (buf)
175                                         free(buf);
176                                 buf = NULL;
177                                 size = 0;
178                                 goto out;
179                         }
180                         buf = p;
181                         alloc_size *= 2;
182                 }
183         }
184
185 out:
186         *retbuf = buf;
187         *retsize = size;
188 }
189
190 void report_request(struct xseg_request *req)
191 {
192         uint32_t max = req->datalen;
193         char *data = xseg_get_data(xseg, req);
194         if (max > 128)
195                 max = 128;
196         data[max-1] = 0;
197         fprintf(stderr, "request %llu state %u\n", (unsigned long long)req->serial, req->state);
198 }
199
200 int cmd_info(char *target)
201 {
202         uint32_t targetlen = strlen(target);
203         size_t size = sizeof(uint64_t);
204         int r;
205         xport p;
206         struct xseg_request *req;
207         char *req_target;
208
209         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
210         if (!req) {
211                 fprintf(stderr, "No request!\n");
212                 return -1;
213         }
214
215         r = xseg_prep_request(xseg, req, targetlen, size);
216         if (r < 0) {
217                 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
218                         (unsigned long) targetlen, (unsigned long) size);
219                 xseg_put_request(xseg, req, srcport);
220                 return -1;
221         }
222
223         req_target = xseg_get_target(xseg, req);
224         strncpy(req_target, target, targetlen);
225         req->offset = 0;
226         req->size = size;
227         req->op = X_INFO;
228
229         p = xseg_submit(xseg, req, srcport, X_ALLOC);
230         if (p == NoPort)
231                 return -1;
232
233         xseg_signal(xseg, p);
234
235         return 0;
236 }
237
238 int cmd_read(char *target, uint64_t offset, uint64_t size)
239 {
240         uint32_t targetlen = strlen(target);
241         int r;
242         xport p;
243         char *req_target;
244         struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
245         printf("%x\n", req);
246         if (!req) {
247                 fprintf(stderr, "No request\n");
248                 return -1;
249         }
250
251         r = xseg_prep_request(xseg, req, targetlen, size);
252         if (r < 0) {
253                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
254                         (unsigned long)targetlen, (unsigned long long)size);
255                 xseg_put_request(xseg, req, srcport);
256                 return -1;
257         }
258
259         req_target = xseg_get_target(xseg, req);
260         strncpy(req_target, target, targetlen);
261         req->offset = offset;
262         req->size = size;
263         req->op = X_READ;
264         report_request(req);
265         p = xseg_submit(xseg, req, srcport, X_ALLOC);
266         if (p == NoPort)
267                 return -1;
268
269         xseg_signal(xseg, p);
270         return 0;
271 }
272
273 int cmd_write(char *target, uint64_t offset)
274 {
275         char *buf = NULL;
276         int r;
277         xport p;
278         uint64_t size = 0;
279         char *req_target, *req_data;
280         uint32_t targetlen = strlen(target);
281         struct xseg_request *req;
282
283         inputbuf(stdin, &buf, &size);
284         if (!size) {
285                 fprintf(stderr, "No input\n");
286                 return -1;
287         }
288
289         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
290         if (!req) {
291                 fprintf(stderr, "No request\n");
292                 return -1;
293         }
294
295         r = xseg_prep_request(xseg, req, targetlen, size);
296         if (r < 0) {
297                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
298                         (unsigned long)targetlen, (unsigned long long)size);
299                 xseg_put_request(xseg, req, srcport);
300                 return -1;
301         }
302
303         req_target = xseg_get_target(xseg, req);
304         strncpy(req_target, target, targetlen);
305         
306         req_data = xseg_get_data(xseg, req);
307         memcpy(req_data, buf, size);
308         req->offset = offset;
309         req->size = size;
310         req->op = X_WRITE;
311
312         p = xseg_submit(xseg, req, srcport, X_ALLOC);
313         if (p == NoPort) {
314                 fprintf(stderr, "Cannot submit\n");
315                 return -1;
316         }
317
318         return 0;
319 }
320
321 int cmd_truncate(char *target, uint64_t offset)
322 {
323         return 0;
324 }
325
326 int cmd_delete(char *target)
327 {
328         return 0;
329 }
330
331 int cmd_acquire(char *target)
332 {
333         return 0;
334 }
335
336 int cmd_release(char *target)
337 {
338         return 0;
339 }
340
341 int cmd_copy(char *src, char *dst)
342 {
343         return 0;
344 }
345
346 int cmd_clone(char *src, char *dst)
347 {
348         return 0;
349 }
350
351 void log_req(int logfd, uint32_t portno2, uint32_t portno1, int op, int method,
352                 struct xseg_request *req)
353 {
354         FILE *logfp;
355         char target[64], data[64];
356         char *req_target, *req_data;
357         /* null terminate name in case of req->target is less than 63 characters,
358          * and next character after name (aka first byte of next buffer) is not
359          * null
360          */
361         unsigned int end = (req->targetlen > 63) ? 63 : req->targetlen;
362         
363         req_target = xseg_get_target(xseg, req);
364         req_data = xseg_get_data(xseg, req);
365
366         logfp = fdopen(logfd, "a");
367         if (!logfp)
368                 return;
369
370         switch(method) {
371         case 0:
372                 strncpy(target, req_target, end);
373                 target[end] = 0;
374                 strncpy(data, req_data, 63);
375                 data[63] = 0;
376
377                 fprintf(logfp,
378                         "src port: %u, dst port: %u,  op:%u offset: %llu size: %lu, reqstate: %u\n"
379                         "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
380                         (unsigned int)portno1,
381                         (unsigned int)portno2,
382                         (unsigned int)req->op,
383                         (unsigned long long)req->offset,
384                         (unsigned long)req->size,
385                         (unsigned int)req->state,
386                         (unsigned int)req->targetlen, target,
387                         (unsigned long long)req->datalen, data);
388                 break;
389         case 1:
390                 fprintf(logfp,
391                         "src port: %u, dst port: %u, op: %u\n",
392                         (unsigned int)portno1,
393                         (unsigned int)portno2,
394                         (unsigned int)req->op);
395                 break;
396         case 2:
397                 fprintf(logfp, "src port: %u, dst port: %u, reqs: %llu\n",
398                         (unsigned int)portno1,
399                         (unsigned int)portno2,
400                         (unsigned long long)++reqs);
401         }
402
403         fclose(logfp);
404         return;
405 }
406
407 #define LOG_ACCEPT  0
408 #define LOG_RECEIVE 1
409
410 int cmd_bridge(uint32_t portno1, uint32_t portno2, char *logfile, char *how)
411 {
412         struct xseg_request *req;
413         int logfd, method;
414         if (!strcmp(logfile, "-"))
415                 logfd = 1;
416         else {
417                 logfd = open(logfile, O_WRONLY|O_APPEND|O_CREAT, 0600);
418                 if (logfd < 0) {
419                         perror(logfile);
420                         return -1;
421                 }
422         }
423
424         if (!strcmp(how, "full"))
425                 method = 0;
426         else if (!strcmp(how, "summary"))
427                 method = 1;
428         else
429                 method = 2;
430
431         for (;;) {
432                 int reloop = 0, active;
433                 xseg_prepare_wait(xseg, portno1);
434                 xseg_prepare_wait(xseg, portno2);
435                 req = NULL;
436
437                 for (;;) {
438                         active = 0;
439
440                         //FIXME
441                         req = xseg_accept(xseg, portno1);
442                         if (req) {
443                                 xseg_submit(xseg, req, portno2, X_ALLOC);
444                                 log_req(logfd, portno1, portno2, LOG_ACCEPT, method, req);
445                                 active += 1;
446                         }
447
448                         req = xseg_accept(xseg, portno2);
449                         if (req) {
450                                 xseg_submit(xseg, req, portno1, X_ALLOC);
451                                 log_req(logfd, portno2, portno1, LOG_ACCEPT, method, req);
452                                 active += 1;
453                         }
454
455                         req = xseg_receive(xseg, portno1);
456                         if (req) {
457                                 xseg_respond(xseg, req, portno2, X_ALLOC);
458                                 log_req(logfd, portno1, portno2, LOG_RECEIVE, method, req);
459                                 active += 1;
460                         }
461
462                         req = xseg_receive(xseg, portno2);
463                         if (req) {
464                                 xseg_respond(xseg, req, portno1, X_ALLOC);
465                                 log_req(logfd, portno2, portno1, LOG_RECEIVE, method, req);
466                                 active += 1;
467                         }
468
469                         if (active == 0) {
470                                 if (reloop)
471                                         break;
472                                 /* wait on multiple queues? */
473                                 xseg_wait_signal(xseg, 100000);
474                                 break;
475                         } else {
476                                 xseg_cancel_wait(xseg, portno1);        
477                                 xseg_cancel_wait(xseg, portno2);        
478                                 reloop = 1;
479                         }
480                 }
481         }
482
483         close(logfd);
484
485         return 0;
486 }
487
488 int cmd_rndwrite(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
489 {
490         if (loops < 0)
491                 return help();
492
493         if (targetlen >= chunksize) {
494                 fprintf(stderr, "targetlen >= chunksize\n");
495                 return -1;
496         }
497
498         char *p = realloc(namebuf, targetlen+1);
499         if (!p) {
500                 fprintf(stderr, "Cannot allocate memory\n");
501                 return -1;
502         }
503         namebuf = p;
504
505         p = realloc(chunk, chunksize);
506         if (!p) {
507                 fprintf(stderr, "Cannot allocate memory\n");
508                 return -1;
509         }
510         chunk = p;
511         memset(chunk, 0, chunksize);
512
513         srandom(seed);
514
515         struct xseg_request *submitted = NULL, *received;
516         long nr_submitted = 0, nr_received = 0, nr_failed = 0;
517         int reported = 0, r;
518         uint64_t offset;
519         xport port;
520         char *req_data, *req_target;
521         seed = random();
522         init_local_signal();
523
524         for (;;) {
525                 xseg_prepare_wait(xseg, srcport);
526                 if (nr_submitted < loops &&
527                     (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
528                         xseg_cancel_wait(xseg, srcport);
529                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
530                         if (r < 0) {
531                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
532                                         targetlen, chunksize);
533                                 xseg_put_request(xseg, submitted, srcport);
534                                 return -1;
535                         }
536                         
537                         req_target = xseg_get_target(xseg, submitted);
538                         req_data = xseg_get_data(xseg, submitted);
539
540                         reported = 0;
541                         mkname(namebuf, targetlen, seed);
542                         namebuf[targetlen] = 0;
543                         //printf("%ld: %s\n", nr_submitted, namebuf);
544                         strncpy(req_target, namebuf, targetlen);
545                         offset = 0;// pick(size);
546                         mkchunk(req_data, chunksize, namebuf, targetlen, offset);
547
548                         submitted->offset = offset;
549                         submitted->size = chunksize;
550                         submitted->op = X_WRITE;
551                         submitted->flags |= XF_NOSYNC;
552
553                         port =  xseg_submit(xseg, submitted, srcport, X_ALLOC);
554                         if (port == NoPort) {
555                                 xseg_put_request(xseg, submitted, srcport);
556                         } else {
557                                 seed = random();
558                                 nr_submitted += 1;
559                                 xseg_signal(xseg, port);
560                         }
561                 }
562
563                 received = xseg_receive(xseg, srcport);
564                 if (received) {
565                         xseg_cancel_wait(xseg, srcport);
566                         nr_received += 1;
567                         if (!(received->state & XS_SERVED)) {
568                                 nr_failed += 1;
569                                 report_request(received);
570                         }
571                         if (xseg_put_request(xseg, received, srcport))
572                                 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
573                 }
574
575                 if (!submitted && !received)
576                         xseg_wait_signal(xseg, 1000000);
577
578                         if (nr_submitted % 1000 == 0 && !reported) {
579                                 reported = 1;
580                                 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
581                                         nr_submitted, nr_received, nr_failed);
582                         }
583
584                         if (nr_received >= loops)
585                                 break;
586         }
587
588         fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
589                 nr_submitted, nr_received, nr_failed);
590         return 0;
591 }
592
593 /* note:
594  * prepare/wait rhythm,
595  * files are converted to independent chunk access patterns,
596 */
597
598 int cmd_rndread(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
599 {
600         if (loops < 0)
601                 return help();
602
603         if (targetlen >= chunksize) {
604                 fprintf(stderr, "targetlen >= chunksize\n");
605                 return -1;
606         }
607
608         char *p = realloc(namebuf, targetlen+1);
609         if (!p) {
610                 fprintf(stderr, "Cannot allocate memory\n");
611                 return -1;
612         }
613         namebuf = p;
614
615         p = realloc(chunk, chunksize);
616         if (!p) {
617                 fprintf(stderr, "Cannot allocate memory\n");
618                 return -1;
619         }
620         chunk = p;
621         memset(chunk, 0, chunksize);
622
623         srandom(seed);
624
625         struct xseg_request *submitted = NULL, *received;
626         long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0;
627         int reported = 0, r;
628         uint64_t offset;
629         xport port;
630         char *req_data, *req_target;
631         init_local_signal();
632
633         seed = random();
634         for (;;) {
635                 submitted = NULL;
636                 xseg_prepare_wait(xseg, srcport);
637                 if (nr_submitted < loops &&
638                     (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
639                         xseg_cancel_wait(xseg, srcport);
640                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
641                         if (r < 0) {
642                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
643                                         targetlen, chunksize);
644                                 xseg_put_request(xseg, submitted, srcport);
645                                 return -1;
646                         }
647
648                         req_target = xseg_get_target(xseg, submitted);
649                         reported = 0;
650                         mkname(namebuf, targetlen, seed);
651                         namebuf[targetlen] = 0;
652                         //printf("%ld: %s\n", nr_submitted, namebuf);
653                         offset = 0;//pick(size);
654
655                         strncpy(req_target, namebuf, targetlen);
656                         submitted->offset = offset;
657                         submitted->size = chunksize;
658                         submitted->op = X_READ;
659                         port = xseg_submit(xseg, submitted, srcport, X_ALLOC);
660                         if (port == NoPort) {
661                                 xseg_put_request(xseg, submitted, srcport);
662                         } else {
663                                 seed = random();
664                                 nr_submitted += 1;
665                                 xseg_signal(xseg, port);
666                         }
667                 }
668
669                 received = xseg_receive(xseg, srcport);
670                 if (received) {
671                         xseg_cancel_wait(xseg, srcport);
672                         nr_received += 1;
673                         req_target = xseg_get_target(xseg, received);
674                         req_data = xseg_get_data(xseg, received);
675                         if (!(received->state & XS_SERVED)) {
676                                 nr_failed += 1;
677                                 report_request(received);
678                         } else if (!chkchunk(req_data, received->datalen,
679                                         req_target, received->targetlen, received->offset)) {
680         //                      report_request(received);
681                                 nr_mismatch += 1;
682                         }
683
684                         if (xseg_put_request(xseg, received, srcport))
685                                 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
686                 }
687
688                 if (!submitted && !received)
689                         xseg_wait_signal(xseg, 1000000);
690
691                 if (nr_submitted % 1000 == 0 && !reported) {
692                         reported = 1;
693                         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
694                         nr_submitted, nr_received, nr_failed, nr_mismatch);
695                 }
696
697                 if (nr_received >= loops)
698                         break;
699         }
700
701         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
702                 nr_submitted, nr_received, nr_failed, nr_mismatch);
703         return 0;
704 }
705
706 int cmd_submit_reqs(long loops, long concurrent_reqs, int op)
707 {
708         if (loops < 0)
709                 return help();
710
711         struct xseg_request *submitted = NULL, *received;
712         long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0, nr_flying = 0;
713         int reported = 0, r;
714         uint64_t offset;
715         uint32_t targetlen = 10, chunksize = 4096;
716         struct timeval tv1, tv2;
717         xport p;
718         char *req_data, *req_target;
719
720         xseg_bind_port(xseg, srcport);
721
722         gettimeofday(&tv1, NULL);
723         for (;;) {
724                 submitted = NULL;
725                 xseg_prepare_wait(xseg, srcport);
726                 if (nr_submitted < loops &&  nr_flying < concurrent_reqs &&
727                     (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
728                         xseg_cancel_wait(xseg, srcport);
729                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
730                         if (r < 0) {
731                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
732                                         targetlen, chunksize);
733                                 xseg_put_request(xseg, submitted, srcport);
734                                 return -1;
735                         }
736                         
737                         //FIXME
738                         ++nr_flying;
739                         nr_submitted += 1;
740                         reported = 0;
741                         offset = 0;//pick(size);
742
743                         submitted->offset = offset;
744                         submitted->size = chunksize;
745                         req_target = xseg_get_target(xseg, submitted);
746                         req_data = xseg_get_data(xseg, submitted);
747
748                         if (op == 0)
749                                 submitted->op = X_INFO;
750                         else if (op == 1)
751                                 submitted->op = X_READ;
752                         else if (op == 2) {
753                                 submitted->op = X_WRITE;
754                                 mkchunk(req_data, submitted->datalen, req_target, submitted->targetlen, submitted->offset);
755                         }
756
757                         p = xseg_submit(xseg, submitted, srcport, X_ALLOC);
758                         if ( p != NoPort){
759                                 if (xseg_signal(xseg, p) < 0)
760                                         perror("Cannot signal peer");
761                         }
762                 }
763                 received = xseg_receive(xseg, srcport);
764                 if (received) {
765                         xseg_cancel_wait(xseg, srcport);
766                         --nr_flying;
767                         if (nr_received == 0)
768                                 fprintf(stderr, "latency (time for the first req to complete): %llu usecs\n",
769                                         (unsigned long long)received->elapsed);
770                         nr_received += 1;
771                         if (!(received->state & XS_SERVED)) {
772                                 nr_failed += 1;
773                                 //report_request(received);
774                         }
775
776                         if (xseg_put_request(xseg, received, srcport))
777                                 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
778                 }
779
780                 if (!submitted && !received)
781                         xseg_wait_signal(xseg, 10000000L);
782
783                 if (nr_received >= loops)
784                         break;
785         }
786         gettimeofday(&tv2, NULL);
787
788         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
789                 nr_submitted, nr_received, nr_failed, nr_mismatch);
790         long t = (tv2.tv_sec - tv1.tv_sec)*1000000 + (tv2.tv_usec - tv1.tv_usec);
791         fprintf(stderr, "elpased time: %lf secs, throughput: %lf reqs/sec\n", (double) t / 1000000.0, (double) nr_submitted / (t / 1000000.0));
792
793         return 0;
794 }
795
796 int cmd_report(uint32_t portno)
797 {
798         struct xseg_port *port = xseg_get_port(xseg, portno);
799         if (!port) {
800                 printf("port %u is not assigned\n", portno);
801                 return 0;
802         }
803         struct xq *fq, *rq, *pq;
804         fq = xseg_get_queue(xseg, port, free_queue);
805         rq = xseg_get_queue(xseg, port, request_queue);
806         pq = xseg_get_queue(xseg, port, reply_queue);
807         fprintf(stderr, "port %u:\n"
808                 "   requests: %llu/%llu  src gw: %u  dst gw: %u\n"
809                 "       free_queue [%p] count : %u\n"
810                 "    request_queue [%p] count : %u\n"
811                 "      reply_queue [%p] count : %u\n",
812                 portno, port->alloc_reqs, port->max_alloc_reqs,
813                 xseg->src_gw[portno], xseg->dst_gw[portno],
814                 (void *)fq, xq_count(fq),
815                 (void *)rq, xq_count(rq),
816                 (void *)pq, xq_count(pq));
817         return 0;
818 }
819
820 int cmd_join(void)
821 {
822         if (xseg)
823                 return 0;
824
825         xseg = xseg_join(cfg.type, cfg.name, "posix", NULL);
826         if (!xseg) {
827                 fprintf(stderr, "cannot join segment!\n");
828                 return -1;
829         }
830         return 0;
831 }
832
833 int cmd_reportall(void)
834 {
835         uint32_t t;
836
837         if (cmd_join())
838                 return -1;
839
840
841         fprintf(stderr, "Heap usage: %llu / %llu\n", xseg->heap->cur, xseg->config.heap_size);
842         for (t = 0; t < xseg->config.nr_ports; t++)
843                 cmd_report(t);
844
845         return 0;
846 }
847
848 int cmd_create(void)
849 {
850         int r = xseg_create(&cfg);
851         if (r) {
852                 fprintf(stderr, "cannot create segment!\n");
853                 return -1;
854         }
855
856         fprintf(stderr, "Segment initialized.\n");
857         return 0;
858 }
859
860 int cmd_destroy(void)
861 {
862         if (!xseg && cmd_join())
863                 return -1;
864         xseg_leave(xseg);
865         xseg_destroy(xseg);
866         xseg = NULL;
867         fprintf(stderr, "Segment destroyed.\n");
868         return 0;
869 }
870
871 int cmd_alloc_requests(unsigned long nr)
872 {
873         return xseg_alloc_requests(xseg, srcport, nr);
874 }
875
876 int cmd_free_requests(unsigned long nr)
877 {
878         return xseg_free_requests(xseg, srcport, nr);
879 }
880
881 int cmd_put_requests(void)
882 {
883         struct xseg_request *req;
884
885         for (;;) {
886                 req = xseg_accept(xseg, dstport);
887                 if (!req)
888                         break;
889                 if (xseg_put_request(xseg, req, srcport))
890                         fprintf(stderr, "Cannot put request at port %u\n", req->src_portno);
891         }
892
893         return 0;
894 }
895
896 int cmd_finish(unsigned long nr, int fail)
897 {
898         struct xseg_request *req;
899         char *buf = malloc(sizeof(char) * 8128);
900         char *req_target, *req_data;
901         xseg_bind_port(xseg, srcport);
902         xport p;
903
904         for (; nr--;) {
905                 xseg_prepare_wait(xseg, srcport);
906                 req = xseg_accept(xseg, srcport);
907                 if (req) {
908                         req_target = xseg_get_target(xseg, req);
909                         req_data = xseg_get_data(xseg, req);
910                         xseg_cancel_wait(xseg, srcport);
911                         if (fail == 1)
912                                 req->state &= ~XS_SERVED;
913                         else {
914                                 if (req->op == X_READ)
915                                         mkchunk(req_data, req->datalen, req_target, req->targetlen, req->offset);
916                                 else if (req->op == X_WRITE) 
917                                         memcpy(buf, req_data, (sizeof(*buf) > req->datalen) ? req->datalen : sizeof(*buf));
918                                 else if (req->op == X_INFO)
919                                         *((uint64_t *) req->data) = 4294967296;
920                                 
921                                 req->state |= XS_SERVED;
922                                 req->serviced = req->size;
923                         }
924
925                         p = xseg_respond(xseg, req, srcport, X_ALLOC);
926                         xseg_signal(xseg, p);
927                         continue;
928                 }
929                 ++nr;
930                 xseg_wait_signal(xseg, 10000000L);
931         }
932
933         free(buf);
934
935         return 0;
936 }
937
938 void handle_reply(struct xseg_request *req)
939 {
940         char *req_data = xseg_get_data(xseg, req);
941         if (!(req->state & XS_SERVED)) {
942                 report_request(req);
943                 goto put;
944         }
945
946         switch (req->op) {
947         case X_READ:
948                 fwrite(req_data, 1, req->datalen, stdout);
949                 break;
950
951         case X_WRITE:
952         case X_SYNC:
953         case X_DELETE:
954         case X_TRUNCATE:
955         case X_COMMIT:
956         case X_CLONE:
957         case X_INFO:
958                 fprintf(stderr, "size: %llu\n", (unsigned long long)*((uint64_t *)req_data));
959                 break;
960
961         default:
962                 break;
963         }
964
965 put:
966         if (xseg_put_request(xseg, req, srcport))
967                 fprintf(stderr, "Cannot put reply at port %u\n", req->src_portno);
968 }
969
970 int cmd_wait(uint32_t nr)
971 {
972         struct xseg_request *req;
973         long ret;
974
975         for (;;) {
976                 req = xseg_receive(xseg, srcport);
977                 if (req) {
978                         handle_reply(req);
979                         nr--;
980                         if (nr == 0)
981                                 break;
982                         continue;
983                 }
984
985                 ret = xseg_prepare_wait(xseg, srcport);
986                 if (ret)
987                         return -1;
988
989                 ret = xseg_wait_signal(xseg, 1000000);
990                 ret = xseg_cancel_wait(xseg, srcport);
991                 if (ret)
992                         return -1;
993         }
994
995         return 0;
996 }
997
998 int cmd_put_replies(void)
999 {
1000         struct xseg_request *req;
1001
1002         for (;;) {
1003                 req = xseg_receive(xseg, dstport);
1004                 if (!req)
1005                         break;
1006                 fprintf(stderr, "request: %08llx%08llx\n"
1007                         "     op: %u\n"
1008                         "  state: %u\n",
1009                         0LL, (unsigned long long)req->serial,
1010                         req->op,
1011                         req->state);
1012                 report_request(req);
1013
1014                 //fwrite(req->buffer, 1, req->bufferlen, stdout);
1015
1016                 if (xseg_put_request(xseg, req, srcport))
1017                         fprintf(stderr, "Cannot put reply\n");
1018         }
1019
1020         return 0;
1021 }
1022
1023 int cmd_bind(long portno)
1024 {
1025         struct xseg_port *port = xseg_bind_port(xseg, portno);
1026         if (!port) {
1027                 fprintf(stderr, "failed to bind port %ld\n", portno);
1028                 return 1;
1029         }
1030
1031         fprintf(stderr, "bound port %u\n", xseg_portno(xseg, port));
1032         return 0;
1033 }
1034
1035 int cmd_signal(uint32_t portno)
1036 {
1037         return xseg_signal(xseg, portno);
1038 }
1039
1040 int parse_ports(char *str)
1041 {
1042         int ret = 0;
1043         char *s = str;
1044
1045         for (;;) {
1046                 if (*s == 0)
1047                         return 0;
1048
1049                 if (*s == ':') {
1050                         *s = 0;
1051                         if ((s > str) && isdigit(str[0])) {
1052                                 srcport = atol(str);
1053                                 ret ++;
1054                         }
1055                         break;
1056                 }
1057                 s ++;
1058         }
1059
1060         s += 1;
1061         str = s;
1062
1063         for (;;) {
1064                 if (*s == 0) {
1065                         if ((s > str) && isdigit(str[0])) {
1066                                 dstport = atol(str);
1067                                 ret ++;
1068                         }
1069                         break;
1070                 }
1071                 s ++;
1072         }
1073
1074         return ret;
1075 }
1076
1077 int main(int argc, char **argv)
1078 {
1079         int i, ret = 0;
1080         char *spec;
1081
1082         if (argc < 3)
1083                 return help();
1084
1085         srcport = -1;
1086         dstport = -1;
1087         spec = argv[1];
1088
1089         if (xseg_parse_spec(spec, &cfg)) {
1090                 fprintf(stderr, "Cannot parse spec\n");
1091                 return -1;
1092         }
1093
1094         if (xseg_initialize()) {
1095                 fprintf(stderr, "cannot initialize!\n");
1096                 return -1;
1097         }
1098
1099         for (i = 2; i < argc; i++) {
1100
1101                 if (!strcmp(argv[i], "create")) {
1102                         ret = cmd_create();
1103                         continue;
1104                 }
1105
1106                 if (!strcmp(argv[i], "join")) {
1107                         ret = cmd_join();
1108                         if (!ret)
1109                                 fprintf(stderr, "Segment joined.\n");
1110                         continue;
1111                 }
1112
1113                 if (!strcmp(argv[i], "destroy")) {
1114                         ret = cmd_destroy();
1115                         continue;
1116                 }
1117
1118                 if (cmd_join())
1119                         return -1;
1120
1121                 if (!strcmp(argv[i], "reportall")) {
1122                         ret = cmd_reportall();
1123                         continue;
1124                 }
1125
1126                 if (!strcmp(argv[i], "bind") && (i + 1 < argc)) {
1127                         ret = cmd_bind(atol(argv[i+1]));
1128                         i += 1;
1129                         continue;
1130                 }
1131
1132                 if (!strcmp(argv[i], "signal") && (i + 1 < argc)) {
1133                         ret = cmd_signal(atol(argv[i+1]));
1134                         i += 1;
1135                         continue;
1136                 }
1137
1138                 if (!strcmp(argv[i], "bridge") && (i + 4 < argc)) {
1139                         ret = cmd_bridge(atol(argv[i+1]),
1140                                          atol(argv[i+2]),
1141                                          argv[i+3],
1142                                          argv[i+4]);
1143                         i += 4;
1144                         continue;
1145                 }
1146
1147                 if (srcport == -1) {
1148                         if (!parse_ports(argv[i]))
1149                                 fprintf(stderr, "source port undefined: %s\n", argv[i]);
1150                         continue;
1151                 }
1152
1153                 if (dstport == -1) {
1154                         if (!parse_ports(argv[i]))
1155                                 fprintf(stderr, "destination port undefined: %s\n", argv[i]);
1156                         continue;
1157                 }
1158
1159                 if (!strcmp(argv[i], "report")) {
1160                         ret = cmd_report(dstport);
1161                         continue;
1162                 }
1163
1164                 if (!strcmp(argv[i], "alloc_requests") && (i + 1 < argc)) {
1165                         ret = cmd_alloc_requests(atol(argv[i+1]));
1166                         i += 1;
1167                         continue;
1168                 }
1169
1170                 if (!strcmp(argv[i], "free_requests") && (i + 1 < argc)) {
1171                         ret = cmd_free_requests(atol(argv[i+1]));
1172                         i += 1;
1173                         continue;
1174                 }
1175
1176                 if (!strcmp(argv[i], "put_requests")) {
1177                         ret = cmd_put_requests();
1178                         continue;
1179                 }
1180
1181                 if (!strcmp(argv[i], "put_replies")) {
1182                         ret = cmd_put_replies();
1183                         continue;
1184                 }
1185
1186                 if (!strcmp(argv[i], "complete") && (i + 1 < argc)) {
1187                         ret = cmd_finish(atol(argv[i+1]), 0);
1188                         i += 1;
1189                         continue;
1190                 }
1191
1192                 if (!strcmp(argv[i], "fail") && (i + 1 < argc)) {
1193                         ret = cmd_finish(atol(argv[i+1]), 1);
1194                         i += 1;
1195                         continue;
1196                 }
1197
1198                 if (!strcmp(argv[i], "wait") && (i + 1 < argc)) {
1199                         ret = cmd_wait(atol(argv[i+1]));
1200                         i += 1;
1201                         continue;
1202                 }
1203
1204                 if (!strcmp(argv[i], "rndwrite") && (i + 5 < argc)) {
1205                         long nr_loops = atol(argv[i+1]);
1206                         unsigned int seed = atoi(argv[i+2]);
1207                         unsigned int targetlen = atoi(argv[i+3]);
1208                         unsigned int chunksize = atoi(argv[i+4]);
1209                         unsigned long objectsize = atol(argv[i+5]);
1210                         ret = cmd_rndwrite(nr_loops, seed, targetlen, chunksize, objectsize);
1211                         i += 5;
1212                         continue;
1213                 }
1214
1215                 if (!strcmp(argv[i], "rndread") && (i + 5 < argc)) {
1216                         long nr_loops = atol(argv[i+1]);
1217                         unsigned int seed = atoi(argv[i+2]);
1218                         unsigned int targetlen = atoi(argv[i+3]);
1219                         unsigned int chunksize = atoi(argv[i+4]);
1220                         unsigned long objectsize = atol(argv[i+5]);
1221                         ret = cmd_rndread(nr_loops, seed, targetlen, chunksize, objectsize);
1222                         i += 5;
1223                         continue;
1224                 }
1225
1226                 if (!strcmp(argv[i], "submit_reqs") && (i + 3 < argc)) {
1227                         long nr_loops = atol(argv[i+1]);
1228                         long concurrent_reqs = atol(argv[i+2]);
1229                         int op = atoi(argv[i+3]);
1230                         ret = cmd_submit_reqs(nr_loops, concurrent_reqs, op);
1231                         i += 3;
1232                         continue;
1233                 }
1234
1235                 if (!strcmp(argv[i], "read") && (i + 3 < argc)) {
1236                         char *target = argv[i+1];
1237                         uint64_t offset = atol(argv[i+2]);
1238                         uint64_t size   = atol(argv[i+3]);
1239                         ret = cmd_read(target, offset, size);
1240                         i += 3;
1241                         continue;
1242                 }
1243
1244                 if (!strcmp(argv[i], "write") && (i + 2 < argc)) {
1245                         char *target = argv[i+1];
1246                         uint64_t offset = atol(argv[i+2]);
1247                         ret = cmd_write(target, offset);
1248                         i += 2;
1249                         continue;
1250                 }
1251
1252                 if (!strcmp(argv[i], "truncate") && (i + 2 < argc)) {
1253                         char *target = argv[i+1];
1254                         uint64_t offset = atol(argv[i+2]);
1255                         ret = cmd_truncate(target, offset);
1256                         i += 2;
1257                         continue;
1258                 }
1259
1260                 if (!strcmp(argv[i], "delete") && (i + 1 < argc)) {
1261                         char *target = argv[i+1];
1262                         ret = cmd_delete(target);
1263                         i += 1;
1264                         continue;
1265                 }
1266
1267                 if (!strcmp(argv[i], "acquire") && (i + 1 < argc)) {
1268                         char *target = argv[i+1];
1269                         ret = cmd_acquire(target);
1270                         i += 1;
1271                         continue;
1272                 }
1273
1274                 if (!strcmp(argv[i], "release") && (i + 1 < argc)) {
1275                         char *target = argv[i+1];
1276                         ret = cmd_release(target);
1277                         i += 1;
1278                         continue;
1279                 }
1280
1281                 if (!strcmp(argv[i], "copy") && (i + 2) < argc) {
1282                         char *src = argv[i+1];
1283                         char *dst = argv[i+2];
1284                         ret = cmd_copy(src, dst);
1285                         i += 2;
1286                         continue;
1287                 }
1288
1289                 if (!strcmp(argv[i], "clone") && (i + 2 < argc)) {
1290                         char *src = argv[i+1];
1291                         char *dst = argv[i+2];
1292                         ret = cmd_clone(src, dst);
1293                         i += 2;
1294                         continue;
1295                 }
1296
1297                 if (!strcmp(argv[i], "info") && (i + 1 < argc)) {
1298                         char *target = argv[i+1];
1299                         ret = cmd_info(target);
1300                         i += 1;
1301                         continue;
1302                 }
1303
1304
1305                 if (!parse_ports(argv[i]))
1306                         fprintf(stderr, "invalid argument: %s\n", argv[i]);
1307         }
1308
1309         /* xseg_leave(); */
1310         return ret;
1311 }