Merge branch 'sosd' into demo
[archipelago] / xseg / peers / user / xseg-tool.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <string.h>
5 #include <stdint.h>
6 #include <ctype.h>
7 #include <sys/stat.h>
8 #include <sys/types.h>
9 #include <fcntl.h>
10 #include <unistd.h>
11
12 #include <xseg/xseg.h>
13
14 int help(void)
15 {
16         printf("xseg <spec> [[[<src_port>]:[<dst_port>]] [<command> <arg>*] ]*\n"
17                 "spec:\n"
18                 "    <type:name:nr_ports:nr_requests:request_size:extra_size:page_shift>\n"
19                 "global commands:\n"
20                 "    reportall\n"
21                 "    create\n"
22                 "    destroy\n"
23                 "    bind <portno>\n"
24                 "    signal <portno>\n"
25                 "    bridge <portno1> <portno2> <logfile> {full|summary|stats}\n"
26                 "port commands:\n"
27                 "    report\n"
28                 "    alloc_requests (to source) <nr>\n"
29                 "    free_requests (from source) <nr>\n"
30                 "    put_requests (all from dest)\n"
31                 "    put_replies (all from dest)\n"
32                 "    wait     <nr_replies>\n"
33                 "    complete <nr_requests>\n"
34                 "    fail     <nr_requests>\n"
35                 "    rndwrite <nr_loops> <seed> <namesize> <datasize> <objectsize>\n"
36                 "    rndread  <nr_loops> <seed> <namesize> <datasize> <objectsize>\n"
37                 "    info     <name>\n"
38                 "    read     <name> <offset> <size>\n"
39                 "    write    <name> <offset> < data\n"
40                 "    truncate <name> <size>\n"
41                 "    delete   <name>\n"
42                 "    acquire  <name>\n"
43                 "    release  <name>\n"
44                 "    copy     <src>  <dst>\n"
45                 "    clone    <src>  <dst>\n"
46         );
47         return 1;
48 }
49
50 char *namebuf;
51 char *chunk;
52 struct xseg_config cfg;
53 struct xseg *xseg;
54 uint32_t srcport, dstport;
55 uint64_t reqs;
56 struct timval *tv;
57
58 #define mkname mkname_heavy
59 /* heavy distributes duplicates much more widely than light
60  * ./xseg-tool random 100000 | cut -d' ' -f2- | sort | uniq -d -c |wc -l
61  */
62
63 void mkname_heavy(char *name, uint32_t namesize, uint32_t seed)
64 {
65         int i;
66         char c;
67         for (i = 0; i < namesize; i += 1) {
68                 c = seed + (seed >> 8) + (seed >> 16) + (seed >> 24);
69                 c = '0' + ((c + (c >> 4)) & 0xf);
70                 if (c > '9')
71                         c += 'a'-'0'-10;
72                 name[i] = c;
73                 seed *= ((seed % 137911) | 1) * 137911;
74         }
75 }
76
77 void mkname_light(char *name, uint32_t namesize, uint32_t seed)
78 {
79         int i;
80         char c;
81         for (i = 0; i < namesize; i += 1) {
82                 c = seed;
83                 name[i] = 'A' + (c & 0xf);
84                 seed += 1;
85         }
86 }
87
88 uint64_t pick(uint64_t size)
89 {
90         return (uint64_t)((double)(RAND_MAX) / random());
91 }
92
93 void mkchunk(   char *chunk, uint32_t datasize,
94                 char *name, uint32_t namesize, uint64_t offset)
95 {
96         long i, r, bufsize = namesize + 16;
97         char buf[bufsize];
98         r = datasize % bufsize;
99         snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, name);
100
101         for (i = 0; i <= (long)datasize - bufsize; i += bufsize)
102                 memcpy(chunk + i, buf, bufsize);
103
104         memcpy(chunk + datasize - r, buf, r);
105 }
106
107 int chkchunk(   char *chunk, uint32_t datasize,
108                 char *name, uint32_t namesize, uint64_t offset)
109 {
110         long i, r;
111         int bufsize = namesize + 16;
112         char buf[bufsize];
113         r = datasize % namesize;
114         snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, name);
115
116         for (i = 0; i <= (long)datasize - bufsize; i += bufsize)
117                 if (memcmp(chunk + i, buf, bufsize)) {
118                         /*printf("mismatch: '%*s'* vs '%*s'\n",
119                                 bufsize, buf, datasize, chunk);
120                         */
121                         return 0;
122                 }
123
124         if (memcmp(chunk + datasize - r, buf, r))
125                 return 0;
126
127         return 1;
128 }
129
130
131 #define ALLOC_MIN 4096
132 #define ALLOC_MAX 1048576
133
134 void inputbuf(FILE *fp, char **retbuf, uint64_t *retsize)
135 {
136         static uint64_t alloc_size;
137         static char *buf;
138         uint64_t size = 0;
139         char *p;
140         size_t r;
141
142         if (alloc_size < ALLOC_MIN)
143                 alloc_size = ALLOC_MIN;
144
145         if (alloc_size > ALLOC_MAX)
146                 alloc_size = ALLOC_MAX;
147
148         p = realloc(buf, alloc_size);
149         if (!p) {
150                 if (buf)
151                         free(buf);
152                 buf = NULL;
153                 goto out;
154         }
155
156         buf = p;
157
158         while (!feof(fp)) {
159                 r = fread(buf + size, 1, alloc_size - size, fp);
160                 if (!r)
161                         break;
162                 size += r;
163                 if (size >= alloc_size) {
164                         p = realloc(buf, alloc_size * 2);
165                         if (!p) {
166                                 if (buf)
167                                         free(buf);
168                                 buf = NULL;
169                                 size = 0;
170                                 goto out;
171                         }
172                         buf = p;
173                         alloc_size *= 2;
174                 }
175         }
176
177 out:
178         *retbuf = buf;
179         *retsize = size;
180 }
181
182 void report_request(struct xseg_request *req)
183 {
184         uint32_t max = req->datasize;
185         if (max > 128)
186                 max = 128;
187         req->data[max-1] = 0;
188         fprintf(stderr, "request %llu state %u\n", (unsigned long long)req->serial, req->state);
189         fprintf(stderr, "data: %s\n", req->data);
190 }
191
192 int cmd_info(char *name)
193 {
194         uint32_t namesize = strlen(name);
195         size_t size = sizeof(uint64_t);
196         int r;
197         xserial srl;
198         struct xseg_request *req;
199
200         req = xseg_get_request(xseg, srcport);
201         if (!req) {
202                 fprintf(stderr, "No request!\n");
203                 return -1;
204         }
205
206         r = xseg_prep_request(req, namesize, size);
207         if (r < 0) {
208                 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
209                         (unsigned long) namesize, (unsigned long) size);
210                 xseg_put_request(xseg, srcport, req);
211                 return -1;
212         }
213
214         strncpy(req->name, name, namesize);
215         req->offset = 0;
216         req->size = size;
217         req->op = X_INFO;
218
219         srl = xseg_submit(xseg, dstport, req);
220         if (srl == None)
221                 return -1;
222
223         xseg_signal(xseg, dstport);
224
225         return 0;
226 }
227
228 int cmd_read(char *name, uint64_t offset, uint64_t size)
229 {
230         uint32_t namesize = strlen(name);
231         int r;
232         xserial srl;
233         struct xseg_request *req = xseg_get_request(xseg, srcport);
234         if (!req) {
235                 fprintf(stderr, "No request\n");
236                 return -1;
237         }
238
239         r = xseg_prep_request(req, namesize, size);
240         if (r < 0) {
241                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
242                         (unsigned long)namesize, (unsigned long long)size);
243                 xseg_put_request(xseg, srcport, req);
244                 return -1;
245         }
246
247         strncpy(req->name, name, namesize);
248         req->offset = offset;
249         req->size = size;
250         req->op = X_READ;
251
252         srl = xseg_submit(xseg, dstport, req);
253         if (srl == None)
254                 return -1;
255
256         xseg_signal(xseg, dstport);
257         return 0;
258 }
259
260 int cmd_write(char *name, uint64_t offset)
261 {
262         char *buf = NULL;
263         int r;
264         xserial srl;
265         uint64_t size = 0;
266         uint32_t namesize = strlen(name);
267         struct xseg_request *req;
268
269         inputbuf(stdin, &buf, &size);
270         if (!size) {
271                 fprintf(stderr, "No input\n");
272                 return -1;
273         }
274
275         req = xseg_get_request(xseg, srcport);
276         if (!req) {
277                 fprintf(stderr, "No request\n");
278                 return -1;
279         }
280
281         r = xseg_prep_request(req, namesize, size);
282         if (r < 0) {
283                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
284                         (unsigned long)namesize, (unsigned long long)size);
285                 xseg_put_request(xseg, srcport, req);
286                 return -1;
287         }
288
289         strncpy(req->name, name, namesize);
290         memcpy(req->buffer, buf, size);
291         req->offset = offset;
292         req->size = size;
293         req->op = X_WRITE;
294
295         srl = xseg_submit(xseg, dstport, req);
296         if (srl == None) {
297                 fprintf(stderr, "Cannot submit\n");
298                 return -1;
299         }
300
301         return 0;
302 }
303
304 int cmd_truncate(char *name, uint64_t offset)
305 {
306         return 0;
307 }
308
309 int cmd_delete(char *name)
310 {
311         return 0;
312 }
313
314 int cmd_acquire(char *name)
315 {
316         return 0;
317 }
318
319 int cmd_release(char *name)
320 {
321         return 0;
322 }
323
324 int cmd_copy(char *src, char *dst)
325 {
326         return 0;
327 }
328
329 int cmd_clone(char *src, char *dst)
330 {
331         return 0;
332 }
333
334 void log_req(int logfd, uint32_t portno2, uint32_t portno1, int op, int method,
335                 struct xseg_request *req)
336 {
337         char name[64], data[64];
338         /* null terminate name in case of req->name is less than 63 characters,
339          * and next character after name (aka first byte of next buffer) is not
340          * null
341          */
342         unsigned int end = (req->namesize > 63) ? 63 : req->namesize;
343
344         switch(method) {
345         case 0:
346                 strncpy(name, req->name, end);
347                 name[end] = 0;
348                 strncpy(data, req->data, 63);
349                 data[63] = 0;
350
351                 fprintf(logfd,
352                         "src port: %u, dst port: %u,  op:%u offset: %llu size: %lu, reqstate: %u\n"
353                         "name[%u]: '%s', data[%llu]:\n%s------------------\n\n",
354                         (unsigned int)portno1,
355                         (unsigned int)portno2,
356                         (unsigned int)req->op,
357                         (unsigned long long)req->offset,
358                         (unsigned long)req->size,
359                         (unsigned int)req->state,
360                         (unsigned int)req->namesize, name,
361                         (unsigned long long)req->datasize, data);
362                 break;
363         case 1:
364                 fprintf(logfd,
365                         "src port: %u, dst port: %u, op: %u\n",
366                         (unsigned int)portno1,
367                         (unsigned int)portno2,
368                         (unsigned int)req->op);
369                 break;
370         case 2:
371                 fprintf(logfd, "src port: %u, dst port: %u, reqs: %u\n",
372                         (unsigned int)portno1,
373                         (unsigned int)portno2,
374                         ++reqs);
375         }
376
377         return;
378 }
379
380 #define LOG_ACCEPT  0
381 #define LOG_RECEIVE 1
382
383 int cmd_bridge(uint32_t portno1, uint32_t portno2, char *logfile, char *how)
384 {
385         struct xseg_request *req;
386         int logfd, method;
387         if (!strcmp(logfile, "-"))
388                 logfd = 1;
389         else {
390                 logfd = open(logfile, O_WRONLY|O_APPEND|O_CREAT, 0600);
391                 if (logfd < 0) {
392                         perror(logfile);
393                         return -1;
394                 }
395         }
396
397         if (!strcmp(how, "full"))
398                 method = 0;
399         else if (!strcmp(how, "summary"))
400                 method = 1;
401         else
402                 method = 2;
403
404         for (;;) {
405                 int reloop = 0, active;
406                 xseg_prepare_wait(xseg, portno1);
407                 xseg_prepare_wait(xseg, portno2);
408                 req = NULL;
409
410                 for (;;) {
411                         active = 0;
412
413                         req = xseg_accept(xseg, portno1);
414                         if (req) {
415                                 xseg_submit(xseg, portno2, req);
416                                 log_req(logfd, portno1, portno2, LOG_ACCEPT, method, req);
417                                 active += 1;
418                         }
419
420                         req = xseg_accept(xseg, portno2);
421                         if (req) {
422                                 xseg_submit(xseg, portno1, req);
423                                 log_req(logfd, portno2, portno1, LOG_ACCEPT, method, req);
424                                 active += 1;
425                         }
426
427                         req = xseg_receive(xseg, portno1);
428                         if (req) {
429                                 xseg_respond(xseg, portno2, req);
430                                 log_req(logfd, portno1, portno2, LOG_RECEIVE, method, req);
431                                 active += 1;
432                         }
433
434                         req = xseg_receive(xseg, portno2);
435                         if (req) {
436                                 xseg_respond(xseg, portno1, req);
437                                 log_req(logfd, portno2, portno1, LOG_RECEIVE, method, req);
438                                 active += 1;
439                         }
440
441                         if (active == 0) {
442                                 if (reloop)
443                                         break;
444                                 /* wait on multiple queues? */
445                                 xseg_wait_signal(xseg, 100000);
446                                 break;
447                         } else {
448                                 xseg_cancel_wait(xseg, portno1);        
449                                 xseg_cancel_wait(xseg, portno2);        
450                                 reloop = 1;
451                         }
452                 }
453         }
454
455         close(logfd);
456
457         return 0;
458 }
459
460 int cmd_rndwrite(long loops, int32_t seed, uint32_t namesize, uint32_t chunksize, uint64_t size)
461 {
462         if (loops < 0)
463                 return help();
464
465         if (namesize >= chunksize) {
466                 fprintf(stderr, "namesize >= chunksize\n");
467                 return -1;
468         }
469
470         char *p = realloc(namebuf, namesize+1);
471         if (!p) {
472                 fprintf(stderr, "Cannot allocate memory\n");
473                 return -1;
474         }
475         namebuf = p;
476
477         p = realloc(chunk, chunksize);
478         if (!p) {
479                 fprintf(stderr, "Cannot allocate memory\n");
480                 return -1;
481         }
482         chunk = p;
483         memset(chunk, 0, chunksize);
484
485         srandom(seed);
486
487         struct xseg_request *submitted = NULL, *received;
488         long nr_submitted = 0, nr_received = 0, nr_failed = 0;
489         int reported = 0, r;
490         uint64_t offset;
491         xserial srl;
492
493         for (;;) {
494                 xseg_prepare_wait(xseg, srcport);
495                 if (nr_submitted < loops &&
496                     (submitted = xseg_get_request(xseg, srcport))) {
497                         xseg_cancel_wait(xseg, srcport);
498                         r = xseg_prep_request(submitted, namesize, chunksize);
499                         if (r < 0) {
500                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
501                                         namesize, chunksize);
502                                 xseg_put_request(xseg, submitted->portno, submitted);
503                                 return -1;
504                         }
505
506                         nr_submitted += 1;
507                         reported = 0;
508                         seed = random();
509                         mkname(namebuf, namesize, seed);
510                         namebuf[namesize] = 0;
511                         //printf("%ld: %s\n", nr_submitted, namebuf);
512                         strncpy(submitted->name, namebuf, namesize);
513                         offset = 0;// pick(size);
514                         mkchunk(submitted->buffer, chunksize, namebuf, namesize, offset);
515
516                         submitted->offset = offset;
517                         submitted->size = chunksize;
518                         submitted->op = X_WRITE;
519                         submitted->flags |= XF_NOSYNC;
520
521                         srl = xseg_submit(xseg, dstport, submitted);
522                         (void)srl;
523                         xseg_signal(xseg, dstport);
524                 }
525
526                 received = xseg_receive(xseg, srcport);
527                 if (received) {
528                         xseg_cancel_wait(xseg, srcport);
529                         nr_received += 1;
530                         if (!(received->state & XS_SERVED)) {
531                                 nr_failed += 1;
532                                 report_request(received);
533                         }
534                         if (xseg_put_request(xseg, received->portno, received))
535                                 fprintf(stderr, "Cannot put request at port %u\n", received->portno);
536                 }
537
538                 if (!submitted && !received)
539                         xseg_wait_signal(xseg, 1000000);
540
541                         if (nr_submitted % 1000 == 0 && !reported) {
542                                 reported = 1;
543                                 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
544                                         nr_submitted, nr_received, nr_failed);
545                         }
546
547                         if (nr_received >= loops)
548                                 break;
549         }
550
551         fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
552                 nr_submitted, nr_received, nr_failed);
553         return 0;
554 }
555
556 /* note:
557  * prepare/wait rhythm,
558  * files are converted to independent chunk access patterns,
559 */
560
561 int cmd_rndread(long loops, int32_t seed, uint32_t namesize, uint32_t chunksize, uint64_t size)
562 {
563         if (loops < 0)
564                 return help();
565
566         if (namesize >= chunksize) {
567                 fprintf(stderr, "namesize >= chunksize\n");
568                 return -1;
569         }
570
571         char *p = realloc(namebuf, namesize+1);
572         if (!p) {
573                 fprintf(stderr, "Cannot allocate memory\n");
574                 return -1;
575         }
576         namebuf = p;
577
578         p = realloc(chunk, chunksize);
579         if (!p) {
580                 fprintf(stderr, "Cannot allocate memory\n");
581                 return -1;
582         }
583         chunk = p;
584         memset(chunk, 0, chunksize);
585
586         srandom(seed);
587
588         struct xseg_request *submitted = NULL, *received;
589         long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0;
590         int reported = 0, r;
591         uint64_t offset;
592         xserial srl;
593
594         for (;;) {
595                 submitted = NULL;
596                 xseg_prepare_wait(xseg, srcport);
597                 if (nr_submitted < loops &&
598                     (submitted = xseg_get_request(xseg, srcport))) {
599                         xseg_cancel_wait(xseg, srcport);
600                         r = xseg_prep_request(submitted, namesize, chunksize);
601                         if (r < 0) {
602                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
603                                         namesize, chunksize);
604                                 xseg_put_request(xseg, submitted->portno, submitted);
605                                 return -1;
606                         }
607
608                         nr_submitted += 1;
609                         reported = 0;
610                         seed = random();
611                         mkname(namebuf, namesize, seed);
612                         namebuf[namesize] = 0;
613                         //printf("%ld: %s\n", nr_submitted, namebuf);
614                         offset = 0;//pick(size);
615
616                         strncpy(submitted->name, namebuf, namesize);
617                         submitted->offset = offset;
618                         submitted->size = chunksize;
619                         submitted->op = X_READ;
620
621                         srl = xseg_submit(xseg, dstport, submitted);
622                         (void)srl;
623                         xseg_signal(xseg, dstport);
624                 }
625
626                 received = xseg_receive(xseg, srcport);
627                 if (received) {
628                         xseg_cancel_wait(xseg, srcport);
629                         nr_received += 1;
630                         if (!(received->state & XS_SERVED)) {
631                                 nr_failed += 1;
632                                 report_request(received);
633                         } else if (!chkchunk(received->data, received->datasize,
634                                         received->name, received->namesize, received->offset)) {
635                                 nr_mismatch += 1;
636                         }
637
638                         if (xseg_put_request(xseg, received->portno, received))
639                                 fprintf(stderr, "Cannot put request at port %u\n", received->portno);
640                 }
641
642                 if (!submitted && !received)
643                         xseg_wait_signal(xseg, 1000000);
644
645                 if (nr_submitted % 1000 == 0 && !reported) {
646                         reported = 1;
647                         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
648                         nr_submitted, nr_received, nr_failed, nr_mismatch);
649                 }
650
651                 if (nr_received >= loops)
652                         break;
653         }
654
655         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
656                 nr_submitted, nr_received, nr_failed, nr_mismatch);
657         return 0;
658 }
659
660 int cmd_report(uint32_t port)
661 {
662         struct xq *fq, *rq, *pq;
663         fq = &xseg->ports[port].free_queue;
664         rq = &xseg->ports[port].request_queue;
665         pq = &xseg->ports[port].reply_queue;
666         fprintf(stderr, "port %u:\n"
667                 "       free_queue [%p] count : %u\n"
668                 "    request_queue [%p] count : %u\n"
669                 "      reply_queue [%p] count : %u\n",
670                 port,
671                 (void *)fq, xq_count(fq),
672                 (void *)rq, xq_count(rq),
673                 (void *)pq, xq_count(pq));
674         return 0;
675 }
676
677 int cmd_join(void)
678 {
679         if (xseg)
680                 return 0;
681
682         xseg = xseg_join(cfg.type, cfg.name, "posix", NULL);
683         if (!xseg) {
684                 fprintf(stderr, "cannot join segment!\n");
685                 return -1;
686         }
687         return 0;
688 }
689
690 int cmd_reportall(void)
691 {
692         uint32_t t;
693
694         if (cmd_join())
695                 return -1;
696
697         fprintf(stderr, "global free requests: %u\n", xq_count(xseg->free_requests));
698         for (t = 0; t < xseg->config.nr_ports; t++)
699                 cmd_report(t);
700
701         return 0;
702 }
703
704 int cmd_create(void)
705 {
706         int r = xseg_create(&cfg);
707         if (r) {
708                 fprintf(stderr, "cannot create segment!\n");
709                 return -1;
710         }
711
712         fprintf(stderr, "Segment initialized.\n");
713         return 0;
714 }
715
716 int cmd_destroy(void)
717 {
718         if (!xseg && cmd_join())
719                 return -1;
720         xseg_leave(xseg);
721         xseg_destroy(xseg);
722         xseg = NULL;
723         fprintf(stderr, "Segment destroyed.\n");
724         return 0;
725 }
726
727 int cmd_alloc_requests(unsigned long nr)
728 {
729         return xseg_alloc_requests(xseg, srcport, nr);
730 }
731
732 int cmd_free_requests(unsigned long nr)
733 {
734         return xseg_free_requests(xseg, srcport, nr);
735 }
736
737 int cmd_put_requests(void)
738 {
739         struct xseg_request *req;
740
741         for (;;) {
742                 req = xseg_accept(xseg, dstport);
743                 if (!req)
744                         break;
745                 if (xseg_put_request(xseg, req->portno, req))
746                         fprintf(stderr, "Cannot put request at port %u\n", req->portno);
747         }
748
749         return 0;
750 }
751
752 int cmd_finish(unsigned long nr, int fail)
753 {
754         struct xseg_request *req;
755
756         for (; nr--;) {
757                 req = xseg_accept(xseg, srcport);
758                 if (!req)
759                         break;
760                 if (fail)
761                         req->state &= ~XS_SERVED;
762                 else
763                         req->state |= XS_SERVED;
764                 xseg_respond(xseg, dstport, req);
765                 xseg_signal(xseg, dstport);
766         }
767
768         return 0;
769 }
770
771 void handle_reply(struct xseg_request *req)
772 {
773         if (!(req->state & XS_SERVED)) {
774                 report_request(req);
775                 goto put;
776         }
777
778         switch (req->op) {
779         case X_READ:
780                 fwrite(req->data, 1, req->datasize, stdout);
781                 break;
782
783         case X_WRITE:
784         case X_SYNC:
785         case X_DELETE:
786         case X_TRUNCATE:
787         case X_COMMIT:
788         case X_CLONE:
789         case X_INFO:
790                 fprintf(stderr, "size: %llu\n", (unsigned long long)*((uint64_t *)req->data));
791                 break;
792
793         default:
794                 break;
795         }
796
797 put:
798         if (xseg_put_request(xseg, req->portno, req))
799                 fprintf(stderr, "Cannot put reply at port %u\n", req->portno);
800 }
801
802 int cmd_wait(uint32_t nr)
803 {
804         struct xseg_request *req;
805         long ret;
806
807         for (;;) {
808                 req = xseg_receive(xseg, srcport);
809                 if (req) {
810                         handle_reply(req);
811                         nr--;
812                         if (nr == 0)
813                                 break;
814                         continue;
815                 }
816
817                 ret = xseg_prepare_wait(xseg, srcport);
818                 if (ret)
819                         return -1;
820
821                 ret = xseg_wait_signal(xseg, 1000000);
822                 ret = xseg_cancel_wait(xseg, srcport);
823                 if (ret)
824                         return -1;
825         }
826
827         return 0;
828 }
829
830 int cmd_put_replies(void)
831 {
832         struct xseg_request *req;
833
834         for (;;) {
835                 req = xseg_receive(xseg, dstport);
836                 if (!req)
837                         break;
838                 fprintf(stderr, "request: %08llx%08llx\n"
839                         "     op: %u\n"
840                         "  state: %u\n",
841                         0LL, (unsigned long long)req->serial,
842                         req->op,
843                         req->state);
844                 report_request(req);
845
846                 //fwrite(req->buffer, 1, req->buffersize, stdout);
847
848                 if (xseg_put_request(xseg, req->portno, req))
849                         fprintf(stderr, "Cannot put reply\n");
850         }
851
852         return 0;
853 }
854
855 int cmd_bind(long portno)
856 {
857         struct xseg_port *port = xseg_bind_port(xseg, portno);
858         if (!port) {
859                 fprintf(stderr, "failed to bind port %ld\n", portno);
860                 return 1;
861         }
862
863         fprintf(stderr, "bound port %u\n", xseg_portno(xseg, port));
864         return 0;
865 }
866
867 int cmd_signal(uint32_t portno)
868 {
869         return xseg_signal(xseg, portno);
870 }
871
872 int parse_ports(char *str)
873 {
874         int ret = 0;
875         char *s = str;
876
877         for (;;) {
878                 if (*s == 0)
879                         return 0;
880
881                 if (*s == ':') {
882                         *s = 0;
883                         if ((s > str) && isdigit(str[0])) {
884                                 srcport = atol(str);
885                                 ret ++;
886                         }
887                         break;
888                 }
889                 s ++;
890         }
891
892         s += 1;
893         str = s;
894
895         for (;;) {
896                 if (*s == 0) {
897                         if ((s > str) && isdigit(str[0])) {
898                                 dstport = atol(str);
899                                 ret ++;
900                         }
901                         break;
902                 }
903                 s ++;
904         }
905
906         return ret;
907 }
908
909 int main(int argc, char **argv)
910 {
911         int i, ret = 0;
912         char *spec;
913
914         if (argc < 3)
915                 return help();
916
917         srcport = -1;
918         dstport = -1;
919         spec = argv[1];
920
921         if (xseg_parse_spec(spec, &cfg)) {
922                 fprintf(stderr, "Cannot parse spec\n");
923                 return -1;
924         }
925
926         if (xseg_initialize()) {
927                 fprintf(stderr, "cannot initialize!\n");
928                 return -1;
929         }
930
931         for (i = 2; i < argc; i++) {
932
933                 if (!strcmp(argv[i], "create")) {
934                         ret = cmd_create();
935                         continue;
936                 }
937
938                 if (!strcmp(argv[i], "join")) {
939                         ret = cmd_join();
940                         if (!ret)
941                                 fprintf(stderr, "Segment joined.\n");
942                         continue;
943                 }
944
945                 if (!strcmp(argv[i], "destroy")) {
946                         ret = cmd_destroy();
947                         continue;
948                 }
949
950                 if (cmd_join())
951                         return -1;
952
953                 if (!strcmp(argv[i], "reportall")) {
954                         ret = cmd_reportall();
955                         continue;
956                 }
957
958                 if (!strcmp(argv[i], "bind") && (i + 1 < argc)) {
959                         ret = cmd_bind(atol(argv[i+1]));
960                         i += 1;
961                         continue;
962                 }
963
964                 if (!strcmp(argv[i], "signal") && (i + 1 < argc)) {
965                         ret = cmd_signal(atol(argv[i+1]));
966                         i += 1;
967                         continue;
968                 }
969
970                 if (!strcmp(argv[i], "bridge") && (i + 4 < argc)) {
971                         ret = cmd_bridge(atol(argv[i+1]),
972                                          atol(argv[i+2]),
973                                          argv[i+3],
974                                          argv[i+4]);
975                         i += 4;
976                         continue;
977                 }
978
979                 if (srcport == -1) {
980                         if (!parse_ports(argv[i]))
981                                 fprintf(stderr, "source port undefined: %s\n", argv[i]);
982                         continue;
983                 }
984
985                 if (dstport == -1) {
986                         if (!parse_ports(argv[i]))
987                                 fprintf(stderr, "destination port undefined: %s\n", argv[i]);
988                         continue;
989                 }
990
991                 if (!strcmp(argv[i], "report")) {
992                         ret = cmd_report(dstport);
993                         continue;
994                 }
995
996                 if (!strcmp(argv[i], "alloc_requests") && (i + 1 < argc)) {
997                         ret = cmd_alloc_requests(atol(argv[i+1]));
998                         i += 1;
999                         continue;
1000                 }
1001
1002                 if (!strcmp(argv[i], "free_requests") && (i + 1 < argc)) {
1003                         ret = cmd_free_requests(atol(argv[i+1]));
1004                         i += 1;
1005                         continue;
1006                 }
1007
1008                 if (!strcmp(argv[i], "put_requests")) {
1009                         ret = cmd_put_requests();
1010                         continue;
1011                 }
1012
1013                 if (!strcmp(argv[i], "put_replies")) {
1014                         ret = cmd_put_replies();
1015                         continue;
1016                 }
1017
1018                 if (!strcmp(argv[i], "complete") && (i + 1 < argc)) {
1019                         ret = cmd_finish(atol(argv[i+1]), 0);
1020                         i += 1;
1021                         continue;
1022                 }
1023
1024                 if (!strcmp(argv[i], "fail") && (i + 1 < argc)) {
1025                         ret = cmd_finish(atol(argv[i+1]), 1);
1026                         i += 1;
1027                         continue;
1028                 }
1029
1030                 if (!strcmp(argv[i], "wait") && (i + 1 < argc)) {
1031                         ret = cmd_wait(atol(argv[i+1]));
1032                         i += 1;
1033                         continue;
1034                 }
1035
1036                 if (!strcmp(argv[i], "rndwrite") && (i + 5 < argc)) {
1037                         long nr_loops = atol(argv[i+1]);
1038                         unsigned int seed = atoi(argv[i+2]);
1039                         unsigned int namesize = atoi(argv[i+3]);
1040                         unsigned int chunksize = atoi(argv[i+4]);
1041                         unsigned long objectsize = atol(argv[i+5]);
1042                         ret = cmd_rndwrite(nr_loops, seed, namesize, chunksize, objectsize);
1043                         i += 5;
1044                         continue;
1045                 }
1046
1047                 if (!strcmp(argv[i], "rndread") && (i + 5 < argc)) {
1048                         long nr_loops = atol(argv[i+1]);
1049                         unsigned int seed = atoi(argv[i+2]);
1050                         unsigned int namesize = atoi(argv[i+3]);
1051                         unsigned int chunksize = atoi(argv[i+4]);
1052                         unsigned long objectsize = atol(argv[i+5]);
1053                         ret = cmd_rndread(nr_loops, seed, namesize, chunksize, objectsize);
1054                         i += 5;
1055                         continue;
1056                 }
1057
1058                 if (!strcmp(argv[i], "read") && (i + 3 < argc)) {
1059                         char *name = argv[i+1];
1060                         uint64_t offset = atol(argv[i+2]);
1061                         uint64_t size   = atol(argv[i+3]);
1062                         ret = cmd_read(name, offset, size);
1063                         i += 3;
1064                         continue;
1065                 }
1066
1067                 if (!strcmp(argv[i], "write") && (i + 2 < argc)) {
1068                         char *name = argv[i+1];
1069                         uint64_t offset = atol(argv[i+2]);
1070                         ret = cmd_write(name, offset);
1071                         i += 2;
1072                         continue;
1073                 }
1074
1075                 if (!strcmp(argv[i], "truncate") && (i + 2 < argc)) {
1076                         char *name = argv[i+1];
1077                         uint64_t offset = atol(argv[i+2]);
1078                         ret = cmd_truncate(name, offset);
1079                         i += 2;
1080                         continue;
1081                 }
1082
1083                 if (!strcmp(argv[i], "delete") && (i + 1 < argc)) {
1084                         char *name = argv[i+1];
1085                         ret = cmd_delete(name);
1086                         i += 1;
1087                         continue;
1088                 }
1089
1090                 if (!strcmp(argv[i], "acquire") && (i + 1 < argc)) {
1091                         char *name = argv[i+1];
1092                         ret = cmd_acquire(name);
1093                         i += 1;
1094                         continue;
1095                 }
1096
1097                 if (!strcmp(argv[i], "release") && (i + 1 < argc)) {
1098                         char *name = argv[i+1];
1099                         ret = cmd_release(name);
1100                         i += 1;
1101                         continue;
1102                 }
1103
1104                 if (!strcmp(argv[i], "copy") && (i + 2) < argc) {
1105                         char *src = argv[i+1];
1106                         char *dst = argv[i+2];
1107                         ret = cmd_copy(src, dst);
1108                         i += 2;
1109                         continue;
1110                 }
1111
1112                 if (!strcmp(argv[i], "clone") && (i + 2 < argc)) {
1113                         char *src = argv[i+1];
1114                         char *dst = argv[i+2];
1115                         ret = cmd_clone(src, dst);
1116                         i += 2;
1117                         continue;
1118                 }
1119
1120                 if (!strcmp(argv[i], "info") && (i + 1 < argc)) {
1121                         char *name = argv[i+1];
1122                         ret = cmd_info(name);
1123                         i += 1;
1124                         continue;
1125                 }
1126
1127
1128                 if (!parse_ports(argv[i]))
1129                         fprintf(stderr, "invalid argument: %s\n", argv[i]);
1130         }
1131
1132         /* xseg_leave(); */
1133         return ret;
1134 }