Rename typedef'd None to Noneidx
[archipelago] / xseg / peers / user / xseg-tool.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <string.h>
5 #include <stdint.h>
6 #include <ctype.h>
7 #include <sys/stat.h>
8 #include <sys/types.h>
9 #include <fcntl.h>
10 #include <unistd.h>
11
12 #include <xseg/xseg.h>
13
14 int help(void)
15 {
16         printf("xseg <spec> [[[<src_port>]:[<dst_port>]] [<command> <arg>*] ]*\n"
17                 "spec:\n"
18                 "    <type:name:nr_ports:nr_requests:request_size:extra_size:page_shift>\n"
19                 "global commands:\n"
20                 "    reportall\n"
21                 "    create\n"
22                 "    destroy\n"
23                 "    bind <portno>\n"
24                 "    signal <portno>\n"
25                 "    bridge <portno1> <portno2> <logfile> {full|summary|stats}\n"
26                 "port commands:\n"
27                 "    report\n"
28                 "    alloc_requests (to source) <nr>\n"
29                 "    free_requests (from source) <nr>\n"
30                 "    put_requests (all from dest)\n"
31                 "    put_replies (all from dest)\n"
32                 "    wait        <nr_replies>\n"
33                 "    complete    <nr_requests>\n"
34                 "    fail        <nr_requests>\n"
35                 "    rndwrite    <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
36                 "    rndread     <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
37                 "    submit_reqs <nr_loops> <concurrent_reqs>\n"
38                 "    info        <target>\n"
39                 "    read        <target> <offset> <size>\n"
40                 "    write       <target> <offset> < data\n"
41                 "    truncate    <target> <size>\n"
42                 "    delete      <target>\n"
43                 "    acquire     <target>\n"
44                 "    release     <target>\n"
45                 "    copy        <src>  <dst>\n"
46                 "    clone       <src>  <dst>\n"
47         );
48         return 1;
49 }
50
51 char *namebuf;
52 char *chunk;
53 struct xseg_config cfg;
54 struct xseg *xseg;
55 uint32_t srcport, dstport;
56 uint64_t reqs;
57 #define mkname mkname_heavy
58 /* heavy distributes duplicates much more widely than light
59  * ./xseg-tool random 100000 | cut -d' ' -f2- | sort | uniq -d -c |wc -l
60  */
61
62 void mkname_heavy(char *name, uint32_t namelen, uint32_t seed)
63 {
64         int i;
65         char c;
66         for (i = 0; i < namelen; i += 1) {
67                 c = seed + (seed >> 8) + (seed >> 16) + (seed >> 24);
68                 c = '0' + ((c + (c >> 4)) & 0xf);
69                 if (c > '9')
70                         c += 'a'-'0'-10;
71                 name[i] = c;
72                 seed *= ((seed % 137911) | 1) * 137911;
73         }
74 }
75
76 void mkname_light(char *name, uint32_t namelen, uint32_t seed)
77 {
78         int i;
79         char c;
80         for (i = 0; i < namelen; i += 1) {
81                 c = seed;
82                 name[i] = 'A' + (c & 0xf);
83                 seed += 1;
84         }
85 }
86
87 uint64_t pick(uint64_t size)
88 {
89         return (uint64_t)((double)(RAND_MAX) / random());
90 }
91
92 void mkchunk(   char *chunk, uint32_t datalen,
93                 char *target, uint32_t targetlen, uint64_t offset)
94 {
95         long i, r, bufsize = targetlen + 16;
96         char buf[bufsize];
97         r = datalen % bufsize;
98         snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);
99
100         for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
101                 memcpy(chunk + i, buf, bufsize);
102
103         memcpy(chunk + datalen - r, buf, r);
104 }
105
106 int chkchunk(   char *chunk, uint32_t datalen,
107                 char *target, uint32_t targetlen, uint64_t offset)
108 {
109         long i, r;
110         int bufsize = targetlen + 16;
111         char buf[bufsize];
112         r = datalen % targetlen;
113         snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);
114
115         for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
116                 if (memcmp(chunk + i, buf, bufsize)) {
117                         /*printf("mismatch: '%*s'* vs '%*s'\n",
118                                 bufsize, buf, datalen, chunk);
119                         */
120                         return 0;
121                 }
122
123         if (memcmp(chunk + datalen - r, buf, r))
124                 return 0;
125
126         return 1;
127 }
128
129
130 #define ALLOC_MIN 4096
131 #define ALLOC_MAX 1048576
132
133 void inputbuf(FILE *fp, char **retbuf, uint64_t *retsize)
134 {
135         static uint64_t alloc_size;
136         static char *buf;
137         uint64_t size = 0;
138         char *p;
139         size_t r;
140
141         if (alloc_size < ALLOC_MIN)
142                 alloc_size = ALLOC_MIN;
143
144         if (alloc_size > ALLOC_MAX)
145                 alloc_size = ALLOC_MAX;
146
147         p = realloc(buf, alloc_size);
148         if (!p) {
149                 if (buf)
150                         free(buf);
151                 buf = NULL;
152                 goto out;
153         }
154
155         buf = p;
156
157         while (!feof(fp)) {
158                 r = fread(buf + size, 1, alloc_size - size, fp);
159                 if (!r)
160                         break;
161                 size += r;
162                 if (size >= alloc_size) {
163                         p = realloc(buf, alloc_size * 2);
164                         if (!p) {
165                                 if (buf)
166                                         free(buf);
167                                 buf = NULL;
168                                 size = 0;
169                                 goto out;
170                         }
171                         buf = p;
172                         alloc_size *= 2;
173                 }
174         }
175
176 out:
177         *retbuf = buf;
178         *retsize = size;
179 }
180
181 void report_request(struct xseg_request *req)
182 {
183         uint32_t max = req->datalen;
184         if (max > 128)
185                 max = 128;
186         req->data[max-1] = 0;
187         fprintf(stderr, "request %llu state %u\n", (unsigned long long)req->serial, req->state);
188         fprintf(stderr, "data: %s\n", req->data);
189 }
190
191 int cmd_info(char *target)
192 {
193         uint32_t targetlen = strlen(target);
194         size_t size = sizeof(uint64_t);
195         int r;
196         xserial srl;
197         struct xseg_request *req;
198
199         req = xseg_get_request(xseg, srcport);
200         if (!req) {
201                 fprintf(stderr, "No request!\n");
202                 return -1;
203         }
204
205         r = xseg_prep_request(req, targetlen, size);
206         if (r < 0) {
207                 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
208                         (unsigned long) targetlen, (unsigned long) size);
209                 xseg_put_request(xseg, srcport, req);
210                 return -1;
211         }
212
213         strncpy(req->target, target, targetlen);
214         req->offset = 0;
215         req->size = size;
216         req->op = X_INFO;
217
218         srl = xseg_submit(xseg, dstport, req);
219         if (srl == Noneidx)
220                 return -1;
221
222         xseg_signal(xseg, dstport);
223
224         return 0;
225 }
226
227 int cmd_read(char *target, uint64_t offset, uint64_t size)
228 {
229         uint32_t targetlen = strlen(target);
230         int r;
231         xserial srl;
232         struct xseg_request *req = xseg_get_request(xseg, srcport);
233         if (!req) {
234                 fprintf(stderr, "No request\n");
235                 return -1;
236         }
237
238         r = xseg_prep_request(req, targetlen, size);
239         if (r < 0) {
240                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
241                         (unsigned long)targetlen, (unsigned long long)size);
242                 xseg_put_request(xseg, srcport, req);
243                 return -1;
244         }
245
246         strncpy(req->target, target, targetlen);
247         req->offset = offset;
248         req->size = size;
249         req->op = X_READ;
250
251         srl = xseg_submit(xseg, dstport, req);
252         if (srl == Noneidx)
253                 return -1;
254
255         xseg_signal(xseg, dstport);
256         return 0;
257 }
258
259 int cmd_write(char *target, uint64_t offset)
260 {
261         char *buf = NULL;
262         int r;
263         xserial srl;
264         uint64_t size = 0;
265         uint32_t targetlen = strlen(target);
266         struct xseg_request *req;
267
268         inputbuf(stdin, &buf, &size);
269         if (!size) {
270                 fprintf(stderr, "No input\n");
271                 return -1;
272         }
273
274         req = xseg_get_request(xseg, srcport);
275         if (!req) {
276                 fprintf(stderr, "No request\n");
277                 return -1;
278         }
279
280         r = xseg_prep_request(req, targetlen, size);
281         if (r < 0) {
282                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
283                         (unsigned long)targetlen, (unsigned long long)size);
284                 xseg_put_request(xseg, srcport, req);
285                 return -1;
286         }
287
288         strncpy(req->target, target, targetlen);
289         memcpy(req->buffer, buf, size);
290         req->offset = offset;
291         req->size = size;
292         req->op = X_WRITE;
293
294         srl = xseg_submit(xseg, dstport, req);
295         if (srl == Noneidx) {
296                 fprintf(stderr, "Cannot submit\n");
297                 return -1;
298         }
299
300         return 0;
301 }
302
303 int cmd_truncate(char *target, uint64_t offset)
304 {
305         return 0;
306 }
307
308 int cmd_delete(char *target)
309 {
310         return 0;
311 }
312
313 int cmd_acquire(char *target)
314 {
315         return 0;
316 }
317
318 int cmd_release(char *target)
319 {
320         return 0;
321 }
322
323 int cmd_copy(char *src, char *dst)
324 {
325         return 0;
326 }
327
328 int cmd_clone(char *src, char *dst)
329 {
330         return 0;
331 }
332
333 void log_req(int logfd, uint32_t portno2, uint32_t portno1, int op, int method,
334                 struct xseg_request *req)
335 {
336         FILE *logfp;
337         char target[64], data[64];
338         /* null terminate name in case of req->target is less than 63 characters,
339          * and next character after name (aka first byte of next buffer) is not
340          * null
341          */
342         unsigned int end = (req->targetlen > 63) ? 63 : req->targetlen;
343
344         logfp = fdopen(logfd, "a");
345         if (!logfp)
346                 return;
347
348         switch(method) {
349         case 0:
350                 strncpy(target, req->target, end);
351                 target[end] = 0;
352                 strncpy(data, req->data, 63);
353                 data[63] = 0;
354
355                 fprintf(logfp,
356                         "src port: %u, dst port: %u,  op:%u offset: %llu size: %lu, reqstate: %u\n"
357                         "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
358                         (unsigned int)portno1,
359                         (unsigned int)portno2,
360                         (unsigned int)req->op,
361                         (unsigned long long)req->offset,
362                         (unsigned long)req->size,
363                         (unsigned int)req->state,
364                         (unsigned int)req->targetlen, target,
365                         (unsigned long long)req->datalen, data);
366                 break;
367         case 1:
368                 fprintf(logfp,
369                         "src port: %u, dst port: %u, op: %u\n",
370                         (unsigned int)portno1,
371                         (unsigned int)portno2,
372                         (unsigned int)req->op);
373                 break;
374         case 2:
375                 fprintf(logfp, "src port: %u, dst port: %u, reqs: %llu\n",
376                         (unsigned int)portno1,
377                         (unsigned int)portno2,
378                         (unsigned long long)++reqs);
379         }
380
381         fclose(logfp);
382         return;
383 }
384
385 #define LOG_ACCEPT  0
386 #define LOG_RECEIVE 1
387
388 int cmd_bridge(uint32_t portno1, uint32_t portno2, char *logfile, char *how)
389 {
390         struct xseg_request *req;
391         int logfd, method;
392         if (!strcmp(logfile, "-"))
393                 logfd = 1;
394         else {
395                 logfd = open(logfile, O_WRONLY|O_APPEND|O_CREAT, 0600);
396                 if (logfd < 0) {
397                         perror(logfile);
398                         return -1;
399                 }
400         }
401
402         if (!strcmp(how, "full"))
403                 method = 0;
404         else if (!strcmp(how, "summary"))
405                 method = 1;
406         else
407                 method = 2;
408
409         for (;;) {
410                 int reloop = 0, active;
411                 xseg_prepare_wait(xseg, portno1);
412                 xseg_prepare_wait(xseg, portno2);
413                 req = NULL;
414
415                 for (;;) {
416                         active = 0;
417
418                         req = xseg_accept(xseg, portno1);
419                         if (req) {
420                                 xseg_submit(xseg, portno2, req);
421                                 log_req(logfd, portno1, portno2, LOG_ACCEPT, method, req);
422                                 active += 1;
423                         }
424
425                         req = xseg_accept(xseg, portno2);
426                         if (req) {
427                                 xseg_submit(xseg, portno1, req);
428                                 log_req(logfd, portno2, portno1, LOG_ACCEPT, method, req);
429                                 active += 1;
430                         }
431
432                         req = xseg_receive(xseg, portno1);
433                         if (req) {
434                                 xseg_respond(xseg, portno2, req);
435                                 log_req(logfd, portno1, portno2, LOG_RECEIVE, method, req);
436                                 active += 1;
437                         }
438
439                         req = xseg_receive(xseg, portno2);
440                         if (req) {
441                                 xseg_respond(xseg, portno1, req);
442                                 log_req(logfd, portno2, portno1, LOG_RECEIVE, method, req);
443                                 active += 1;
444                         }
445
446                         if (active == 0) {
447                                 if (reloop)
448                                         break;
449                                 /* wait on multiple queues? */
450                                 xseg_wait_signal(xseg, 100000);
451                                 break;
452                         } else {
453                                 xseg_cancel_wait(xseg, portno1);        
454                                 xseg_cancel_wait(xseg, portno2);        
455                                 reloop = 1;
456                         }
457                 }
458         }
459
460         close(logfd);
461
462         return 0;
463 }
464
465 int cmd_rndwrite(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
466 {
467         if (loops < 0)
468                 return help();
469
470         if (targetlen >= chunksize) {
471                 fprintf(stderr, "targetlen >= chunksize\n");
472                 return -1;
473         }
474
475         char *p = realloc(namebuf, targetlen+1);
476         if (!p) {
477                 fprintf(stderr, "Cannot allocate memory\n");
478                 return -1;
479         }
480         namebuf = p;
481
482         p = realloc(chunk, chunksize);
483         if (!p) {
484                 fprintf(stderr, "Cannot allocate memory\n");
485                 return -1;
486         }
487         chunk = p;
488         memset(chunk, 0, chunksize);
489
490         srandom(seed);
491
492         struct xseg_request *submitted = NULL, *received;
493         long nr_submitted = 0, nr_received = 0, nr_failed = 0;
494         int reported = 0, r;
495         uint64_t offset;
496         xserial srl;
497
498         for (;;) {
499                 xseg_prepare_wait(xseg, srcport);
500                 if (nr_submitted < loops &&
501                     (submitted = xseg_get_request(xseg, srcport))) {
502                         xseg_cancel_wait(xseg, srcport);
503                         r = xseg_prep_request(submitted, targetlen, chunksize);
504                         if (r < 0) {
505                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
506                                         targetlen, chunksize);
507                                 xseg_put_request(xseg, submitted->portno, submitted);
508                                 return -1;
509                         }
510
511                         nr_submitted += 1;
512                         reported = 0;
513                         seed = random();
514                         mkname(namebuf, targetlen, seed);
515                         namebuf[targetlen] = 0;
516                         //printf("%ld: %s\n", nr_submitted, namebuf);
517                         strncpy(submitted->target, namebuf, targetlen);
518                         offset = 0;// pick(size);
519                         mkchunk(submitted->buffer, chunksize, namebuf, targetlen, offset);
520
521                         submitted->offset = offset;
522                         submitted->size = chunksize;
523                         submitted->op = X_WRITE;
524                         submitted->flags |= XF_NOSYNC;
525
526                         srl = xseg_submit(xseg, dstport, submitted);
527                         (void)srl;
528                         xseg_signal(xseg, dstport);
529                 }
530
531                 received = xseg_receive(xseg, srcport);
532                 if (received) {
533                         xseg_cancel_wait(xseg, srcport);
534                         nr_received += 1;
535                         if (!(received->state & XS_SERVED)) {
536                                 nr_failed += 1;
537                                 report_request(received);
538                         }
539                         if (xseg_put_request(xseg, received->portno, received))
540                                 fprintf(stderr, "Cannot put request at port %u\n", received->portno);
541                 }
542
543                 if (!submitted && !received)
544                         xseg_wait_signal(xseg, 1000000);
545
546                         if (nr_submitted % 1000 == 0 && !reported) {
547                                 reported = 1;
548                                 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
549                                         nr_submitted, nr_received, nr_failed);
550                         }
551
552                         if (nr_received >= loops)
553                                 break;
554         }
555
556         fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
557                 nr_submitted, nr_received, nr_failed);
558         return 0;
559 }
560
561 /* note:
562  * prepare/wait rhythm,
563  * files are converted to independent chunk access patterns,
564 */
565
566 int cmd_rndread(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
567 {
568         if (loops < 0)
569                 return help();
570
571         if (targetlen >= chunksize) {
572                 fprintf(stderr, "targetlen >= chunksize\n");
573                 return -1;
574         }
575
576         char *p = realloc(namebuf, targetlen+1);
577         if (!p) {
578                 fprintf(stderr, "Cannot allocate memory\n");
579                 return -1;
580         }
581         namebuf = p;
582
583         p = realloc(chunk, chunksize);
584         if (!p) {
585                 fprintf(stderr, "Cannot allocate memory\n");
586                 return -1;
587         }
588         chunk = p;
589         memset(chunk, 0, chunksize);
590
591         srandom(seed);
592
593         struct xseg_request *submitted = NULL, *received;
594         long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0;
595         int reported = 0, r;
596         uint64_t offset;
597         xserial srl;
598
599         for (;;) {
600                 submitted = NULL;
601                 xseg_prepare_wait(xseg, srcport);
602                 if (nr_submitted < loops &&
603                     (submitted = xseg_get_request(xseg, srcport))) {
604                         xseg_cancel_wait(xseg, srcport);
605                         r = xseg_prep_request(submitted, targetlen, chunksize);
606                         if (r < 0) {
607                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
608                                         targetlen, chunksize);
609                                 xseg_put_request(xseg, submitted->portno, submitted);
610                                 return -1;
611                         }
612
613                         nr_submitted += 1;
614                         reported = 0;
615                         seed = random();
616                         mkname(namebuf, targetlen, seed);
617                         namebuf[targetlen] = 0;
618                         //printf("%ld: %s\n", nr_submitted, namebuf);
619                         offset = 0;//pick(size);
620
621                         strncpy(submitted->target, namebuf, targetlen);
622                         submitted->offset = offset;
623                         submitted->size = chunksize;
624                         submitted->op = X_READ;
625
626                         srl = xseg_submit(xseg, dstport, submitted);
627                         (void)srl;
628                         xseg_signal(xseg, dstport);
629                 }
630
631                 received = xseg_receive(xseg, srcport);
632                 if (received) {
633                         xseg_cancel_wait(xseg, srcport);
634                         nr_received += 1;
635                         if (!(received->state & XS_SERVED)) {
636                                 nr_failed += 1;
637                                 report_request(received);
638                         } else if (!chkchunk(received->data, received->datalen,
639                                         received->target, received->targetlen, received->offset)) {
640                                 nr_mismatch += 1;
641                         }
642
643                         if (xseg_put_request(xseg, received->portno, received))
644                                 fprintf(stderr, "Cannot put request at port %u\n", received->portno);
645                 }
646
647                 if (!submitted && !received)
648                         xseg_wait_signal(xseg, 1000000);
649
650                 if (nr_submitted % 1000 == 0 && !reported) {
651                         reported = 1;
652                         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
653                         nr_submitted, nr_received, nr_failed, nr_mismatch);
654                 }
655
656                 if (nr_received >= loops)
657                         break;
658         }
659
660         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
661                 nr_submitted, nr_received, nr_failed, nr_mismatch);
662         return 0;
663 }
664
665 int cmd_submit_reqs(long loops, long concurrent_reqs, int op)
666 {
667         if (loops < 0)
668                 return help();
669
670         struct xseg_request *submitted = NULL, *received;
671         long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0, nr_flying = 0;
672         int reported = 0, r;
673         uint64_t offset;
674         uint32_t targetlen = 10, chunksize = 4096;
675         struct timeval tv1, tv2;
676         xserial srl;
677
678         xseg_bind_port(xseg, srcport);
679
680         gettimeofday(&tv1, NULL);
681         for (;;) {
682                 submitted = NULL;
683                 xseg_prepare_wait(xseg, srcport);
684                 if (nr_submitted < loops &&  nr_flying < concurrent_reqs &&
685                     (submitted = xseg_get_request(xseg, srcport))) {
686                         xseg_cancel_wait(xseg, srcport);
687                         r = xseg_prep_request(submitted, targetlen, chunksize);
688                         if (r < 0) {
689                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
690                                         targetlen, chunksize);
691                                 xseg_put_request(xseg, submitted->portno, submitted);
692                                 return -1;
693                         }
694
695                         ++nr_flying;
696                         nr_submitted += 1;
697                         reported = 0;
698                         offset = 0;//pick(size);
699
700                         submitted->offset = offset;
701                         submitted->size = chunksize;
702
703                         if (op == 0)
704                                 submitted->op = X_INFO;
705                         else if (op == 1)
706                                 submitted->op = X_READ;
707                         else if (op == 2) {
708                                 submitted->op = X_WRITE;
709                                 mkchunk(submitted->buffer, submitted->datalen, submitted->target, submitted->targetlen, submitted->offset);
710                         }
711
712                         srl = xseg_submit(xseg, dstport, submitted);
713                         (void)srl;
714                         if (xseg_signal(xseg, dstport) < 0)
715                                 perror("Cannot signal peer");
716                 }
717                 received = xseg_receive(xseg, srcport);
718                 if (received) {
719                         xseg_cancel_wait(xseg, srcport);
720                         --nr_flying;
721                         if (nr_received == 0)
722                                 fprintf(stderr, "latency (time for the first req to complete): %llu usecs\n",
723                                         (unsigned long long)received->elapsed);
724                         nr_received += 1;
725                         if (!(received->state & XS_SERVED)) {
726                                 nr_failed += 1;
727                                 //report_request(received);
728                         }
729
730                         if (xseg_put_request(xseg, received->portno, received))
731                                 fprintf(stderr, "Cannot put request at port %u\n", received->portno);
732                 }
733
734                 if (!submitted && !received)
735                         xseg_wait_signal(xseg, 10000000L);
736
737                 if (nr_received >= loops)
738                         break;
739         }
740         gettimeofday(&tv2, NULL);
741
742         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
743                 nr_submitted, nr_received, nr_failed, nr_mismatch);
744         long t = (tv2.tv_sec - tv1.tv_sec)*1000000 + (tv2.tv_usec - tv1.tv_usec);
745         fprintf(stderr, "elpased time: %lf secs, throughput: %lf reqs/sec\n", (double) t / 1000000.0, (double) nr_submitted / (t / 1000000.0));
746
747         return 0;
748 }
749
750 int cmd_report(uint32_t port)
751 {
752         struct xq *fq, *rq, *pq;
753         fq = &xseg->ports[port].free_queue;
754         rq = &xseg->ports[port].request_queue;
755         pq = &xseg->ports[port].reply_queue;
756         fprintf(stderr, "port %u:\n"
757                 "       free_queue [%p] count : %u\n"
758                 "    request_queue [%p] count : %u\n"
759                 "      reply_queue [%p] count : %u\n",
760                 port,
761                 (void *)fq, xq_count(fq),
762                 (void *)rq, xq_count(rq),
763                 (void *)pq, xq_count(pq));
764         return 0;
765 }
766
767 int cmd_join(void)
768 {
769         if (xseg)
770                 return 0;
771
772         xseg = xseg_join(cfg.type, cfg.name, "posix", NULL);
773         if (!xseg) {
774                 fprintf(stderr, "cannot join segment!\n");
775                 return -1;
776         }
777         return 0;
778 }
779
780 int cmd_reportall(void)
781 {
782         uint32_t t;
783
784         if (cmd_join())
785                 return -1;
786
787         fprintf(stderr, "global free requests: %u\n", xq_count(xseg->free_requests));
788         for (t = 0; t < xseg->config.nr_ports; t++)
789                 cmd_report(t);
790
791         return 0;
792 }
793
794 int cmd_create(void)
795 {
796         int r = xseg_create(&cfg);
797         if (r) {
798                 fprintf(stderr, "cannot create segment!\n");
799                 return -1;
800         }
801
802         fprintf(stderr, "Segment initialized.\n");
803         return 0;
804 }
805
806 int cmd_destroy(void)
807 {
808         if (!xseg && cmd_join())
809                 return -1;
810         xseg_leave(xseg);
811         xseg_destroy(xseg);
812         xseg = NULL;
813         fprintf(stderr, "Segment destroyed.\n");
814         return 0;
815 }
816
817 int cmd_alloc_requests(unsigned long nr)
818 {
819         return xseg_alloc_requests(xseg, srcport, nr);
820 }
821
822 int cmd_free_requests(unsigned long nr)
823 {
824         return xseg_free_requests(xseg, srcport, nr);
825 }
826
827 int cmd_put_requests(void)
828 {
829         struct xseg_request *req;
830
831         for (;;) {
832                 req = xseg_accept(xseg, dstport);
833                 if (!req)
834                         break;
835                 if (xseg_put_request(xseg, req->portno, req))
836                         fprintf(stderr, "Cannot put request at port %u\n", req->portno);
837         }
838
839         return 0;
840 }
841
842 int cmd_finish(unsigned long nr, int fail)
843 {
844         struct xseg_request *req;
845         char *buf = malloc(sizeof(char) * 8128);
846
847         xseg_bind_port(xseg, srcport);
848
849         for (; nr--;) {
850                 xseg_prepare_wait(xseg, srcport);
851                 req = xseg_accept(xseg, srcport);
852                 if (req) {
853                         xseg_cancel_wait(xseg, srcport);
854                         if (fail == 1)
855                                 req->state &= ~XS_SERVED;
856                         else {
857                                 if (req->op == X_READ)
858                                         mkchunk(req->buffer, req->datalen, req->target, req->targetlen, req->offset);
859                                 else if (req->op == X_WRITE) 
860                                         memcpy(buf, req->data, (sizeof(*buf) > req->datalen) ? req->datalen : sizeof(*buf));
861                                 else if (req->op == X_INFO)
862                                         *((uint64_t *) req->data) = 4294967296;
863                                 
864                                 req->state |= XS_SERVED;
865                                 req->serviced = req->size;
866                         }
867
868                         xseg_respond(xseg, dstport, req);
869                         xseg_signal(xseg, dstport);
870                         continue;
871                 }
872                 ++nr;
873                 xseg_wait_signal(xseg, 10000000L);
874         }
875
876         free(buf);
877
878         return 0;
879 }
880
881 void handle_reply(struct xseg_request *req)
882 {
883         if (!(req->state & XS_SERVED)) {
884                 report_request(req);
885                 goto put;
886         }
887
888         switch (req->op) {
889         case X_READ:
890                 fwrite(req->data, 1, req->datalen, stdout);
891                 break;
892
893         case X_WRITE:
894         case X_SYNC:
895         case X_DELETE:
896         case X_TRUNCATE:
897         case X_COMMIT:
898         case X_CLONE:
899         case X_INFO:
900                 fprintf(stderr, "size: %llu\n", (unsigned long long)*((uint64_t *)req->data));
901                 break;
902
903         default:
904                 break;
905         }
906
907 put:
908         if (xseg_put_request(xseg, req->portno, req))
909                 fprintf(stderr, "Cannot put reply at port %u\n", req->portno);
910 }
911
912 int cmd_wait(uint32_t nr)
913 {
914         struct xseg_request *req;
915         long ret;
916
917         for (;;) {
918                 req = xseg_receive(xseg, srcport);
919                 if (req) {
920                         handle_reply(req);
921                         nr--;
922                         if (nr == 0)
923                                 break;
924                         continue;
925                 }
926
927                 ret = xseg_prepare_wait(xseg, srcport);
928                 if (ret)
929                         return -1;
930
931                 ret = xseg_wait_signal(xseg, 1000000);
932                 ret = xseg_cancel_wait(xseg, srcport);
933                 if (ret)
934                         return -1;
935         }
936
937         return 0;
938 }
939
940 int cmd_put_replies(void)
941 {
942         struct xseg_request *req;
943
944         for (;;) {
945                 req = xseg_receive(xseg, dstport);
946                 if (!req)
947                         break;
948                 fprintf(stderr, "request: %08llx%08llx\n"
949                         "     op: %u\n"
950                         "  state: %u\n",
951                         0LL, (unsigned long long)req->serial,
952                         req->op,
953                         req->state);
954                 report_request(req);
955
956                 //fwrite(req->buffer, 1, req->bufferlen, stdout);
957
958                 if (xseg_put_request(xseg, req->portno, req))
959                         fprintf(stderr, "Cannot put reply\n");
960         }
961
962         return 0;
963 }
964
965 int cmd_bind(long portno)
966 {
967         struct xseg_port *port = xseg_bind_port(xseg, portno);
968         if (!port) {
969                 fprintf(stderr, "failed to bind port %ld\n", portno);
970                 return 1;
971         }
972
973         fprintf(stderr, "bound port %u\n", xseg_portno(xseg, port));
974         return 0;
975 }
976
977 int cmd_signal(uint32_t portno)
978 {
979         return xseg_signal(xseg, portno);
980 }
981
982 int parse_ports(char *str)
983 {
984         int ret = 0;
985         char *s = str;
986
987         for (;;) {
988                 if (*s == 0)
989                         return 0;
990
991                 if (*s == ':') {
992                         *s = 0;
993                         if ((s > str) && isdigit(str[0])) {
994                                 srcport = atol(str);
995                                 ret ++;
996                         }
997                         break;
998                 }
999                 s ++;
1000         }
1001
1002         s += 1;
1003         str = s;
1004
1005         for (;;) {
1006                 if (*s == 0) {
1007                         if ((s > str) && isdigit(str[0])) {
1008                                 dstport = atol(str);
1009                                 ret ++;
1010                         }
1011                         break;
1012                 }
1013                 s ++;
1014         }
1015
1016         return ret;
1017 }
1018
1019 int main(int argc, char **argv)
1020 {
1021         int i, ret = 0;
1022         char *spec;
1023
1024         if (argc < 3)
1025                 return help();
1026
1027         srcport = -1;
1028         dstport = -1;
1029         spec = argv[1];
1030
1031         if (xseg_parse_spec(spec, &cfg)) {
1032                 fprintf(stderr, "Cannot parse spec\n");
1033                 return -1;
1034         }
1035
1036         if (xseg_initialize()) {
1037                 fprintf(stderr, "cannot initialize!\n");
1038                 return -1;
1039         }
1040
1041         for (i = 2; i < argc; i++) {
1042
1043                 if (!strcmp(argv[i], "create")) {
1044                         ret = cmd_create();
1045                         continue;
1046                 }
1047
1048                 if (!strcmp(argv[i], "join")) {
1049                         ret = cmd_join();
1050                         if (!ret)
1051                                 fprintf(stderr, "Segment joined.\n");
1052                         continue;
1053                 }
1054
1055                 if (!strcmp(argv[i], "destroy")) {
1056                         ret = cmd_destroy();
1057                         continue;
1058                 }
1059
1060                 if (cmd_join())
1061                         return -1;
1062
1063                 if (!strcmp(argv[i], "reportall")) {
1064                         ret = cmd_reportall();
1065                         continue;
1066                 }
1067
1068                 if (!strcmp(argv[i], "bind") && (i + 1 < argc)) {
1069                         ret = cmd_bind(atol(argv[i+1]));
1070                         i += 1;
1071                         continue;
1072                 }
1073
1074                 if (!strcmp(argv[i], "signal") && (i + 1 < argc)) {
1075                         ret = cmd_signal(atol(argv[i+1]));
1076                         i += 1;
1077                         continue;
1078                 }
1079
1080                 if (!strcmp(argv[i], "bridge") && (i + 4 < argc)) {
1081                         ret = cmd_bridge(atol(argv[i+1]),
1082                                          atol(argv[i+2]),
1083                                          argv[i+3],
1084                                          argv[i+4]);
1085                         i += 4;
1086                         continue;
1087                 }
1088
1089                 if (srcport == -1) {
1090                         if (!parse_ports(argv[i]))
1091                                 fprintf(stderr, "source port undefined: %s\n", argv[i]);
1092                         continue;
1093                 }
1094
1095                 if (dstport == -1) {
1096                         if (!parse_ports(argv[i]))
1097                                 fprintf(stderr, "destination port undefined: %s\n", argv[i]);
1098                         continue;
1099                 }
1100
1101                 if (!strcmp(argv[i], "report")) {
1102                         ret = cmd_report(dstport);
1103                         continue;
1104                 }
1105
1106                 if (!strcmp(argv[i], "alloc_requests") && (i + 1 < argc)) {
1107                         ret = cmd_alloc_requests(atol(argv[i+1]));
1108                         i += 1;
1109                         continue;
1110                 }
1111
1112                 if (!strcmp(argv[i], "free_requests") && (i + 1 < argc)) {
1113                         ret = cmd_free_requests(atol(argv[i+1]));
1114                         i += 1;
1115                         continue;
1116                 }
1117
1118                 if (!strcmp(argv[i], "put_requests")) {
1119                         ret = cmd_put_requests();
1120                         continue;
1121                 }
1122
1123                 if (!strcmp(argv[i], "put_replies")) {
1124                         ret = cmd_put_replies();
1125                         continue;
1126                 }
1127
1128                 if (!strcmp(argv[i], "complete") && (i + 1 < argc)) {
1129                         ret = cmd_finish(atol(argv[i+1]), 0);
1130                         i += 1;
1131                         continue;
1132                 }
1133
1134                 if (!strcmp(argv[i], "fail") && (i + 1 < argc)) {
1135                         ret = cmd_finish(atol(argv[i+1]), 1);
1136                         i += 1;
1137                         continue;
1138                 }
1139
1140                 if (!strcmp(argv[i], "wait") && (i + 1 < argc)) {
1141                         ret = cmd_wait(atol(argv[i+1]));
1142                         i += 1;
1143                         continue;
1144                 }
1145
1146                 if (!strcmp(argv[i], "rndwrite") && (i + 5 < argc)) {
1147                         long nr_loops = atol(argv[i+1]);
1148                         unsigned int seed = atoi(argv[i+2]);
1149                         unsigned int targetlen = atoi(argv[i+3]);
1150                         unsigned int chunksize = atoi(argv[i+4]);
1151                         unsigned long objectsize = atol(argv[i+5]);
1152                         ret = cmd_rndwrite(nr_loops, seed, targetlen, chunksize, objectsize);
1153                         i += 5;
1154                         continue;
1155                 }
1156
1157                 if (!strcmp(argv[i], "rndread") && (i + 5 < argc)) {
1158                         long nr_loops = atol(argv[i+1]);
1159                         unsigned int seed = atoi(argv[i+2]);
1160                         unsigned int targetlen = atoi(argv[i+3]);
1161                         unsigned int chunksize = atoi(argv[i+4]);
1162                         unsigned long objectsize = atol(argv[i+5]);
1163                         ret = cmd_rndread(nr_loops, seed, targetlen, chunksize, objectsize);
1164                         i += 5;
1165                         continue;
1166                 }
1167
1168                 if (!strcmp(argv[i], "submit_reqs") && (i + 3 < argc)) {
1169                         long nr_loops = atol(argv[i+1]);
1170                         long concurrent_reqs = atol(argv[i+2]);
1171                         int op = atoi(argv[i+3]);
1172                         ret = cmd_submit_reqs(nr_loops, concurrent_reqs, op);
1173                         i += 3;
1174                         continue;
1175                 }
1176
1177                 if (!strcmp(argv[i], "read") && (i + 3 < argc)) {
1178                         char *target = argv[i+1];
1179                         uint64_t offset = atol(argv[i+2]);
1180                         uint64_t size   = atol(argv[i+3]);
1181                         ret = cmd_read(target, offset, size);
1182                         i += 3;
1183                         continue;
1184                 }
1185
1186                 if (!strcmp(argv[i], "write") && (i + 2 < argc)) {
1187                         char *target = argv[i+1];
1188                         uint64_t offset = atol(argv[i+2]);
1189                         ret = cmd_write(target, offset);
1190                         i += 2;
1191                         continue;
1192                 }
1193
1194                 if (!strcmp(argv[i], "truncate") && (i + 2 < argc)) {
1195                         char *target = argv[i+1];
1196                         uint64_t offset = atol(argv[i+2]);
1197                         ret = cmd_truncate(target, offset);
1198                         i += 2;
1199                         continue;
1200                 }
1201
1202                 if (!strcmp(argv[i], "delete") && (i + 1 < argc)) {
1203                         char *target = argv[i+1];
1204                         ret = cmd_delete(target);
1205                         i += 1;
1206                         continue;
1207                 }
1208
1209                 if (!strcmp(argv[i], "acquire") && (i + 1 < argc)) {
1210                         char *target = argv[i+1];
1211                         ret = cmd_acquire(target);
1212                         i += 1;
1213                         continue;
1214                 }
1215
1216                 if (!strcmp(argv[i], "release") && (i + 1 < argc)) {
1217                         char *target = argv[i+1];
1218                         ret = cmd_release(target);
1219                         i += 1;
1220                         continue;
1221                 }
1222
1223                 if (!strcmp(argv[i], "copy") && (i + 2) < argc) {
1224                         char *src = argv[i+1];
1225                         char *dst = argv[i+2];
1226                         ret = cmd_copy(src, dst);
1227                         i += 2;
1228                         continue;
1229                 }
1230
1231                 if (!strcmp(argv[i], "clone") && (i + 2 < argc)) {
1232                         char *src = argv[i+1];
1233                         char *dst = argv[i+2];
1234                         ret = cmd_clone(src, dst);
1235                         i += 2;
1236                         continue;
1237                 }
1238
1239                 if (!strcmp(argv[i], "info") && (i + 1 < argc)) {
1240                         char *target = argv[i+1];
1241                         ret = cmd_info(target);
1242                         i += 1;
1243                         continue;
1244                 }
1245
1246
1247                 if (!parse_ports(argv[i]))
1248                         fprintf(stderr, "invalid argument: %s\n", argv[i]);
1249         }
1250
1251         /* xseg_leave(); */
1252         return ret;
1253 }