blockd: Make request logging optional
[archipelago] / xseg / peers / xseg-tool.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <string.h>
5 #include <stdint.h>
6 #include <ctype.h>
7 #include <sys/stat.h>
8 #include <sys/types.h>
9 #include <fcntl.h>
10 #include <unistd.h>
11
12 #include <xseg/xseg.h>
13
14 int help(void)
15 {
16         printf("xseg <spec> [[[<src_port>]:[<dst_port>]] [<command> <arg>*] ]*\n"
17                 "spec:\n"
18                 "    <type:name:nr_ports:nr_requests:request_size:extra_size:page_shift>\n"
19                 "global commands:\n"
20                 "    reportall\n"
21                 "    create\n"
22                 "    destroy\n"
23                 "    bind <portno>\n"
24                 "    signal <portno>\n"
25                 "    bridge <portno1> <portno2> <logfile> {full|summary|stats}\n"
26                 "port commands:\n"
27                 "    report\n"
28                 "    alloc_requests (to source) <nr>\n"
29                 "    free_requests (from source) <nr>\n"
30                 "    put_requests (all from dest)\n"
31                 "    put_replies (all from dest)\n"
32                 "    wait     <nr_replies>\n"
33                 "    complete <nr_requests>\n"
34                 "    fail     <nr_requests>\n"
35                 "    rndwrite <nr_loops> <seed> <namesize> <datasize> <objectsize>\n"
36                 "    rndread  <nr_loops> <seed> <namesize> <datasize> <objectsize>\n"
37                 "    info     <name>\n"
38                 "    read     <name> <offset> <size>\n"
39                 "    write    <name> <offset> < data\n"
40                 "    truncate <name> <size>\n"
41                 "    delete   <name>\n"
42                 "    acquire  <name>\n"
43                 "    release  <name>\n"
44                 "    copy     <src>  <dst>\n"
45                 "    clone    <src>  <dst>\n"
46         );
47         return 1;
48 }
49
50 char *namebuf;
51 char *chunk;
52 struct xseg_config cfg;
53 struct xseg *xseg;
54 uint32_t srcport, dstport;
55
56
57 #define mkname mkname_heavy
58 /* heavy distributes duplicates much more widely than light
59  * ./xseg-tool random 100000 | cut -d' ' -f2- | sort | uniq -d -c |wc -l
60  */
61
62 void mkname_heavy(char *name, uint32_t namesize, uint32_t seed)
63 {
64         int i;
65         char c;
66         for (i = 0; i < namesize; i += 1) {
67                 c = seed + (seed >> 8) + (seed >> 16) + (seed >> 24);
68                 c = '0' + ((c + (c >> 4)) & 0xf);
69                 if (c > '9')
70                         c += 'a'-'0'-10;
71                 name[i] = c;
72                 seed *= ((seed % 137911) | 1) * 137911;
73         }
74 }
75
76 void mkname_light(char *name, uint32_t namesize, uint32_t seed)
77 {
78         int i;
79         char c;
80         for (i = 0; i < namesize; i += 1) {
81                 c = seed;
82                 name[i] = 'A' + (c & 0xf);
83                 seed += 1;
84         }
85 }
86
87 uint64_t pick(uint64_t size)
88 {
89         return (uint64_t)((double)(RAND_MAX) / random());
90 }
91
92 void mkchunk(   char *chunk, uint32_t datasize,
93                 char *name, uint32_t namesize, uint64_t offset)
94 {
95         long i, r, bufsize = namesize + 16;
96         char buf[bufsize];
97         r = datasize % bufsize;
98         snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, name);
99
100         for (i = 0; i <= (long)datasize - bufsize; i += bufsize)
101                 memcpy(chunk + i, buf, bufsize);
102
103         memcpy(chunk + datasize - r, buf, r);
104 }
105
106 int chkchunk(   char *chunk, uint32_t datasize,
107                 char *name, uint32_t namesize, uint64_t offset)
108 {
109         long i, r;
110         int bufsize = namesize + 16;
111         char buf[bufsize];
112         r = datasize % namesize;
113         snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, name);
114
115         for (i = 0; i <= (long)datasize - bufsize; i += bufsize)
116                 if (memcmp(chunk + i, buf, bufsize)) {
117                         /*printf("mismatch: '%*s'* vs '%*s'\n",
118                                 bufsize, buf, datasize, chunk);
119                         */
120                         return 0;
121                 }
122
123         if (memcmp(chunk + datasize - r, buf, r))
124                 return 0;
125
126         return 1;
127 }
128
129
130 #define ALLOC_MIN 4096
131 #define ALLOC_MAX 1048576
132
133 void inputbuf(FILE *fp, char **retbuf, uint64_t *retsize)
134 {
135         static uint64_t alloc_size;
136         static char *buf;
137         uint64_t size = 0;
138         char *p;
139         size_t r;
140
141         if (alloc_size < ALLOC_MIN)
142                 alloc_size = ALLOC_MIN;
143
144         if (alloc_size > ALLOC_MAX)
145                 alloc_size = ALLOC_MAX;
146
147         p = realloc(buf, alloc_size);
148         if (!p) {
149                 if (buf)
150                         free(buf);
151                 buf = NULL;
152                 goto out;
153         }
154
155         buf = p;
156
157         while (!feof(fp)) {
158                 r = fread(buf + size, 1, alloc_size - size, fp);
159                 if (!r)
160                         break;
161                 size += r;
162                 if (size >= alloc_size) {
163                         p = realloc(buf, alloc_size * 2);
164                         if (!p) {
165                                 if (buf)
166                                         free(buf);
167                                 buf = NULL;
168                                 size = 0;
169                                 goto out;
170                         }
171                         buf = p;
172                         alloc_size *= 2;
173                 }
174         }
175
176 out:
177         *retbuf = buf;
178         *retsize = size;
179 }
180
181 void report_request(struct xseg_request *req)
182 {
183         uint32_t max = req->datasize;
184         if (max > 128)
185                 max = 128;
186         req->data[max-1] = 0;
187         fprintf(stderr, "request %llu state %u\n", (unsigned long long)req->serial, req->state);
188         fprintf(stderr, "data: %s\n", req->data);
189 }
190
191 int cmd_info(char *name)
192 {
193         uint32_t namesize = strlen(name);
194         size_t size = sizeof(uint64_t);
195         int r;
196         xserial srl;
197         struct xseg_request *req;
198
199         req = xseg_get_request(xseg, srcport);
200         if (!req) {
201                 fprintf(stderr, "No request!\n");
202                 return -1;
203         }
204
205         r = xseg_prep_request(req, namesize, size);
206         if (r < 0) {
207                 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
208                         (unsigned long) namesize, (unsigned long) size);
209                 xseg_put_request(xseg, srcport, req);
210                 return -1;
211         }
212
213         strncpy(req->name, name, namesize);
214         req->offset = 0;
215         req->size = size;
216         req->op = X_INFO;
217
218         srl = xseg_submit(xseg, dstport, req);
219         if (srl == None)
220                 return -1;
221
222         xseg_signal(xseg, dstport);
223
224         return 0;
225 }
226
227 int cmd_read(char *name, uint64_t offset, uint64_t size)
228 {
229         uint32_t namesize = strlen(name);
230         int r;
231         xserial srl;
232         struct xseg_request *req = xseg_get_request(xseg, srcport);
233         if (!req) {
234                 fprintf(stderr, "No request\n");
235                 return -1;
236         }
237
238         r = xseg_prep_request(req, namesize, size);
239         if (r < 0) {
240                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
241                         (unsigned long)namesize, (unsigned long long)size);
242                 xseg_put_request(xseg, srcport, req);
243                 return -1;
244         }
245
246         strncpy(req->name, name, namesize);
247         req->offset = offset;
248         req->size = size;
249         req->op = X_READ;
250
251         srl = xseg_submit(xseg, dstport, req);
252         if (srl == None)
253                 return -1;
254
255         xseg_signal(xseg, dstport);
256         return 0;
257 }
258
259 int cmd_write(char *name, uint64_t offset)
260 {
261         char *buf = NULL;
262         int r;
263         xserial srl;
264         uint64_t size = 0;
265         uint32_t namesize = strlen(name);
266         struct xseg_request *req;
267
268         inputbuf(stdin, &buf, &size);
269         if (!size) {
270                 fprintf(stderr, "No input\n");
271                 return -1;
272         }
273
274         req = xseg_get_request(xseg, srcport);
275         if (!req) {
276                 fprintf(stderr, "No request\n");
277                 return -1;
278         }
279
280         r = xseg_prep_request(req, namesize, size);
281         if (r < 0) {
282                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
283                         (unsigned long)namesize, (unsigned long long)size);
284                 xseg_put_request(xseg, srcport, req);
285                 return -1;
286         }
287
288         strncpy(req->name, name, namesize);
289         memcpy(req->buffer, buf, size);
290         req->offset = offset;
291         req->size = size;
292         req->op = X_WRITE;
293
294         srl = xseg_submit(xseg, dstport, req);
295         if (srl == None) {
296                 fprintf(stderr, "Cannot submit\n");
297                 return -1;
298         }
299
300         return 0;
301 }
302
303 int cmd_truncate(char *name, uint64_t offset)
304 {
305         return 0;
306 }
307
308 int cmd_delete(char *name)
309 {
310         return 0;
311 }
312
313 int cmd_acquire(char *name)
314 {
315         return 0;
316 }
317
318 int cmd_release(char *name)
319 {
320         return 0;
321 }
322
323 int cmd_copy(char *src, char *dst)
324 {
325         return 0;
326 }
327
328 int cmd_clone(char *src, char *dst)
329 {
330         return 0;
331 }
332
333 void log_req(   uint32_t portno2, uint32_t portno1, int op, int method,
334                 struct xseg_request *req)
335 {
336         return;
337 }
338
339 #define LOG_ACCEPT  0
340 #define LOG_RECEIVE 1
341
342 int cmd_bridge(uint32_t portno1, uint32_t portno2, char *logfile, char *how)
343 {
344         struct xseg_request *req;
345         int logfd, method;
346         if (!strcmp(logfile, "-"))
347                 logfd = 1;
348         else {
349                 logfd = open(logfile, O_WRONLY|O_APPEND|O_CREAT, 0600);
350                 if (logfd < 0) {
351                         perror(logfile);
352                         return -1;
353                 }
354         }
355
356         if (!strcmp(how, "full"))
357                 method = 0;
358         else if (!strcmp(how, "full"))
359                 method = 1;
360         else
361                 method = 2;
362
363         for (;;) {
364                 int reloop = 0, active;
365                 xseg_prepare_wait(xseg, portno1);
366                 xseg_prepare_wait(xseg, portno2);
367                 req = NULL;
368
369                 for (;;) {
370                         active = 0;
371
372                         req = xseg_accept(xseg, portno1);
373                         if (req) {
374                                 xseg_submit(xseg, portno2, req);
375                                 log_req(portno1, portno2, LOG_ACCEPT, method, req);
376                                 active += 1;
377                         }
378
379                         req = xseg_accept(xseg, portno2);
380                         if (req) {
381                                 xseg_submit(xseg, portno1, req);
382                                 log_req(portno2, portno1, LOG_ACCEPT, method, req);
383                                 active += 1;
384                         }
385
386                         req = xseg_receive(xseg, portno1);
387                         if (req) {
388                                 xseg_respond(xseg, portno2, req);
389                                 log_req(portno1, portno2, LOG_RECEIVE, method, req);
390                                 active += 1;
391                         }
392
393                         req = xseg_receive(xseg, portno2);
394                         if (req) {
395                                 xseg_respond(xseg, portno1, req);
396                                 log_req(portno2, portno1, LOG_RECEIVE, method, req);
397                                 active += 1;
398                         }
399
400                         if (active == 0) {
401                                 if (reloop)
402                                         break;
403                                 /* wait on multiple queues? */
404                                 xseg_wait_signal(xseg, portno1, 100000);
405                                 break;
406                         } else {
407                                 xseg_cancel_wait(xseg, portno1);        
408                                 xseg_cancel_wait(xseg, portno2);        
409                                 reloop = 1;
410                         }
411                 }
412         }
413
414         return 0;
415 }
416
417 int cmd_rndwrite(long loops, int32_t seed, uint32_t namesize, uint32_t chunksize, uint64_t size)
418 {
419         if (loops < 0)
420                 return help();
421
422         if (namesize >= chunksize) {
423                 fprintf(stderr, "namesize >= chunksize\n");
424                 return -1;
425         }
426
427         char *p = realloc(namebuf, namesize+1);
428         if (!p) {
429                 fprintf(stderr, "Cannot allocate memory\n");
430                 return -1;
431         }
432         namebuf = p;
433
434         p = realloc(chunk, chunksize);
435         if (!p) {
436                 fprintf(stderr, "Cannot allocate memory\n");
437                 return -1;
438         }
439         chunk = p;
440         memset(chunk, 0, chunksize);
441
442         srandom(seed);
443
444         struct xseg_request *submitted = NULL, *received;
445         long nr_submitted = 0, nr_received = 0, nr_failed = 0;
446         int reported = 0, r;
447         uint64_t offset;
448         xserial srl;
449
450         for (;;) {
451                 xseg_prepare_wait(xseg, srcport);
452                 if (nr_submitted < loops &&
453                     (submitted = xseg_get_request(xseg, srcport))) {
454                         xseg_cancel_wait(xseg, srcport);
455                         r = xseg_prep_request(submitted, namesize, chunksize);
456                         if (r < 0) {
457                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
458                                         namesize, chunksize);
459                                 xseg_put_request(xseg, submitted->portno, submitted);
460                                 return -1;
461                         }
462
463                         nr_submitted += 1;
464                         reported = 0;
465                         seed = random();
466                         mkname(namebuf, namesize, seed);
467                         namebuf[namesize] = 0;
468                         //printf("%ld: %s\n", nr_submitted, namebuf);
469                         strncpy(submitted->name, namebuf, namesize);
470                         offset = 0;// pick(size);
471                         mkchunk(submitted->buffer, chunksize, namebuf, namesize, offset);
472
473                         submitted->offset = offset;
474                         submitted->size = chunksize;
475                         submitted->op = X_WRITE;
476                         submitted->flags |= XF_NOSYNC;
477
478                         srl = xseg_submit(xseg, dstport, submitted);
479                         (void)srl;
480                         xseg_signal(xseg, dstport);
481                 }
482
483                 received = xseg_receive(xseg, srcport);
484                 if (received) {
485                         xseg_cancel_wait(xseg, srcport);
486                         nr_received += 1;
487                         if (!(received->state & XS_SERVED)) {
488                                 nr_failed += 1;
489                                 report_request(received);
490                         }
491                         if (xseg_put_request(xseg, received->portno, received))
492                                 fprintf(stderr, "Cannot put request at port %u\n", received->portno);
493                 }
494
495                 if (!submitted && !received)
496                         xseg_wait_signal(xseg, srcport, 1000000);
497
498                         if (nr_submitted % 1000 == 0 && !reported) {
499                                 reported = 1;
500                                 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
501                                         nr_submitted, nr_received, nr_failed);
502                         }
503
504                         if (nr_received >= loops)
505                                 break;
506         }
507
508         fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
509                 nr_submitted, nr_received, nr_failed);
510         return 0;
511 }
512
513 /* note:
514  * prepare/wait rhythm,
515  * files are converted to independent chunk access patterns,
516 */
517
518 int cmd_rndread(long loops, int32_t seed, uint32_t namesize, uint32_t chunksize, uint64_t size)
519 {
520         if (loops < 0)
521                 return help();
522
523         if (namesize >= chunksize) {
524                 fprintf(stderr, "namesize >= chunksize\n");
525                 return -1;
526         }
527
528         char *p = realloc(namebuf, namesize+1);
529         if (!p) {
530                 fprintf(stderr, "Cannot allocate memory\n");
531                 return -1;
532         }
533         namebuf = p;
534
535         p = realloc(chunk, chunksize);
536         if (!p) {
537                 fprintf(stderr, "Cannot allocate memory\n");
538                 return -1;
539         }
540         chunk = p;
541         memset(chunk, 0, chunksize);
542
543         srandom(seed);
544
545         struct xseg_request *submitted = NULL, *received;
546         long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0;
547         int reported = 0, r;
548         uint64_t offset;
549         xserial srl;
550
551         for (;;) {
552                 submitted = NULL;
553                 xseg_prepare_wait(xseg, srcport);
554                 if (nr_submitted < loops &&
555                     (submitted = xseg_get_request(xseg, srcport))) {
556                         xseg_cancel_wait(xseg, srcport);
557                         r = xseg_prep_request(submitted, namesize, chunksize);
558                         if (r < 0) {
559                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
560                                         namesize, chunksize);
561                                 xseg_put_request(xseg, submitted->portno, submitted);
562                                 return -1;
563                         }
564
565                         nr_submitted += 1;
566                         reported = 0;
567                         seed = random();
568                         mkname(namebuf, namesize, seed);
569                         namebuf[namesize] = 0;
570                         //printf("%ld: %s\n", nr_submitted, namebuf);
571                         offset = 0;//pick(size);
572
573                         strncpy(submitted->name, namebuf, namesize);
574                         submitted->offset = offset;
575                         submitted->size = chunksize;
576                         submitted->op = X_READ;
577
578                         srl = xseg_submit(xseg, dstport, submitted);
579                         (void)srl;
580                         xseg_signal(xseg, dstport);
581                 }
582
583                 received = xseg_receive(xseg, srcport);
584                 if (received) {
585                         xseg_cancel_wait(xseg, srcport);
586                         nr_received += 1;
587                         if (!(received->state & XS_SERVED)) {
588                                 nr_failed += 1;
589                                 report_request(received);
590                         } else if (!chkchunk(received->data, received->datasize,
591                                         received->name, received->namesize, received->offset)) {
592                                 nr_mismatch += 1;
593                         }
594
595                         if (xseg_put_request(xseg, received->portno, received))
596                                 fprintf(stderr, "Cannot put request at port %u\n", received->portno);
597                 }
598
599                 if (!submitted && !received)
600                         xseg_wait_signal(xseg, srcport, 1000000);
601
602                 if (nr_submitted % 1000 == 0 && !reported) {
603                         reported = 1;
604                         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
605                         nr_submitted, nr_received, nr_failed, nr_mismatch);
606                 }
607
608                 if (nr_received >= loops)
609                         break;
610         }
611
612         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
613                 nr_submitted, nr_received, nr_failed, nr_mismatch);
614         return 0;
615 }
616
617 int cmd_report(uint32_t port)
618 {
619         struct xq *fq, *rq, *pq;
620         fq = &xseg->ports[port].free_queue;
621         rq = &xseg->ports[port].request_queue;
622         pq = &xseg->ports[port].reply_queue;
623         fprintf(stderr, "port %u:\n"
624                 "       free_queue [%p] count : %u\n"
625                 "    request_queue [%p] count : %u\n"
626                 "      reply_queue [%p] count : %u\n",
627                 port,
628                 (void *)fq, xq_count(fq),
629                 (void *)rq, xq_count(rq),
630                 (void *)pq, xq_count(pq));
631         return 0;
632 }
633
634 int cmd_join(void)
635 {
636         if (xseg)
637                 return 0;
638
639         xseg = xseg_join(cfg.type, cfg.name);
640         if (!xseg) {
641                 fprintf(stderr, "cannot join segment!\n");
642                 return -1;
643         }
644         return 0;
645 }
646
647 int cmd_reportall(void)
648 {
649         uint32_t t;
650
651         if (cmd_join())
652                 return -1;
653
654         fprintf(stderr, "global free requests: %u\n", xq_count(xseg->free_requests));
655         for (t = 0; t < xseg->config.nr_ports; t++)
656                 cmd_report(t);
657
658         return 0;
659 }
660
661 int cmd_create(void)
662 {
663         int r = xseg_create(&cfg);
664         if (r) {
665                 fprintf(stderr, "cannot create segment!\n");
666                 return -1;
667         }
668
669         fprintf(stderr, "Segment initialized.\n");
670         return 0;
671 }
672
673 int cmd_destroy(void)
674 {
675         if (!xseg && cmd_join())
676                 return -1;
677         xseg_destroy(xseg);
678         xseg = NULL;
679         fprintf(stderr, "Segment destroyed.\n");
680         return 0;
681 }
682
683 int cmd_alloc_requests(unsigned long nr)
684 {
685         return xseg_alloc_requests(xseg, srcport, nr);
686 }
687
688 int cmd_free_requests(unsigned long nr)
689 {
690         return xseg_free_requests(xseg, srcport, nr);
691 }
692
693 int cmd_put_requests(void)
694 {
695         struct xseg_request *req;
696
697         for (;;) {
698                 req = xseg_accept(xseg, dstport);
699                 if (!req)
700                         break;
701                 if (xseg_put_request(xseg, req->portno, req))
702                         fprintf(stderr, "Cannot put request at port %u\n", req->portno);
703         }
704
705         return 0;
706 }
707
708 int cmd_finish(unsigned long nr, int fail)
709 {
710         struct xseg_request *req;
711
712         for (; nr--;) {
713                 req = xseg_accept(xseg, srcport);
714                 if (!req)
715                         break;
716                 if (fail)
717                         req->state &= ~XS_SERVED;
718                 else
719                         req->state |= XS_SERVED;
720                 xseg_respond(xseg, dstport, req);
721                 xseg_signal(xseg, dstport);
722         }
723
724         return 0;
725 }
726
727 void handle_reply(struct xseg_request *req)
728 {
729         if (!(req->state & XS_SERVED)) {
730                 report_request(req);
731                 goto put;
732         }
733
734         switch (req->op) {
735         case X_READ:
736                 fwrite(req->data, 1, req->datasize, stdout);
737                 break;
738
739         case X_WRITE:
740         case X_SYNC:
741         case X_DELETE:
742         case X_TRUNCATE:
743         case X_COMMIT:
744         case X_CLONE:
745         case X_INFO:
746                 fprintf(stderr, "size: %llu\n", (unsigned long long)*((uint64_t *)req->data));
747                 break;
748
749         default:
750                 break;
751         }
752
753 put:
754         if (xseg_put_request(xseg, req->portno, req))
755                 fprintf(stderr, "Cannot put reply at port %u\n", req->portno);
756 }
757
758 int cmd_wait(uint32_t nr)
759 {
760         struct xseg_request *req;
761         long ret;
762
763         for (;;) {
764                 req = xseg_receive(xseg, srcport);
765                 if (req) {
766                         handle_reply(req);
767                         nr--;
768                         if (nr == 0)
769                                 break;
770                         continue;
771                 }
772
773                 ret = xseg_prepare_wait(xseg, srcport);
774                 if (ret)
775                         return -1;
776
777                 ret = xseg_wait_signal(xseg, srcport, 1000000);
778                 ret = xseg_cancel_wait(xseg, srcport);
779                 if (ret)
780                         return -1;
781         }
782
783         return 0;
784 }
785
786 int cmd_put_replies(void)
787 {
788         struct xseg_request *req;
789
790         for (;;) {
791                 req = xseg_receive(xseg, dstport);
792                 if (!req)
793                         break;
794                 fprintf(stderr, "request: %08llx%08llx\n"
795                         "     op: %u\n"
796                         "  state: %u\n",
797                         0LL, (unsigned long long)req->serial,
798                         req->op,
799                         req->state);
800                 report_request(req);
801
802                 //fwrite(req->buffer, 1, req->buffersize, stdout);
803
804                 if (xseg_put_request(xseg, req->portno, req))
805                         fprintf(stderr, "Cannot put reply\n");
806         }
807
808         return 0;
809 }
810
811 int cmd_bind(long portno)
812 {
813         struct xseg_port *port = xseg_bind_port(xseg, portno);
814         if (!port) {
815                 fprintf(stderr, "failed to bind port %ld\n", portno);
816                 return 1;
817         }
818
819         fprintf(stderr, "bound port %u\n", xseg_portno(xseg, port));
820         return 0;
821 }
822
823 int cmd_signal(uint32_t portno)
824 {
825         return xseg_signal(xseg, portno);
826 }
827
828 int parse_ports(char *str)
829 {
830         int ret = 0;
831         char *s = str;
832
833         for (;;) {
834                 if (*s == 0)
835                         return 0;
836
837                 if (*s == ':') {
838                         *s = 0;
839                         if ((s > str) && isdigit(str[0])) {
840                                 srcport = atol(str);
841                                 ret ++;
842                         }
843                         break;
844                 }
845                 s ++;
846         }
847
848         s += 1;
849         str = s;
850
851         for (;;) {
852                 if (*s == 0) {
853                         if ((s > str) && isdigit(str[0])) {
854                                 dstport = atol(str);
855                                 ret ++;
856                         }
857                         break;
858                 }
859                 s ++;
860         }
861
862         return ret;
863 }
864
865 int main(int argc, char **argv)
866 {
867         int i, ret = 0;
868         char *spec;
869
870         if (argc < 3)
871                 return help();
872
873         srcport = -1;
874         dstport = -1;
875         spec = argv[1];
876
877         if (xseg_parse_spec(spec, &cfg)) {
878                 fprintf(stderr, "Cannot parse spec\n");
879                 return -1;
880         }
881
882         if (xseg_initialize("posix")) {
883                 fprintf(stderr, "cannot initialize!\n");
884                 return -1;
885         }
886
887         for (i = 2; i < argc; i++) {
888
889                 if (!strcmp(argv[i], "create")) {
890                         ret = cmd_create();
891                         continue;
892                 }
893
894                 if (!strcmp(argv[i], "join")) {
895                         ret = cmd_join();
896                         if (!ret)
897                                 fprintf(stderr, "Segment joined.\n");
898                         continue;
899                 }
900
901                 if (!strcmp(argv[i], "destroy")) {
902                         ret = cmd_destroy();
903                         continue;
904                 }
905
906                 if (cmd_join())
907                         return -1;
908
909                 if (!strcmp(argv[i], "reportall")) {
910                         ret = cmd_reportall();
911                         continue;
912                 }
913
914                 if (!strcmp(argv[i], "bind") && (i + 1 < argc)) {
915                         ret = cmd_bind(atol(argv[i+1]));
916                         i += 1;
917                         continue;
918                 }
919
920                 if (!strcmp(argv[i], "signal") && (i + 1 < argc)) {
921                         ret = cmd_signal(atol(argv[i+1]));
922                         i += 1;
923                         continue;
924                 }
925
926                 if (!strcmp(argv[i], "bridge") && (i + 4 < argc)) {
927                         ret = cmd_bridge(atol(argv[i+1]),
928                                          atol(argv[i+2]),
929                                          argv[i+3],
930                                          argv[i+4]);
931                         i += 4;
932                         continue;
933                 }
934
935                 if (srcport == -1) {
936                         if (!parse_ports(argv[i]))
937                                 fprintf(stderr, "source port undefined: %s\n", argv[i]);
938                         continue;
939                 }
940
941                 if (dstport == -1) {
942                         if (!parse_ports(argv[i]))
943                                 fprintf(stderr, "destination port undefined: %s\n", argv[i]);
944                         continue;
945                 }
946
947                 if (!strcmp(argv[i], "report")) {
948                         ret = cmd_report(dstport);
949                         continue;
950                 }
951
952                 if (!strcmp(argv[i], "alloc_requests") && (i + 1 < argc)) {
953                         ret = cmd_alloc_requests(atol(argv[i+1]));
954                         i += 1;
955                         continue;
956                 }
957
958                 if (!strcmp(argv[i], "free_requests") && (i + 1 < argc)) {
959                         ret = cmd_free_requests(atol(argv[i+1]));
960                         i += 1;
961                         continue;
962                 }
963
964                 if (!strcmp(argv[i], "put_requests")) {
965                         ret = cmd_put_requests();
966                         continue;
967                 }
968
969                 if (!strcmp(argv[i], "put_replies")) {
970                         ret = cmd_put_replies();
971                         continue;
972                 }
973
974                 if (!strcmp(argv[i], "complete") && (i + 1 < argc)) {
975                         ret = cmd_finish(atol(argv[i+1]), 0);
976                         i += 1;
977                         continue;
978                 }
979
980                 if (!strcmp(argv[i], "fail") && (i + 1 < argc)) {
981                         ret = cmd_finish(atol(argv[i+1]), 1);
982                         i += 1;
983                         continue;
984                 }
985
986                 if (!strcmp(argv[i], "wait") && (i + 1 < argc)) {
987                         ret = cmd_wait(atol(argv[i+1]));
988                         i += 1;
989                         continue;
990                 }
991
992                 if (!strcmp(argv[i], "rndwrite") && (i + 5 < argc)) {
993                         long nr_loops = atol(argv[i+1]);
994                         unsigned int seed = atoi(argv[i+2]);
995                         unsigned int namesize = atoi(argv[i+3]);
996                         unsigned int chunksize = atoi(argv[i+4]);
997                         unsigned long objectsize = atol(argv[i+5]);
998                         ret = cmd_rndwrite(nr_loops, seed, namesize, chunksize, objectsize);
999                         i += 5;
1000                         continue;
1001                 }
1002
1003                 if (!strcmp(argv[i], "rndread") && (i + 5 < argc)) {
1004                         long nr_loops = atol(argv[i+1]);
1005                         unsigned int seed = atoi(argv[i+2]);
1006                         unsigned int namesize = atoi(argv[i+3]);
1007                         unsigned int chunksize = atoi(argv[i+4]);
1008                         unsigned long objectsize = atol(argv[i+5]);
1009                         ret = cmd_rndread(nr_loops, seed, namesize, chunksize, objectsize);
1010                         i += 5;
1011                         continue;
1012                 }
1013
1014                 if (!strcmp(argv[i], "read") && (i + 3 < argc)) {
1015                         char *name = argv[i+1];
1016                         uint64_t offset = atol(argv[i+2]);
1017                         uint64_t size   = atol(argv[i+3]);
1018                         ret = cmd_read(name, offset, size);
1019                         i += 3;
1020                         continue;
1021                 }
1022
1023                 if (!strcmp(argv[i], "write") && (i + 2 < argc)) {
1024                         char *name = argv[i+1];
1025                         uint64_t offset = atol(argv[i+2]);
1026                         ret = cmd_write(name, offset);
1027                         i += 2;
1028                         continue;
1029                 }
1030
1031                 if (!strcmp(argv[i], "truncate") && (i + 2 < argc)) {
1032                         char *name = argv[i+1];
1033                         uint64_t offset = atol(argv[i+2]);
1034                         ret = cmd_truncate(name, offset);
1035                         i += 2;
1036                         continue;
1037                 }
1038
1039                 if (!strcmp(argv[i], "delete") && (i + 1 < argc)) {
1040                         char *name = argv[i+1];
1041                         ret = cmd_delete(name);
1042                         i += 1;
1043                         continue;
1044                 }
1045
1046                 if (!strcmp(argv[i], "acquire") && (i + 1 < argc)) {
1047                         char *name = argv[i+1];
1048                         ret = cmd_acquire(name);
1049                         i += 1;
1050                         continue;
1051                 }
1052
1053                 if (!strcmp(argv[i], "release") && (i + 1 < argc)) {
1054                         char *name = argv[i+1];
1055                         ret = cmd_release(name);
1056                         i += 1;
1057                         continue;
1058                 }
1059
1060                 if (!strcmp(argv[i], "copy") && (i + 2) < argc) {
1061                         char *src = argv[i+1];
1062                         char *dst = argv[i+2];
1063                         ret = cmd_copy(src, dst);
1064                         i += 2;
1065                         continue;
1066                 }
1067
1068                 if (!strcmp(argv[i], "clone") && (i + 2 < argc)) {
1069                         char *src = argv[i+1];
1070                         char *dst = argv[i+2];
1071                         ret = cmd_clone(src, dst);
1072                         i += 2;
1073                         continue;
1074                 }
1075
1076                 if (!strcmp(argv[i], "info") && (i + 1 < argc)) {
1077                         char *name = argv[i+1];
1078                         ret = cmd_info(name);
1079                         i += 1;
1080                         continue;
1081                 }
1082
1083
1084                 if (!parse_ports(argv[i]))
1085                         fprintf(stderr, "invalid argument: %s\n", argv[i]);
1086         }
1087
1088         /* xseg_leave(); */
1089         return ret;
1090 }