fix various bugs in mt-mapperd.
[archipelago] / xseg / peers / user / xseg-tool.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <string.h>
5 #include <stdint.h>
6 #include <ctype.h>
7 #include <sys/stat.h>
8 #include <sys/types.h>
9 #include <fcntl.h>
10 #include <unistd.h>
11
12 #include <xseg/xseg.h>
13 #include <xseg/protocol.h>
14 int help(void)
15 {
16         printf("xseg <spec> [[[<src_port>]:[<dst_port>]] [<command> <arg>*] ]*\n"
17                 "spec:\n"
18                 "    <type:name:nr_ports:nr_requests:request_size:extra_size:page_shift>\n"
19                 "global commands:\n"
20                 "    reportall\n"
21                 "    create\n"
22                 "    destroy\n"
23                 "    bind <portno>\n"
24                 "    signal <portno>\n"
25                 "    bridge <portno1> <portno2> <logfile> {full|summary|stats}\n"
26                 "port commands:\n"
27                 "    report\n"
28                 "    alloc_requests (to source) <nr>\n"
29                 "    free_requests (from source) <nr>\n"
30                 "    put_requests (all from dest)\n"
31                 "    put_replies (all from dest)\n"
32                 "    wait        <nr_replies>\n"
33                 "    complete    <nr_requests>\n"
34                 "    fail        <nr_requests>\n"
35                 "    rndwrite    <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
36                 "    rndread     <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
37                 "    submit_reqs <nr_loops> <concurrent_reqs>\n"
38                 "    info        <target>\n"
39                 "    read        <target> <offset> <size>\n"
40                 "    write       <target> <offset> < data\n"
41                 "    truncate    <target> <size>\n"
42                 "    delete      <target>\n"
43                 "    acquire     <target>\n"
44                 "    release     <target>\n"
45                 "    copy        <src>  <dst>\n"
46                 "    clone       <src>  <dst>\n"
47         );
48         return 1;
49 }
50
51 char *namebuf;
52 char *chunk;
53 struct xseg_config cfg;
54 struct xseg *xseg;
55 uint32_t srcport, dstport;
56 uint64_t reqs;
57 #define mkname mkname_heavy
58 /* heavy distributes duplicates much more widely than light
59  * ./xseg-tool random 100000 | cut -d' ' -f2- | sort | uniq -d -c |wc -l
60  */
61
62 xport sport = NoPort;
63 static void init_local_signal() 
64 {
65         if (xseg && sport != srcport){
66                 xseg_init_local_signal(xseg, srcport);
67                 sport = srcport;
68         }
69 }
70
71 void mkname_heavy(char *name, uint32_t namelen, uint32_t seed)
72 {
73         int i;
74         char c;
75         for (i = 0; i < namelen; i += 1) {
76                 c = seed + (seed >> 8) + (seed >> 16) + (seed >> 24);
77                 c = '0' + ((c + (c >> 4)) & 0xf);
78                 if (c > '9')
79                         c += 'a'-'0'-10;
80                 name[i] = c;
81                 seed *= ((seed % 137911) | 1) * 137911;
82         }
83 }
84
85 void mkname_light(char *name, uint32_t namelen, uint32_t seed)
86 {
87         int i;
88         char c;
89         for (i = 0; i < namelen; i += 1) {
90                 c = seed;
91                 name[i] = 'A' + (c & 0xf);
92                 seed += 1;
93         }
94 }
95
96 uint64_t pick(uint64_t size)
97 {
98         return (uint64_t)((double)(RAND_MAX) / random());
99 }
100
101 void mkchunk(   char *chunk, uint32_t datalen,
102                 char *target, uint32_t targetlen, uint64_t offset)
103 {
104         long i, r, bufsize = targetlen + 16;
105         char buf[bufsize];
106         r = datalen % bufsize;
107         snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);
108
109         for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
110                 memcpy(chunk + i, buf, bufsize);
111
112         memcpy(chunk + datalen - r, buf, r);
113 }
114
115 int chkchunk(   char *chunk, uint32_t datalen,
116                 char *target, uint32_t targetlen, uint64_t offset)
117 {
118         long i, r;
119         int bufsize = targetlen + 16;
120         char buf[bufsize];
121         r = datalen % targetlen;
122         snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);
123
124         for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
125                 if (memcmp(chunk + i, buf, bufsize)) {
126                         /*printf("mismatch: '%*s'* vs '%*s'\n",
127                                 bufsize, buf, datalen, chunk);
128                         */
129                         return 0;
130                 }
131
132         if (memcmp(chunk + datalen - r, buf, r))
133                 return 0;
134
135         return 1;
136 }
137
138
139 #define ALLOC_MIN 4096
140 #define ALLOC_MAX 1048576
141
142 void inputbuf(FILE *fp, char **retbuf, uint64_t *retsize)
143 {
144         static uint64_t alloc_size;
145         static char *buf;
146         uint64_t size = 0;
147         char *p;
148         size_t r;
149
150         if (alloc_size < ALLOC_MIN)
151                 alloc_size = ALLOC_MIN;
152
153         if (alloc_size > ALLOC_MAX)
154                 alloc_size = ALLOC_MAX;
155
156         p = realloc(buf, alloc_size);
157         if (!p) {
158                 if (buf)
159                         free(buf);
160                 buf = NULL;
161                 goto out;
162         }
163
164         buf = p;
165
166         while (!feof(fp)) {
167                 r = fread(buf + size, 1, alloc_size - size, fp);
168                 if (!r)
169                         break;
170                 size += r;
171                 if (size >= alloc_size) {
172                         p = realloc(buf, alloc_size * 2);
173                         if (!p) {
174                                 if (buf)
175                                         free(buf);
176                                 buf = NULL;
177                                 size = 0;
178                                 goto out;
179                         }
180                         buf = p;
181                         alloc_size *= 2;
182                 }
183         }
184
185 out:
186         *retbuf = buf;
187         *retsize = size;
188 }
189
190 void report_request(struct xseg_request *req)
191 {
192         uint32_t max = req->datalen;
193         char *data = xseg_get_data(xseg, req);
194         if (max > 128)
195                 max = 128;
196         data[max-1] = 0;
197         fprintf(stderr, "request %llu state %u\n", (unsigned long long)req->serial, req->state);
198 }
199
200 int cmd_info(char *target)
201 {
202         uint32_t targetlen = strlen(target);
203         size_t size = sizeof(uint64_t);
204         int r;
205         xport p;
206         struct xseg_request *req;
207         char *req_target;
208
209         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
210         if (!req) {
211                 fprintf(stderr, "No request!\n");
212                 return -1;
213         }
214
215         r = xseg_prep_request(xseg, req, targetlen, size);
216         if (r < 0) {
217                 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
218                         (unsigned long) targetlen, (unsigned long) size);
219                 xseg_put_request(xseg, req, srcport);
220                 return -1;
221         }
222
223         req_target = xseg_get_target(xseg, req);
224         strncpy(req_target, target, targetlen);
225         req->offset = 0;
226         req->size = size;
227         req->op = X_INFO;
228
229         p = xseg_submit(xseg, req, srcport, X_ALLOC);
230         if (p == NoPort)
231                 return -1;
232
233         xseg_signal(xseg, p);
234
235         return 0;
236 }
237
238 int cmd_read(char *target, uint64_t offset, uint64_t size)
239 {
240         uint32_t targetlen = strlen(target);
241         int r;
242         xport p;
243         char *req_target;
244         struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
245         printf("%x\n", req);
246         if (!req) {
247                 fprintf(stderr, "No request\n");
248                 return -1;
249         }
250
251         r = xseg_prep_request(xseg, req, targetlen, size);
252         if (r < 0) {
253                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
254                         (unsigned long)targetlen, (unsigned long long)size);
255                 xseg_put_request(xseg, req, srcport);
256                 return -1;
257         }
258
259         req_target = xseg_get_target(xseg, req);
260         strncpy(req_target, target, targetlen);
261         req->offset = offset;
262         req->size = size;
263         req->op = X_READ;
264         report_request(req);
265         p = xseg_submit(xseg, req, srcport, X_ALLOC);
266         if (p == NoPort)
267                 return -1;
268
269         xseg_signal(xseg, p);
270         return 0;
271 }
272
273 int cmd_write(char *target, uint64_t offset)
274 {
275         char *buf = NULL;
276         int r;
277         xport p;
278         uint64_t size = 0;
279         char *req_target, *req_data;
280         uint32_t targetlen = strlen(target);
281         struct xseg_request *req;
282
283         inputbuf(stdin, &buf, &size);
284         if (!size) {
285                 fprintf(stderr, "No input\n");
286                 return -1;
287         }
288
289         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
290         if (!req) {
291                 fprintf(stderr, "No request\n");
292                 return -1;
293         }
294
295         r = xseg_prep_request(xseg, req, targetlen, size);
296         if (r < 0) {
297                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
298                         (unsigned long)targetlen, (unsigned long long)size);
299                 xseg_put_request(xseg, req, srcport);
300                 return -1;
301         }
302
303         req_target = xseg_get_target(xseg, req);
304         strncpy(req_target, target, targetlen);
305         
306         req_data = xseg_get_data(xseg, req);
307         memcpy(req_data, buf, size);
308         req->offset = offset;
309         req->size = size;
310         req->op = X_WRITE;
311
312         p = xseg_submit(xseg, req, srcport, X_ALLOC);
313         if (p == NoPort) {
314                 fprintf(stderr, "Cannot submit\n");
315                 return -1;
316         }
317         xseg_signal(xseg, p);
318
319         return 0;
320 }
321
322 int cmd_truncate(char *target, uint64_t offset)
323 {
324         return 0;
325 }
326
327 int cmd_delete(char *target)
328 {
329         return 0;
330 }
331
332 int cmd_acquire(char *target)
333 {
334         return 0;
335 }
336
337 int cmd_release(char *target)
338 {
339         return 0;
340 }
341
342 int cmd_copy(char *src, char *dst)
343 {
344         return 0;
345 }
346
347 int cmd_clone(char *src, char *dst)
348 {
349
350         uint32_t targetlen = strlen(dst);
351         struct xseg_request *req;
352         struct xseg_request_clone *xclone;
353         xseg_bind_port(xseg, srcport);
354         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
355         if (!req) {
356                 fprintf(stderr, "No request\n");
357                 return -1;
358         }
359
360         int r = xseg_prep_request(xseg, req, targetlen, sizeof(struct xseg_request_clone));
361         if (r < 0) {
362                 fprintf(stderr, "Cannot prepare request!\n");
363                 xseg_put_request(xseg, req, srcport);
364                 return -1;
365         }
366
367         char *target = xseg_get_target(xseg, req);
368         char *data = xseg_get_data(xseg, req);
369
370         strncpy(target, dst, targetlen);
371         xclone = (struct xseg_request_clone *) data;
372         strncpy(xclone->target, src, 128);
373         xclone->size = 400 * (1 << 20);
374         req->offset = 0;
375         req->size = sizeof(struct xseg_request_clone);
376         req->op = X_CLONE;
377
378         xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
379         if (p == NoPort){
380                 fprintf(stderr, "Cannot submit request\n");
381                 return -1;
382         }
383         xseg_signal(xseg, p);
384
385         return 0;
386 }
387
388 void log_req(int logfd, uint32_t portno2, uint32_t portno1, int op, int method,
389                 struct xseg_request *req)
390 {
391         FILE *logfp;
392         char target[64], data[64];
393         char *req_target, *req_data;
394         /* null terminate name in case of req->target is less than 63 characters,
395          * and next character after name (aka first byte of next buffer) is not
396          * null
397          */
398         unsigned int end = (req->targetlen > 63) ? 63 : req->targetlen;
399         
400         req_target = xseg_get_target(xseg, req);
401         req_data = xseg_get_data(xseg, req);
402
403         logfp = fdopen(logfd, "a");
404         if (!logfp)
405                 return;
406
407         switch(method) {
408         case 0:
409                 strncpy(target, req_target, end);
410                 target[end] = 0;
411                 strncpy(data, req_data, 63);
412                 data[63] = 0;
413
414                 fprintf(logfp,
415                         "src port: %u, dst port: %u,  op:%u offset: %llu size: %lu, reqstate: %u\n"
416                         "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
417                         (unsigned int)portno1,
418                         (unsigned int)portno2,
419                         (unsigned int)req->op,
420                         (unsigned long long)req->offset,
421                         (unsigned long)req->size,
422                         (unsigned int)req->state,
423                         (unsigned int)req->targetlen, target,
424                         (unsigned long long)req->datalen, data);
425                 break;
426         case 1:
427                 fprintf(logfp,
428                         "src port: %u, dst port: %u, op: %u\n",
429                         (unsigned int)portno1,
430                         (unsigned int)portno2,
431                         (unsigned int)req->op);
432                 break;
433         case 2:
434                 fprintf(logfp, "src port: %u, dst port: %u, reqs: %llu\n",
435                         (unsigned int)portno1,
436                         (unsigned int)portno2,
437                         (unsigned long long)++reqs);
438         }
439
440         fclose(logfp);
441         return;
442 }
443
444 #define LOG_ACCEPT  0
445 #define LOG_RECEIVE 1
446
447 int cmd_bridge(uint32_t portno1, uint32_t portno2, char *logfile, char *how)
448 {
449         struct xseg_request *req;
450         int logfd, method;
451         if (!strcmp(logfile, "-"))
452                 logfd = 1;
453         else {
454                 logfd = open(logfile, O_WRONLY|O_APPEND|O_CREAT, 0600);
455                 if (logfd < 0) {
456                         perror(logfile);
457                         return -1;
458                 }
459         }
460
461         if (!strcmp(how, "full"))
462                 method = 0;
463         else if (!strcmp(how, "summary"))
464                 method = 1;
465         else
466                 method = 2;
467
468         for (;;) {
469                 int reloop = 0, active;
470                 xseg_prepare_wait(xseg, portno1);
471                 xseg_prepare_wait(xseg, portno2);
472                 req = NULL;
473
474                 for (;;) {
475                         active = 0;
476
477                         //FIXME
478                         req = xseg_accept(xseg, portno1);
479                         if (req) {
480                                 xseg_submit(xseg, req, portno2, X_ALLOC);
481                                 log_req(logfd, portno1, portno2, LOG_ACCEPT, method, req);
482                                 active += 1;
483                         }
484
485                         req = xseg_accept(xseg, portno2);
486                         if (req) {
487                                 xseg_submit(xseg, req, portno1, X_ALLOC);
488                                 log_req(logfd, portno2, portno1, LOG_ACCEPT, method, req);
489                                 active += 1;
490                         }
491
492                         req = xseg_receive(xseg, portno1);
493                         if (req) {
494                                 xseg_respond(xseg, req, portno2, X_ALLOC);
495                                 log_req(logfd, portno1, portno2, LOG_RECEIVE, method, req);
496                                 active += 1;
497                         }
498
499                         req = xseg_receive(xseg, portno2);
500                         if (req) {
501                                 xseg_respond(xseg, req, portno1, X_ALLOC);
502                                 log_req(logfd, portno2, portno1, LOG_RECEIVE, method, req);
503                                 active += 1;
504                         }
505
506                         if (active == 0) {
507                                 if (reloop)
508                                         break;
509                                 /* wait on multiple queues? */
510                                 xseg_wait_signal(xseg, 100000);
511                                 break;
512                         } else {
513                                 xseg_cancel_wait(xseg, portno1);        
514                                 xseg_cancel_wait(xseg, portno2);        
515                                 reloop = 1;
516                         }
517                 }
518         }
519
520         close(logfd);
521
522         return 0;
523 }
524
525 int cmd_rndwrite(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
526 {
527         if (loops < 0)
528                 return help();
529
530         if (targetlen >= chunksize) {
531                 fprintf(stderr, "targetlen >= chunksize\n");
532                 return -1;
533         }
534
535         char *p = realloc(namebuf, targetlen+1);
536         if (!p) {
537                 fprintf(stderr, "Cannot allocate memory\n");
538                 return -1;
539         }
540         namebuf = p;
541
542         p = realloc(chunk, chunksize);
543         if (!p) {
544                 fprintf(stderr, "Cannot allocate memory\n");
545                 return -1;
546         }
547         chunk = p;
548         memset(chunk, 0, chunksize);
549
550         srandom(seed);
551
552         struct xseg_request *submitted = NULL, *received;
553         long nr_submitted = 0, nr_received = 0, nr_failed = 0;
554         int reported = 0, r;
555         uint64_t offset;
556         xport port;
557         char *req_data, *req_target;
558         seed = random();
559         init_local_signal();
560
561         for (;;) {
562                 xseg_prepare_wait(xseg, srcport);
563                 if (nr_submitted < loops &&
564                     (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
565                         xseg_cancel_wait(xseg, srcport);
566                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
567                         if (r < 0) {
568                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
569                                         targetlen, chunksize);
570                                 xseg_put_request(xseg, submitted, srcport);
571                                 return -1;
572                         }
573                         
574                         req_target = xseg_get_target(xseg, submitted);
575                         req_data = xseg_get_data(xseg, submitted);
576
577                         reported = 0;
578                         mkname(namebuf, targetlen, seed);
579                         namebuf[targetlen] = 0;
580                         //printf("%ld: %s\n", nr_submitted, namebuf);
581                         strncpy(req_target, namebuf, targetlen);
582                         offset = 0;// pick(size);
583                         mkchunk(req_data, chunksize, namebuf, targetlen, offset);
584
585                         submitted->offset = offset;
586                         submitted->size = chunksize;
587                         submitted->op = X_WRITE;
588                         submitted->flags |= XF_NOSYNC;
589
590                         port =  xseg_submit(xseg, submitted, srcport, X_ALLOC);
591                         if (port == NoPort) {
592                                 xseg_put_request(xseg, submitted, srcport);
593                         } else {
594                                 seed = random();
595                                 nr_submitted += 1;
596                                 xseg_signal(xseg, port);
597                         }
598                 }
599
600                 received = xseg_receive(xseg, srcport);
601                 if (received) {
602                         xseg_cancel_wait(xseg, srcport);
603                         nr_received += 1;
604                         if (!(received->state & XS_SERVED)) {
605                                 nr_failed += 1;
606                                 report_request(received);
607                         }
608                         if (xseg_put_request(xseg, received, srcport))
609                                 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
610                 }
611
612                 if (!submitted && !received)
613                         xseg_wait_signal(xseg, 1000000);
614
615                         if (nr_submitted % 1000 == 0 && !reported) {
616                                 reported = 1;
617                                 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
618                                         nr_submitted, nr_received, nr_failed);
619                         }
620
621                         if (nr_received >= loops)
622                                 break;
623         }
624
625         fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
626                 nr_submitted, nr_received, nr_failed);
627         return 0;
628 }
629
630 /* note:
631  * prepare/wait rhythm,
632  * files are converted to independent chunk access patterns,
633 */
634
635 int cmd_rndread(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
636 {
637         if (loops < 0)
638                 return help();
639
640         if (targetlen >= chunksize) {
641                 fprintf(stderr, "targetlen >= chunksize\n");
642                 return -1;
643         }
644
645         char *p = realloc(namebuf, targetlen+1);
646         if (!p) {
647                 fprintf(stderr, "Cannot allocate memory\n");
648                 return -1;
649         }
650         namebuf = p;
651
652         p = realloc(chunk, chunksize);
653         if (!p) {
654                 fprintf(stderr, "Cannot allocate memory\n");
655                 return -1;
656         }
657         chunk = p;
658         memset(chunk, 0, chunksize);
659
660         srandom(seed);
661
662         struct xseg_request *submitted = NULL, *received;
663         long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0;
664         int reported = 0, r;
665         uint64_t offset;
666         xport port;
667         char *req_data, *req_target;
668         init_local_signal();
669
670         seed = random();
671         for (;;) {
672                 submitted = NULL;
673                 xseg_prepare_wait(xseg, srcport);
674                 if (nr_submitted < loops &&
675                     (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
676                         xseg_cancel_wait(xseg, srcport);
677                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
678                         if (r < 0) {
679                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
680                                         targetlen, chunksize);
681                                 xseg_put_request(xseg, submitted, srcport);
682                                 return -1;
683                         }
684
685                         req_target = xseg_get_target(xseg, submitted);
686                         reported = 0;
687                         mkname(namebuf, targetlen, seed);
688                         namebuf[targetlen] = 0;
689                         //printf("%ld: %s\n", nr_submitted, namebuf);
690                         offset = 0;//pick(size);
691
692                         strncpy(req_target, namebuf, targetlen);
693                         submitted->offset = offset;
694                         submitted->size = chunksize;
695                         submitted->op = X_READ;
696                         port = xseg_submit(xseg, submitted, srcport, X_ALLOC);
697                         if (port == NoPort) {
698                                 xseg_put_request(xseg, submitted, srcport);
699                         } else {
700                                 seed = random();
701                                 nr_submitted += 1;
702                                 xseg_signal(xseg, port);
703                         }
704                 }
705
706                 received = xseg_receive(xseg, srcport);
707                 if (received) {
708                         xseg_cancel_wait(xseg, srcport);
709                         nr_received += 1;
710                         req_target = xseg_get_target(xseg, received);
711                         req_data = xseg_get_data(xseg, received);
712                         if (!(received->state & XS_SERVED)) {
713                                 nr_failed += 1;
714                                 report_request(received);
715                         } else if (!chkchunk(req_data, received->datalen,
716                                         req_target, received->targetlen, received->offset)) {
717         //                      report_request(received);
718                                 nr_mismatch += 1;
719                         }
720
721                         if (xseg_put_request(xseg, received, srcport))
722                                 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
723                 }
724
725                 if (!submitted && !received)
726                         xseg_wait_signal(xseg, 1000000);
727
728                 if (nr_submitted % 1000 == 0 && !reported) {
729                         reported = 1;
730                         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
731                         nr_submitted, nr_received, nr_failed, nr_mismatch);
732                 }
733
734                 if (nr_received >= loops)
735                         break;
736         }
737
738         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
739                 nr_submitted, nr_received, nr_failed, nr_mismatch);
740         return 0;
741 }
742
743 int cmd_submit_reqs(long loops, long concurrent_reqs, int op)
744 {
745         if (loops < 0)
746                 return help();
747
748         struct xseg_request *submitted = NULL, *received;
749         long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0, nr_flying = 0;
750         int reported = 0, r;
751         uint64_t offset;
752         uint32_t targetlen = 10, chunksize = 4096;
753         struct timeval tv1, tv2;
754         xport p;
755         char *req_data, *req_target;
756
757         xseg_bind_port(xseg, srcport);
758
759         gettimeofday(&tv1, NULL);
760         for (;;) {
761                 submitted = NULL;
762                 xseg_prepare_wait(xseg, srcport);
763                 if (nr_submitted < loops &&  nr_flying < concurrent_reqs &&
764                     (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
765                         xseg_cancel_wait(xseg, srcport);
766                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
767                         if (r < 0) {
768                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
769                                         targetlen, chunksize);
770                                 xseg_put_request(xseg, submitted, srcport);
771                                 return -1;
772                         }
773                         
774                         //FIXME
775                         ++nr_flying;
776                         nr_submitted += 1;
777                         reported = 0;
778                         offset = 0;//pick(size);
779
780                         submitted->offset = offset;
781                         submitted->size = chunksize;
782                         req_target = xseg_get_target(xseg, submitted);
783                         req_data = xseg_get_data(xseg, submitted);
784
785                         if (op == 0)
786                                 submitted->op = X_INFO;
787                         else if (op == 1)
788                                 submitted->op = X_READ;
789                         else if (op == 2) {
790                                 submitted->op = X_WRITE;
791                                 mkchunk(req_data, submitted->datalen, req_target, submitted->targetlen, submitted->offset);
792                         }
793
794                         p = xseg_submit(xseg, submitted, srcport, X_ALLOC);
795                         if ( p != NoPort){
796                                 if (xseg_signal(xseg, p) < 0)
797                                         perror("Cannot signal peer");
798                         }
799                 }
800                 received = xseg_receive(xseg, srcport);
801                 if (received) {
802                         xseg_cancel_wait(xseg, srcport);
803                         --nr_flying;
804                         if (nr_received == 0)
805                                 fprintf(stderr, "latency (time for the first req to complete): %llu usecs\n",
806                                         (unsigned long long)received->elapsed);
807                         nr_received += 1;
808                         if (!(received->state & XS_SERVED)) {
809                                 nr_failed += 1;
810                                 //report_request(received);
811                         }
812
813                         if (xseg_put_request(xseg, received, srcport))
814                                 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
815                 }
816
817                 if (!submitted && !received)
818                         xseg_wait_signal(xseg, 10000000L);
819
820                 if (nr_received >= loops)
821                         break;
822         }
823         gettimeofday(&tv2, NULL);
824
825         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
826                 nr_submitted, nr_received, nr_failed, nr_mismatch);
827         long t = (tv2.tv_sec - tv1.tv_sec)*1000000 + (tv2.tv_usec - tv1.tv_usec);
828         fprintf(stderr, "elpased time: %lf secs, throughput: %lf reqs/sec\n", (double) t / 1000000.0, (double) nr_submitted / (t / 1000000.0));
829
830         return 0;
831 }
832
833 int cmd_report(uint32_t portno)
834 {
835         struct xseg_port *port = xseg_get_port(xseg, portno);
836         if (!port) {
837                 printf("port %u is not assigned\n", portno);
838                 return 0;
839         }
840         struct xq *fq, *rq, *pq;
841         fq = xseg_get_queue(xseg, port, free_queue);
842         rq = xseg_get_queue(xseg, port, request_queue);
843         pq = xseg_get_queue(xseg, port, reply_queue);
844         fprintf(stderr, "port %u:\n"
845                 "   requests: %llu/%llu  src gw: %u  dst gw: %u\n"
846                 "       free_queue [%p] count : %u\n"
847                 "    request_queue [%p] count : %u\n"
848                 "      reply_queue [%p] count : %u\n",
849                 portno, port->alloc_reqs, port->max_alloc_reqs,
850                 xseg->src_gw[portno], xseg->dst_gw[portno],
851                 (void *)fq, xq_count(fq),
852                 (void *)rq, xq_count(rq),
853                 (void *)pq, xq_count(pq));
854         return 0;
855 }
856
857 int cmd_join(void)
858 {
859         if (xseg)
860                 return 0;
861
862         xseg = xseg_join(cfg.type, cfg.name, "posix", NULL);
863         if (!xseg) {
864                 fprintf(stderr, "cannot join segment!\n");
865                 return -1;
866         }
867         return 0;
868 }
869
870 int cmd_reportall(void)
871 {
872         uint32_t t;
873
874         if (cmd_join())
875                 return -1;
876
877
878         fprintf(stderr, "Heap usage: %llu / %llu\n", xseg->heap->cur, xseg->config.heap_size);
879         for (t = 0; t < xseg->config.nr_ports; t++)
880                 cmd_report(t);
881
882         return 0;
883 }
884
885 int cmd_create(void)
886 {
887         int r = xseg_create(&cfg);
888         if (r) {
889                 fprintf(stderr, "cannot create segment!\n");
890                 return -1;
891         }
892
893         fprintf(stderr, "Segment initialized.\n");
894         return 0;
895 }
896
897 int cmd_destroy(void)
898 {
899         if (!xseg && cmd_join())
900                 return -1;
901         xseg_leave(xseg);
902         xseg_destroy(xseg);
903         xseg = NULL;
904         fprintf(stderr, "Segment destroyed.\n");
905         return 0;
906 }
907
908 int cmd_alloc_requests(unsigned long nr)
909 {
910         return xseg_alloc_requests(xseg, srcport, nr);
911 }
912
913 int cmd_free_requests(unsigned long nr)
914 {
915         return xseg_free_requests(xseg, srcport, nr);
916 }
917
918 int cmd_put_requests(void)
919 {
920         struct xseg_request *req;
921
922         for (;;) {
923                 req = xseg_accept(xseg, dstport);
924                 if (!req)
925                         break;
926                 if (xseg_put_request(xseg, req, srcport))
927                         fprintf(stderr, "Cannot put request at port %u\n", req->src_portno);
928         }
929
930         return 0;
931 }
932
933 int cmd_finish(unsigned long nr, int fail)
934 {
935         struct xseg_request *req;
936         char *buf = malloc(sizeof(char) * 8128);
937         char *req_target, *req_data;
938         xseg_bind_port(xseg, srcport);
939         xport p;
940
941         for (; nr--;) {
942                 xseg_prepare_wait(xseg, srcport);
943                 req = xseg_accept(xseg, srcport);
944                 if (req) {
945                         req_target = xseg_get_target(xseg, req);
946                         req_data = xseg_get_data(xseg, req);
947                         xseg_cancel_wait(xseg, srcport);
948                         if (fail == 1)
949                                 req->state &= ~XS_SERVED;
950                         else {
951                                 if (req->op == X_READ)
952                                         mkchunk(req_data, req->datalen, req_target, req->targetlen, req->offset);
953                                 else if (req->op == X_WRITE) 
954                                         memcpy(buf, req_data, (sizeof(*buf) > req->datalen) ? req->datalen : sizeof(*buf));
955                                 else if (req->op == X_INFO)
956                                         *((uint64_t *) req->data) = 4294967296;
957                                 
958                                 req->state |= XS_SERVED;
959                                 req->serviced = req->size;
960                         }
961
962                         p = xseg_respond(xseg, req, srcport, X_ALLOC);
963                         xseg_signal(xseg, p);
964                         continue;
965                 }
966                 ++nr;
967                 xseg_wait_signal(xseg, 10000000L);
968         }
969
970         free(buf);
971
972         return 0;
973 }
974
975 void handle_reply(struct xseg_request *req)
976 {
977         char *req_data = xseg_get_data(xseg, req);
978         if (!(req->state & XS_SERVED)) {
979                 report_request(req);
980                 goto put;
981         }
982
983         switch (req->op) {
984         case X_READ:
985                 fwrite(req_data, 1, req->datalen, stdout);
986                 break;
987
988         case X_WRITE:
989                 fprintf(stdout, "wrote: ");
990                 fwrite(req_data, 1, req->datalen, stdout);
991                 break;
992         case X_SYNC:
993         case X_DELETE:
994         case X_TRUNCATE:
995         case X_COMMIT:
996         case X_CLONE:
997                 fprintf(stderr, "cloned %s\n", ((struct xseg_request_clone *)req_data)->target);
998                 break;
999         case X_INFO:
1000                 fprintf(stderr, "size: %llu\n", (unsigned long long)*((uint64_t *)req_data));
1001                 break;
1002
1003         default:
1004                 break;
1005         }
1006
1007 put:
1008         if (xseg_put_request(xseg, req, srcport))
1009                 fprintf(stderr, "Cannot put reply at port %u\n", req->src_portno);
1010 }
1011
1012 int cmd_wait(uint32_t nr)
1013 {
1014         struct xseg_request *req;
1015         long ret;
1016         init_local_signal(); 
1017
1018         for (;;) {
1019                 req = xseg_receive(xseg, srcport);
1020                 if (req) {
1021                         handle_reply(req);
1022                         nr--;
1023                         if (nr == 0)
1024                                 break;
1025                         continue;
1026                 }
1027
1028                 ret = xseg_prepare_wait(xseg, srcport);
1029                 if (ret)
1030                         return -1;
1031
1032                 ret = xseg_wait_signal(xseg, 1000000);
1033                 ret = xseg_cancel_wait(xseg, srcport);
1034                 if (ret)
1035                         return -1;
1036         }
1037
1038         return 0;
1039 }
1040
1041 int cmd_put_replies(void)
1042 {
1043         struct xseg_request *req;
1044
1045         for (;;) {
1046                 req = xseg_receive(xseg, dstport);
1047                 if (!req)
1048                         break;
1049                 fprintf(stderr, "request: %08llx%08llx\n"
1050                         "     op: %u\n"
1051                         "  state: %u\n",
1052                         0LL, (unsigned long long)req->serial,
1053                         req->op,
1054                         req->state);
1055                 report_request(req);
1056
1057                 //fwrite(req->buffer, 1, req->bufferlen, stdout);
1058
1059                 if (xseg_put_request(xseg, req, srcport))
1060                         fprintf(stderr, "Cannot put reply\n");
1061         }
1062
1063         return 0;
1064 }
1065
1066 int cmd_bind(long portno)
1067 {
1068         struct xseg_port *port = xseg_bind_port(xseg, portno);
1069         if (!port) {
1070                 fprintf(stderr, "failed to bind port %ld\n", portno);
1071                 return 1;
1072         }
1073
1074         fprintf(stderr, "bound port %u\n", xseg_portno(xseg, port));
1075         return 0;
1076 }
1077
1078 int cmd_signal(uint32_t portno)
1079 {
1080         return xseg_signal(xseg, portno);
1081 }
1082
1083 int parse_ports(char *str)
1084 {
1085         int ret = 0;
1086         char *s = str;
1087
1088         for (;;) {
1089                 if (*s == 0)
1090                         return 0;
1091
1092                 if (*s == ':') {
1093                         *s = 0;
1094                         if ((s > str) && isdigit(str[0])) {
1095                                 srcport = atol(str);
1096                                 ret ++;
1097                         }
1098                         break;
1099                 }
1100                 s ++;
1101         }
1102
1103         s += 1;
1104         str = s;
1105
1106         for (;;) {
1107                 if (*s == 0) {
1108                         if ((s > str) && isdigit(str[0])) {
1109                                 dstport = atol(str);
1110                                 ret ++;
1111                         }
1112                         break;
1113                 }
1114                 s ++;
1115         }
1116
1117         return ret;
1118 }
1119
1120 int main(int argc, char **argv)
1121 {
1122         int i, ret = 0;
1123         char *spec;
1124
1125         if (argc < 3)
1126                 return help();
1127
1128         srcport = -1;
1129         dstport = -1;
1130         spec = argv[1];
1131
1132         if (xseg_parse_spec(spec, &cfg)) {
1133                 fprintf(stderr, "Cannot parse spec\n");
1134                 return -1;
1135         }
1136
1137         if (xseg_initialize()) {
1138                 fprintf(stderr, "cannot initialize!\n");
1139                 return -1;
1140         }
1141
1142         for (i = 2; i < argc; i++) {
1143
1144                 if (!strcmp(argv[i], "create")) {
1145                         ret = cmd_create();
1146                         continue;
1147                 }
1148
1149                 if (!strcmp(argv[i], "join")) {
1150                         ret = cmd_join();
1151                         if (!ret)
1152                                 fprintf(stderr, "Segment joined.\n");
1153                         continue;
1154                 }
1155
1156                 if (!strcmp(argv[i], "destroy")) {
1157                         ret = cmd_destroy();
1158                         continue;
1159                 }
1160
1161                 if (cmd_join())
1162                         return -1;
1163
1164                 if (!strcmp(argv[i], "reportall")) {
1165                         ret = cmd_reportall();
1166                         continue;
1167                 }
1168
1169                 if (!strcmp(argv[i], "bind") && (i + 1 < argc)) {
1170                         ret = cmd_bind(atol(argv[i+1]));
1171                         i += 1;
1172                         continue;
1173                 }
1174
1175                 if (!strcmp(argv[i], "signal") && (i + 1 < argc)) {
1176                         ret = cmd_signal(atol(argv[i+1]));
1177                         i += 1;
1178                         continue;
1179                 }
1180
1181                 if (!strcmp(argv[i], "bridge") && (i + 4 < argc)) {
1182                         ret = cmd_bridge(atol(argv[i+1]),
1183                                          atol(argv[i+2]),
1184                                          argv[i+3],
1185                                          argv[i+4]);
1186                         i += 4;
1187                         continue;
1188                 }
1189
1190                 if (srcport == -1) {
1191                         if (!parse_ports(argv[i]))
1192                                 fprintf(stderr, "source port undefined: %s\n", argv[i]);
1193                         continue;
1194                 }
1195
1196                 if (dstport == -1) {
1197                         if (!parse_ports(argv[i]))
1198                                 fprintf(stderr, "destination port undefined: %s\n", argv[i]);
1199                         continue;
1200                 }
1201
1202                 if (!strcmp(argv[i], "report")) {
1203                         ret = cmd_report(dstport);
1204                         continue;
1205                 }
1206
1207                 if (!strcmp(argv[i], "alloc_requests") && (i + 1 < argc)) {
1208                         ret = cmd_alloc_requests(atol(argv[i+1]));
1209                         i += 1;
1210                         continue;
1211                 }
1212
1213                 if (!strcmp(argv[i], "free_requests") && (i + 1 < argc)) {
1214                         ret = cmd_free_requests(atol(argv[i+1]));
1215                         i += 1;
1216                         continue;
1217                 }
1218
1219                 if (!strcmp(argv[i], "put_requests")) {
1220                         ret = cmd_put_requests();
1221                         continue;
1222                 }
1223
1224                 if (!strcmp(argv[i], "put_replies")) {
1225                         ret = cmd_put_replies();
1226                         continue;
1227                 }
1228
1229                 if (!strcmp(argv[i], "complete") && (i + 1 < argc)) {
1230                         ret = cmd_finish(atol(argv[i+1]), 0);
1231                         i += 1;
1232                         continue;
1233                 }
1234
1235                 if (!strcmp(argv[i], "fail") && (i + 1 < argc)) {
1236                         ret = cmd_finish(atol(argv[i+1]), 1);
1237                         i += 1;
1238                         continue;
1239                 }
1240
1241                 if (!strcmp(argv[i], "wait") && (i + 1 < argc)) {
1242                         ret = cmd_wait(atol(argv[i+1]));
1243                         i += 1;
1244                         continue;
1245                 }
1246
1247                 if (!strcmp(argv[i], "rndwrite") && (i + 5 < argc)) {
1248                         long nr_loops = atol(argv[i+1]);
1249                         unsigned int seed = atoi(argv[i+2]);
1250                         unsigned int targetlen = atoi(argv[i+3]);
1251                         unsigned int chunksize = atoi(argv[i+4]);
1252                         unsigned long objectsize = atol(argv[i+5]);
1253                         ret = cmd_rndwrite(nr_loops, seed, targetlen, chunksize, objectsize);
1254                         i += 5;
1255                         continue;
1256                 }
1257
1258                 if (!strcmp(argv[i], "rndread") && (i + 5 < argc)) {
1259                         long nr_loops = atol(argv[i+1]);
1260                         unsigned int seed = atoi(argv[i+2]);
1261                         unsigned int targetlen = atoi(argv[i+3]);
1262                         unsigned int chunksize = atoi(argv[i+4]);
1263                         unsigned long objectsize = atol(argv[i+5]);
1264                         ret = cmd_rndread(nr_loops, seed, targetlen, chunksize, objectsize);
1265                         i += 5;
1266                         continue;
1267                 }
1268
1269                 if (!strcmp(argv[i], "submit_reqs") && (i + 3 < argc)) {
1270                         long nr_loops = atol(argv[i+1]);
1271                         long concurrent_reqs = atol(argv[i+2]);
1272                         int op = atoi(argv[i+3]);
1273                         ret = cmd_submit_reqs(nr_loops, concurrent_reqs, op);
1274                         i += 3;
1275                         continue;
1276                 }
1277
1278                 if (!strcmp(argv[i], "read") && (i + 3 < argc)) {
1279                         char *target = argv[i+1];
1280                         uint64_t offset = atol(argv[i+2]);
1281                         uint64_t size   = atol(argv[i+3]);
1282                         ret = cmd_read(target, offset, size);
1283                         i += 3;
1284                         continue;
1285                 }
1286
1287                 if (!strcmp(argv[i], "write") && (i + 2 < argc)) {
1288                         char *target = argv[i+1];
1289                         uint64_t offset = atol(argv[i+2]);
1290                         ret = cmd_write(target, offset);
1291                         i += 2;
1292                         continue;
1293                 }
1294
1295                 if (!strcmp(argv[i], "truncate") && (i + 2 < argc)) {
1296                         char *target = argv[i+1];
1297                         uint64_t offset = atol(argv[i+2]);
1298                         ret = cmd_truncate(target, offset);
1299                         i += 2;
1300                         continue;
1301                 }
1302
1303                 if (!strcmp(argv[i], "delete") && (i + 1 < argc)) {
1304                         char *target = argv[i+1];
1305                         ret = cmd_delete(target);
1306                         i += 1;
1307                         continue;
1308                 }
1309
1310                 if (!strcmp(argv[i], "acquire") && (i + 1 < argc)) {
1311                         char *target = argv[i+1];
1312                         ret = cmd_acquire(target);
1313                         i += 1;
1314                         continue;
1315                 }
1316
1317                 if (!strcmp(argv[i], "release") && (i + 1 < argc)) {
1318                         char *target = argv[i+1];
1319                         ret = cmd_release(target);
1320                         i += 1;
1321                         continue;
1322                 }
1323
1324                 if (!strcmp(argv[i], "copy") && (i + 2) < argc) {
1325                         char *src = argv[i+1];
1326                         char *dst = argv[i+2];
1327                         ret = cmd_copy(src, dst);
1328                         i += 2;
1329                         continue;
1330                 }
1331
1332                 if (!strcmp(argv[i], "clone") && (i + 2 < argc)) {
1333                         char *src = argv[i+1];
1334                         char *dst = argv[i+2];
1335                         ret = cmd_clone(src, dst);
1336                         i += 2;
1337                         continue;
1338                 }
1339
1340                 if (!strcmp(argv[i], "info") && (i + 1 < argc)) {
1341                         char *target = argv[i+1];
1342                         ret = cmd_info(target);
1343                         i += 1;
1344                         continue;
1345                 }
1346
1347
1348                 if (!parse_ports(argv[i]))
1349                         fprintf(stderr, "invalid argument: %s\n", argv[i]);
1350         }
1351
1352         /* xseg_leave(); */
1353         return ret;
1354 }