add lock manipulation and user prompting in verifying dangling reqs
[archipelago] / xseg / peers / user / xseg-tool.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <string.h>
5 #include <stdint.h>
6 #include <ctype.h>
7 #include <sys/stat.h>
8 #include <sys/types.h>
9 #include <fcntl.h>
10 #include <unistd.h>
11
12 #include <xtypes/xhash.h>
13 #include <xtypes/xobj.h>
14 #include <xseg/xseg.h>
15 #include <xseg/protocol.h>
16 int help(void)
17 {
18         printf("xseg <spec> [[[<src_port>]:[<dst_port>]] [<command> <arg>*] ]*\n"
19                 "spec:\n"
20                 "    <type:name:nr_ports:nr_requests:request_size:extra_size:page_shift>\n"
21                 "global commands:\n"
22                 "    reportall\n"
23                 "    create\n"
24                 "    destroy\n"
25                 "    bind <portno>\n"
26                 "    signal <portno>\n"
27                 "    bridge <portno1> <portno2> <logfile> {full|summary|stats}\n"
28                 "port commands:\n"
29                 "    report\n"
30                 "    alloc_requests (to source) <nr>\n"
31                 "    free_requests (from source) <nr>\n"
32                 "    put_requests (all from dest)\n"
33                 "    put_replies (all from dest)\n"
34                 "    wait        <nr_replies>\n"
35                 "    complete    <nr_requests>\n"
36                 "    fail        <nr_requests>\n"
37                 "    rndwrite    <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
38                 "    rndread     <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
39                 "    submit_reqs <nr_loops> <concurrent_reqs>\n"
40                 "    info        <target>\n"
41                 "    read        <target> <offset> <size>\n"
42                 "    write       <target> <offset> < data\n"
43                 "    truncate    <target> <size>\n"
44                 "    delete      <target>\n"
45                 "    acquire     <target>\n"
46                 "    release     <target>\n"
47                 "    copy        <src>  <dst>\n"
48                 "    clone       <src>  <dst>\n"
49         );
50         return 1;
51 }
52
53
54 enum req_action {
55         REPORT = 1,
56         FAIL = 2,
57         COMPLETE = 3
58 };
59
60 enum queue {
61         FREE_QUEUE = 0,
62         REQUEST_QUEUE = 1,
63         REPLY_QUEUE = 2
64 };
65
66 char *namebuf;
67 char *chunk;
68 struct xseg_config cfg;
69 struct xseg *xseg;
70 uint32_t srcport, dstport;
71 uint64_t reqs;
72 #define mkname mkname_heavy
73 /* heavy distributes duplicates much more widely than light
74  * ./xseg-tool random 100000 | cut -d' ' -f2- | sort | uniq -d -c |wc -l
75  */
76
77 xport sport = NoPort;
78 static void init_local_signal() 
79 {
80         if (xseg && sport != srcport){
81                 xseg_init_local_signal(xseg, srcport);
82                 sport = srcport;
83         }
84 }
85
86 void mkname_heavy(char *name, uint32_t namelen, uint32_t seed)
87 {
88         int i;
89         char c;
90         for (i = 0; i < namelen; i += 1) {
91                 c = seed + (seed >> 8) + (seed >> 16) + (seed >> 24);
92                 c = '0' + ((c + (c >> 4)) & 0xf);
93                 if (c > '9')
94                         c += 'a'-'0'-10;
95                 name[i] = c;
96                 seed *= ((seed % 137911) | 1) * 137911;
97         }
98 }
99
100 void mkname_light(char *name, uint32_t namelen, uint32_t seed)
101 {
102         int i;
103         char c;
104         for (i = 0; i < namelen; i += 1) {
105                 c = seed;
106                 name[i] = 'A' + (c & 0xf);
107                 seed += 1;
108         }
109 }
110
111 uint64_t pick(uint64_t size)
112 {
113         return (uint64_t)((double)(RAND_MAX) / random());
114 }
115
116 void mkchunk(   char *chunk, uint32_t datalen,
117                 char *target, uint32_t targetlen, uint64_t offset)
118 {
119         long i, r, bufsize = targetlen + 16;
120         char buf[bufsize];
121         r = datalen % bufsize;
122         snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);
123
124         for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
125                 memcpy(chunk + i, buf, bufsize);
126
127         memcpy(chunk + datalen - r, buf, r);
128 }
129
130 int chkchunk(   char *chunk, uint32_t datalen,
131                 char *target, uint32_t targetlen, uint64_t offset)
132 {
133         long i, r;
134         int bufsize = targetlen + 16;
135         char buf[bufsize];
136         r = datalen % targetlen;
137         snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);
138
139         for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
140                 if (memcmp(chunk + i, buf, bufsize)) {
141                         /*printf("mismatch: '%*s'* vs '%*s'\n",
142                                 bufsize, buf, datalen, chunk);
143                         */
144                         return 0;
145                 }
146
147         if (memcmp(chunk + datalen - r, buf, r))
148                 return 0;
149
150         return 1;
151 }
152
153
154 #define ALLOC_MIN 4096
155 #define ALLOC_MAX 1048576
156
157 void inputbuf(FILE *fp, char **retbuf, uint64_t *retsize)
158 {
159         static uint64_t alloc_size;
160         static char *buf;
161         uint64_t size = 0;
162         char *p;
163         size_t r;
164
165         if (alloc_size < ALLOC_MIN)
166                 alloc_size = ALLOC_MIN;
167
168         if (alloc_size > ALLOC_MAX)
169                 alloc_size = ALLOC_MAX;
170
171         p = realloc(buf, alloc_size);
172         if (!p) {
173                 if (buf)
174                         free(buf);
175                 buf = NULL;
176                 goto out;
177         }
178
179         buf = p;
180
181         while (!feof(fp)) {
182                 r = fread(buf + size, 1, alloc_size - size, fp);
183                 if (!r)
184                         break;
185                 size += r;
186                 if (size >= alloc_size) {
187                         p = realloc(buf, alloc_size * 2);
188                         if (!p) {
189                                 if (buf)
190                                         free(buf);
191                                 buf = NULL;
192                                 size = 0;
193                                 goto out;
194                         }
195                         buf = p;
196                         alloc_size *= 2;
197                 }
198         }
199
200 out:
201         *retbuf = buf;
202         *retsize = size;
203 }
204
205 void report_request(struct xseg_request *req)
206 {
207         char target[64], data[64];
208         char *req_target, *req_data;
209         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
210         req_target = xseg_get_target(xseg, req);
211         req_data = xseg_get_data(xseg, req);
212
213         strncpy(target, req_target, end);
214         target[end] = 0;
215         strncpy(data, req_data, 63);
216         data[63] = 0;
217         fprintf(stderr,
218                 "Request %lx: target[%u](xptr: %llu): %s, data[%llu](xptr: %llu): %s \n\t"
219                 "offset: %llu, size: %llu, serviced; %llu, op: %u, state: %u, flags: %u \n\t"
220                 "src: %u, src_transit: %u, dst: %u, dst_transit: %u\n",
221                 (unsigned long) req, req->targetlen, (unsigned long long)req->target,
222                 target,
223                 (unsigned long long) req->datalen, (unsigned long long) req->data,
224                 data,
225                 (unsigned long long) req->offset, (unsigned long long) req->size,
226                 (unsigned long long) req->serviced, req->op, req->state, req->flags,
227                 (unsigned int) req->src_portno, (unsigned int) req->src_transit_portno,
228                 (unsigned int) req->dst_portno, (unsigned int) req->dst_transit_portno);
229
230
231 }
232
233 int cmd_info(char *target)
234 {
235         uint32_t targetlen = strlen(target);
236         size_t size = sizeof(uint64_t);
237         int r;
238         xport p;
239         struct xseg_request *req;
240         char *req_target;
241
242         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
243         if (!req) {
244                 fprintf(stderr, "No request!\n");
245                 return -1;
246         }
247
248         r = xseg_prep_request(xseg, req, targetlen, size);
249         if (r < 0) {
250                 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
251                         (unsigned long) targetlen, (unsigned long) size);
252                 xseg_put_request(xseg, req, srcport);
253                 return -1;
254         }
255
256         req_target = xseg_get_target(xseg, req);
257         strncpy(req_target, target, targetlen);
258         req->offset = 0;
259         req->size = size;
260         req->op = X_INFO;
261
262         p = xseg_submit(xseg, req, srcport, X_ALLOC);
263         if (p == NoPort)
264                 return -1;
265
266         xseg_signal(xseg, p);
267
268         return 0;
269 }
270
271 int cmd_read(char *target, uint64_t offset, uint64_t size)
272 {
273         uint32_t targetlen = strlen(target);
274         int r;
275         xport p;
276         char *req_target;
277         struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
278         if (!req) {
279                 fprintf(stderr, "No request\n");
280                 return -1;
281         }
282
283         r = xseg_prep_request(xseg, req, targetlen, size);
284         if (r < 0) {
285                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
286                         (unsigned long)targetlen, (unsigned long long)size);
287                 xseg_put_request(xseg, req, srcport);
288                 return -1;
289         }
290
291         req_target = xseg_get_target(xseg, req);
292         strncpy(req_target, target, targetlen);
293         req->offset = offset;
294         req->size = size;
295         req->op = X_READ;
296         report_request(req);
297         p = xseg_submit(xseg, req, srcport, X_ALLOC);
298         if (p == NoPort)
299                 return -1;
300
301         xseg_signal(xseg, p);
302         return 0;
303 }
304
305 int cmd_write(char *target, uint64_t offset)
306 {
307         char *buf = NULL;
308         int r;
309         xport p;
310         uint64_t size = 0;
311         char *req_target, *req_data;
312         uint32_t targetlen = strlen(target);
313         struct xseg_request *req;
314
315         inputbuf(stdin, &buf, &size);
316         if (!size) {
317                 fprintf(stderr, "No input\n");
318                 return -1;
319         }
320
321         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
322         if (!req) {
323                 fprintf(stderr, "No request\n");
324                 return -1;
325         }
326
327         r = xseg_prep_request(xseg, req, targetlen, size);
328         if (r < 0) {
329                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
330                         (unsigned long)targetlen, (unsigned long long)size);
331                 xseg_put_request(xseg, req, srcport);
332                 return -1;
333         }
334
335         req_target = xseg_get_target(xseg, req);
336         strncpy(req_target, target, targetlen);
337         
338         req_data = xseg_get_data(xseg, req);
339         memcpy(req_data, buf, size);
340         req->offset = offset;
341         req->size = size;
342         req->op = X_WRITE;
343
344         p = xseg_submit(xseg, req, srcport, X_ALLOC);
345         if (p == NoPort) {
346                 fprintf(stderr, "Cannot submit\n");
347                 return -1;
348         }
349         xseg_signal(xseg, p);
350
351         return 0;
352 }
353
354 int cmd_truncate(char *target, uint64_t offset)
355 {
356         return 0;
357 }
358
359 int cmd_delete(char *target)
360 {
361         uint32_t targetlen = strlen(target);
362         int r;
363         struct xseg_request *req;
364         init_local_signal();
365         xseg_bind_port(xseg, srcport, NULL);
366
367         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
368         if (!req) {
369                 fprintf(stderr, "No request!\n");
370                 return -1;
371         }
372
373         r = xseg_prep_request(xseg, req, targetlen, 0);
374         if (r < 0) {
375                 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
376                         (unsigned long) targetlen, (unsigned long) req->bufferlen - targetlen);
377                 xseg_put_request(xseg, req, srcport);
378                 return -1;
379         }
380
381         char *reqtarget = xseg_get_target(xseg, req);
382         strncpy(reqtarget, target, targetlen);
383         req->op = X_DELETE;
384
385         xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
386         if (p == NoPort){
387                 fprintf(stderr, "Couldn't submit request\n");
388                 xseg_put_request(xseg, req, srcport);
389                 return -1;
390         }
391
392         xseg_signal(xseg, p);
393
394         return 0;
395 }
396
397 int cmd_acquire(char *target)
398 {
399         uint32_t targetlen = strlen(target);
400         int r;
401         xport p;
402         char *req_target;
403         struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
404         if (!req) {
405                 fprintf(stderr, "No request\n");
406                 return -1;
407         }
408
409         r = xseg_prep_request(xseg, req, targetlen, 0);
410         if (r < 0) {
411                 fprintf(stderr, "Cannot prepare request! (%lu, 0)\n",
412                         (unsigned long)targetlen);
413                 xseg_put_request(xseg, req, srcport);
414                 return -1;
415         }
416
417         req_target = xseg_get_target(xseg, req);
418         strncpy(req_target, target, targetlen);
419         req->offset = 0;
420         req->size = 0;
421         req->op = X_OPEN;
422         p = xseg_submit(xseg, req, srcport, X_ALLOC);
423         if (p == NoPort)
424                 return -1;
425
426         xseg_signal(xseg, p);
427         return 0;
428 }
429
430 int cmd_release(char *target)
431 {
432         uint32_t targetlen = strlen(target);
433         int r;
434         xport p;
435         char *req_target;
436         struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
437         if (!req) {
438                 fprintf(stderr, "No request\n");
439                 return -1;
440         }
441
442         r = xseg_prep_request(xseg, req, targetlen, 0);
443         if (r < 0) {
444                 fprintf(stderr, "Cannot prepare request! (%lu, 0)\n",
445                         (unsigned long)targetlen);
446                 xseg_put_request(xseg, req, srcport);
447                 return -1;
448         }
449
450         req_target = xseg_get_target(xseg, req);
451         strncpy(req_target, target, targetlen);
452         req->offset = 0;
453         req->size = 0;
454         req->op = X_CLOSE;
455         p = xseg_submit(xseg, req, srcport, X_ALLOC);
456         if (p == NoPort)
457                 return -1;
458
459         xseg_signal(xseg, p);
460         return 0;
461         return 0;
462 }
463
464 int cmd_copy(char *src, char *dst)
465 {
466         uint32_t targetlen = strlen(dst);
467         uint32_t parentlen = strlen(src);
468         struct xseg_request *req;
469         struct xseg_request_copy *xcopy;
470         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
471         if (!req) {
472                 fprintf(stderr, "No request\n");
473                 return -1;
474         }
475
476         int r = xseg_prep_request(xseg, req, targetlen,
477                         sizeof(struct xseg_request_copy));
478         if (r < 0) {
479                 fprintf(stderr, "Cannot prepare request!\n");
480                 xseg_put_request(xseg, req, srcport);
481                 return -1;
482         }
483
484         char *target = xseg_get_target(xseg, req);
485         char *data = xseg_get_data(xseg, req);
486
487         strncpy(target, dst, targetlen);
488         xcopy = (struct xseg_request_copy *) data;
489         strncpy(xcopy->target, src, parentlen);
490         xcopy->targetlen = parentlen;
491         req->offset = 0;
492         req->size = sizeof(struct xseg_request_copy);
493         req->op = X_COPY;
494
495         xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
496         if (p == NoPort){
497                 fprintf(stderr, "Cannot submit request\n");
498                 return -1;
499         }
500         xseg_signal(xseg, p);
501
502         return 0;
503         return 0;
504 }
505
506 int cmd_clone(char *src, char *dst)
507 {
508
509         uint32_t targetlen = strlen(dst);
510         uint32_t parentlen = strlen(src);
511         struct xseg_request *req;
512         struct xseg_request_clone *xclone;
513         xseg_bind_port(xseg, srcport, NULL);
514         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
515         if (!req) {
516                 fprintf(stderr, "No request\n");
517                 return -1;
518         }
519
520         int r = xseg_prep_request(xseg, req, targetlen, sizeof(struct xseg_request_clone));
521         if (r < 0) {
522                 fprintf(stderr, "Cannot prepare request!\n");
523                 xseg_put_request(xseg, req, srcport);
524                 return -1;
525         }
526
527         char *target = xseg_get_target(xseg, req);
528         char *data = xseg_get_data(xseg, req);
529
530         strncpy(target, dst, targetlen);
531         xclone = (struct xseg_request_clone *) data;
532         strncpy(xclone->target, src, parentlen);
533         xclone->targetlen = parentlen;
534         xclone->size = -1;
535         req->offset = 0;
536         req->size = sizeof(struct xseg_request_clone);
537         req->op = X_CLONE;
538
539         xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
540         if (p == NoPort){
541                 fprintf(stderr, "Cannot submit request\n");
542                 return -1;
543         }
544         xseg_signal(xseg, p);
545
546         return 0;
547 }
548
549 void log_req(int logfd, uint32_t portno2, uint32_t portno1, int op, int method,
550                 struct xseg_request *req)
551 {
552         FILE *logfp;
553         char target[64], data[64];
554         char *req_target, *req_data;
555         /* null terminate name in case of req->target is less than 63 characters,
556          * and next character after name (aka first byte of next buffer) is not
557          * null
558          */
559         unsigned int end = (req->targetlen > 63) ? 63 : req->targetlen;
560         
561         req_target = xseg_get_target(xseg, req);
562         req_data = xseg_get_data(xseg, req);
563
564         logfp = fdopen(logfd, "a");
565         if (!logfp)
566                 return;
567
568         switch(method) {
569         case 0:
570                 strncpy(target, req_target, end);
571                 target[end] = 0;
572                 strncpy(data, req_data, 63);
573                 data[63] = 0;
574
575                 fprintf(logfp,
576                         "src port: %u, dst port: %u,  op:%u offset: %llu size: %lu, reqstate: %u\n"
577                         "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
578                         (unsigned int)portno1,
579                         (unsigned int)portno2,
580                         (unsigned int)req->op,
581                         (unsigned long long)req->offset,
582                         (unsigned long)req->size,
583                         (unsigned int)req->state,
584                         (unsigned int)req->targetlen, target,
585                         (unsigned long long)req->datalen, data);
586                 break;
587         case 1:
588                 fprintf(logfp,
589                         "src port: %u, dst port: %u, op: %u\n",
590                         (unsigned int)portno1,
591                         (unsigned int)portno2,
592                         (unsigned int)req->op);
593                 break;
594         case 2:
595                 fprintf(logfp, "src port: %u, dst port: %u, reqs: %llu\n",
596                         (unsigned int)portno1,
597                         (unsigned int)portno2,
598                         (unsigned long long)++reqs);
599         }
600
601         fclose(logfp);
602         return;
603 }
604
605 #define LOG_ACCEPT  0
606 #define LOG_RECEIVE 1
607
608 int cmd_bridge(uint32_t portno1, uint32_t portno2, char *logfile, char *how)
609 {
610         struct xseg_request *req;
611         int logfd, method;
612         if (!strcmp(logfile, "-"))
613                 logfd = 1;
614         else {
615                 logfd = open(logfile, O_WRONLY|O_APPEND|O_CREAT, 0600);
616                 if (logfd < 0) {
617                         perror(logfile);
618                         return -1;
619                 }
620         }
621
622         if (!strcmp(how, "full"))
623                 method = 0;
624         else if (!strcmp(how, "summary"))
625                 method = 1;
626         else
627                 method = 2;
628
629         for (;;) {
630                 int reloop = 0, active;
631                 xseg_prepare_wait(xseg, portno1);
632                 xseg_prepare_wait(xseg, portno2);
633                 req = NULL;
634
635                 for (;;) {
636                         active = 0;
637
638                         //FIXME
639                         req = xseg_accept(xseg, portno1, 0);
640                         if (req) {
641                                 xseg_submit(xseg, req, portno2, X_ALLOC);
642                                 log_req(logfd, portno1, portno2, LOG_ACCEPT, method, req);
643                                 active += 1;
644                         }
645
646                         req = xseg_accept(xseg, portno2, 0);
647                         if (req) {
648                                 xseg_submit(xseg, req, portno1, X_ALLOC);
649                                 log_req(logfd, portno2, portno1, LOG_ACCEPT, method, req);
650                                 active += 1;
651                         }
652
653                         req = xseg_receive(xseg, portno1, 0);
654                         if (req) {
655                                 xseg_respond(xseg, req, portno2, X_ALLOC);
656                                 log_req(logfd, portno1, portno2, LOG_RECEIVE, method, req);
657                                 active += 1;
658                         }
659
660                         req = xseg_receive(xseg, portno2, 0);
661                         if (req) {
662                                 xseg_respond(xseg, req, portno1, X_ALLOC);
663                                 log_req(logfd, portno2, portno1, LOG_RECEIVE, method, req);
664                                 active += 1;
665                         }
666
667                         if (active == 0) {
668                                 if (reloop)
669                                         break;
670                                 /* wait on multiple queues? */
671                                 xseg_wait_signal(xseg, 100000);
672                                 break;
673                         } else {
674                                 xseg_cancel_wait(xseg, portno1);        
675                                 xseg_cancel_wait(xseg, portno2);        
676                                 reloop = 1;
677                         }
678                 }
679         }
680
681         close(logfd);
682
683         return 0;
684 }
685
686 int cmd_rndwrite(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
687 {
688         if (loops < 0)
689                 return help();
690
691         if (targetlen >= chunksize) {
692                 fprintf(stderr, "targetlen >= chunksize\n");
693                 return -1;
694         }
695
696         char *p = realloc(namebuf, targetlen+1);
697         if (!p) {
698                 fprintf(stderr, "Cannot allocate memory\n");
699                 return -1;
700         }
701         namebuf = p;
702
703         p = realloc(chunk, chunksize);
704         if (!p) {
705                 fprintf(stderr, "Cannot allocate memory\n");
706                 return -1;
707         }
708         chunk = p;
709         memset(chunk, 0, chunksize);
710
711         srandom(seed);
712
713         struct xseg_request *submitted = NULL, *received;
714         long nr_submitted = 0, nr_received = 0, nr_failed = 0;
715         int reported = 0, r;
716         uint64_t offset;
717         xport port;
718         char *req_data, *req_target;
719         seed = random();
720         init_local_signal();
721
722         for (;;) {
723                 xseg_prepare_wait(xseg, srcport);
724                 if (nr_submitted < loops &&
725                     (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
726                         xseg_cancel_wait(xseg, srcport);
727                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
728                         if (r < 0) {
729                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
730                                         targetlen, chunksize);
731                                 xseg_put_request(xseg, submitted, srcport);
732                                 return -1;
733                         }
734                         
735                         req_target = xseg_get_target(xseg, submitted);
736                         req_data = xseg_get_data(xseg, submitted);
737
738                         reported = 0;
739                         mkname(namebuf, targetlen, seed);
740                         namebuf[targetlen] = 0;
741                         //printf("%ld: %s\n", nr_submitted, namebuf);
742                         strncpy(req_target, namebuf, targetlen);
743                         offset = 0;// pick(size);
744                         mkchunk(req_data, chunksize, namebuf, targetlen, offset);
745
746                         submitted->offset = offset;
747                         submitted->size = chunksize;
748                         submitted->op = X_WRITE;
749                         submitted->flags |= XF_NOSYNC;
750
751                         port =  xseg_submit(xseg, submitted, srcport, X_ALLOC);
752                         if (port == NoPort) {
753                                 xseg_put_request(xseg, submitted, srcport);
754                         } else {
755                                 seed = random();
756                                 nr_submitted += 1;
757                                 xseg_signal(xseg, port);
758                         }
759                 }
760
761                 received = xseg_receive(xseg, srcport, 0);
762                 if (received) {
763                         xseg_cancel_wait(xseg, srcport);
764                         nr_received += 1;
765                         if (!(received->state & XS_SERVED)) {
766                                 nr_failed += 1;
767                                 report_request(received);
768                         }
769                         if (xseg_put_request(xseg, received, srcport))
770                                 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
771                 }
772
773                 if (!submitted && !received)
774                         xseg_wait_signal(xseg, 1000000);
775
776                         if (nr_submitted % 1000 == 0 && !reported) {
777                                 reported = 1;
778                                 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
779                                         nr_submitted, nr_received, nr_failed);
780                         }
781
782                         if (nr_received >= loops)
783                                 break;
784         }
785
786         fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
787                 nr_submitted, nr_received, nr_failed);
788         return 0;
789 }
790
791 int cmd_rnddelete(long loops, int32_t seed, uint32_t targetlen)
792 {
793         if (loops < 0)
794                 return help();
795         char *p = realloc(namebuf, targetlen+1);
796         if (!p) {
797                 fprintf(stderr, "Cannot allocate memory\n");
798                 return -1;
799         }
800         namebuf = p;
801
802         srandom(seed);
803
804         struct xseg_request *submitted = NULL, *received;
805         long nr_submitted = 0, nr_received = 0, nr_failed = 0;
806         int reported = 0, r;
807         xport port;
808         char *req_target;
809         seed = random();
810         init_local_signal();
811
812         for (;;) {
813                 xseg_prepare_wait(xseg, srcport);
814                 if (nr_submitted < loops &&
815                     (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
816                         xseg_cancel_wait(xseg, srcport);
817                         r = xseg_prep_request(xseg, submitted, targetlen, 0);
818                         if (r < 0) {
819                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
820                                         targetlen, 0);
821                                 xseg_put_request(xseg, submitted, srcport);
822                                 return -1;
823                         }
824                         
825                         req_target = xseg_get_target(xseg, submitted);
826
827                         reported = 0;
828                         mkname(namebuf, targetlen, seed);
829                         namebuf[targetlen] = 0;
830                         //printf("%ld: %s\n", nr_submitted, namebuf);
831                         strncpy(req_target, namebuf, targetlen);
832                         submitted->offset = 0;
833                         submitted->size = 0;
834                         submitted->op = X_DELETE;
835                         submitted->flags = 0;
836
837                         port =  xseg_submit(xseg, submitted, srcport, X_ALLOC);
838                         if (port == NoPort) {
839                                 xseg_put_request(xseg, submitted, srcport);
840                         } else {
841                                 seed = random();
842                                 nr_submitted += 1;
843                                 xseg_signal(xseg, port);
844                         }
845                 }
846
847                 received = xseg_receive(xseg, srcport, 0);
848                 if (received) {
849                         xseg_cancel_wait(xseg, srcport);
850                         nr_received += 1;
851                         if (!(received->state & XS_SERVED)) {
852                                 nr_failed += 1;
853                                 report_request(received);
854                         }
855                         if (xseg_put_request(xseg, received, srcport))
856                                 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
857                 }
858
859                 if (!submitted && !received)
860                         xseg_wait_signal(xseg, 1000000);
861
862                         if (nr_submitted % 1000 == 0 && !reported) {
863                                 reported = 1;
864                                 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
865                                         nr_submitted, nr_received, nr_failed);
866                         }
867
868                         if (nr_received >= loops)
869                                 break;
870         }
871
872         fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
873                 nr_submitted, nr_received, nr_failed);
874         return 0;
875 }
876 /* note:
877  * prepare/wait rhythm,
878  * files are converted to independent chunk access patterns,
879 */
880
881 int cmd_rndread(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
882 {
883         if (loops < 0)
884                 return help();
885
886         if (targetlen >= chunksize) {
887                 fprintf(stderr, "targetlen >= chunksize\n");
888                 return -1;
889         }
890
891         char *p = realloc(namebuf, targetlen+1);
892         if (!p) {
893                 fprintf(stderr, "Cannot allocate memory\n");
894                 return -1;
895         }
896         namebuf = p;
897
898         p = realloc(chunk, chunksize);
899         if (!p) {
900                 fprintf(stderr, "Cannot allocate memory\n");
901                 return -1;
902         }
903         chunk = p;
904         memset(chunk, 0, chunksize);
905
906         srandom(seed);
907
908         struct xseg_request *submitted = NULL, *received;
909         long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0;
910         int reported = 0, r;
911         uint64_t offset;
912         xport port;
913         char *req_data, *req_target;
914         init_local_signal();
915
916         seed = random();
917         for (;;) {
918                 submitted = NULL;
919                 xseg_prepare_wait(xseg, srcport);
920                 if (nr_submitted < loops &&
921                     (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
922                         xseg_cancel_wait(xseg, srcport);
923                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
924                         if (r < 0) {
925                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
926                                         targetlen, chunksize);
927                                 xseg_put_request(xseg, submitted, srcport);
928                                 return -1;
929                         }
930
931                         req_target = xseg_get_target(xseg, submitted);
932                         reported = 0;
933                         mkname(namebuf, targetlen, seed);
934                         namebuf[targetlen] = 0;
935                         //printf("%ld: %s\n", nr_submitted, namebuf);
936                         offset = 0;//pick(size);
937
938                         strncpy(req_target, namebuf, targetlen);
939                         submitted->offset = offset;
940                         submitted->size = chunksize;
941                         submitted->op = X_READ;
942                         port = xseg_submit(xseg, submitted, srcport, X_ALLOC);
943                         if (port == NoPort) {
944                                 xseg_put_request(xseg, submitted, srcport);
945                         } else {
946                                 seed = random();
947                                 nr_submitted += 1;
948                                 xseg_signal(xseg, port);
949                         }
950                 }
951
952                 received = xseg_receive(xseg, srcport, 0);
953                 if (received) {
954                         xseg_cancel_wait(xseg, srcport);
955                         nr_received += 1;
956                         req_target = xseg_get_target(xseg, received);
957                         req_data = xseg_get_data(xseg, received);
958                         if (!(received->state & XS_SERVED)) {
959                                 nr_failed += 1;
960                                 report_request(received);
961                         } else if (!chkchunk(req_data, received->datalen,
962                                         req_target, received->targetlen, received->offset)) {
963         //                      report_request(received);
964                                 nr_mismatch += 1;
965                         }
966
967                         if (xseg_put_request(xseg, received, srcport))
968                                 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
969                 }
970
971                 if (!submitted && !received)
972                         xseg_wait_signal(xseg, 1000000);
973
974                 if (nr_submitted % 1000 == 0 && !reported) {
975                         reported = 1;
976                         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
977                         nr_submitted, nr_received, nr_failed, nr_mismatch);
978                 }
979
980                 if (nr_received >= loops)
981                         break;
982         }
983
984         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
985                 nr_submitted, nr_received, nr_failed, nr_mismatch);
986         return 0;
987 }
988
989 int cmd_submit_reqs(long loops, long concurrent_reqs, int op)
990 {
991         if (loops < 0)
992                 return help();
993
994         struct xseg_request *submitted = NULL, *received;
995         long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0, nr_flying = 0;
996         int reported = 0, r;
997         uint64_t offset;
998         uint32_t targetlen = 10, chunksize = 4096;
999         struct timeval tv1, tv2;
1000         xport p;
1001         char *req_data, *req_target;
1002
1003         xseg_bind_port(xseg, srcport, NULL);
1004
1005         gettimeofday(&tv1, NULL);
1006         for (;;) {
1007                 submitted = NULL;
1008                 xseg_prepare_wait(xseg, srcport);
1009                 if (nr_submitted < loops &&  nr_flying < concurrent_reqs &&
1010                     (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
1011                         xseg_cancel_wait(xseg, srcport);
1012                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
1013                         if (r < 0) {
1014                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
1015                                         targetlen, chunksize);
1016                                 xseg_put_request(xseg, submitted, srcport);
1017                                 return -1;
1018                         }
1019                         
1020                         //FIXME
1021                         ++nr_flying;
1022                         nr_submitted += 1;
1023                         reported = 0;
1024                         offset = 0;//pick(size);
1025
1026                         submitted->offset = offset;
1027                         submitted->size = chunksize;
1028                         req_target = xseg_get_target(xseg, submitted);
1029                         req_data = xseg_get_data(xseg, submitted);
1030
1031                         if (op == 0)
1032                                 submitted->op = X_INFO;
1033                         else if (op == 1)
1034                                 submitted->op = X_READ;
1035                         else if (op == 2) {
1036                                 submitted->op = X_WRITE;
1037                                 mkchunk(req_data, submitted->datalen, req_target, submitted->targetlen, submitted->offset);
1038                         }
1039
1040                         p = xseg_submit(xseg, submitted, srcport, X_ALLOC);
1041                         if ( p != NoPort){
1042                                 if (xseg_signal(xseg, p) < 0)
1043                                         perror("Cannot signal peer");
1044                         }
1045                 }
1046                 received = xseg_receive(xseg, srcport, 0);
1047                 if (received) {
1048                         xseg_cancel_wait(xseg, srcport);
1049                         --nr_flying;
1050                         if (nr_received == 0)
1051                                 fprintf(stderr, "latency (time for the first req to complete): %llu usecs\n",
1052                                         (unsigned long long)received->elapsed);
1053                         nr_received += 1;
1054                         if (!(received->state & XS_SERVED)) {
1055                                 nr_failed += 1;
1056                                 //report_request(received);
1057                         }
1058
1059                         if (xseg_put_request(xseg, received, srcport))
1060                                 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
1061                 }
1062
1063                 if (!submitted && !received)
1064                         xseg_wait_signal(xseg, 10000000L);
1065
1066                 if (nr_received >= loops)
1067                         break;
1068         }
1069         gettimeofday(&tv2, NULL);
1070
1071         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
1072                 nr_submitted, nr_received, nr_failed, nr_mismatch);
1073         long t = (tv2.tv_sec - tv1.tv_sec)*1000000 + (tv2.tv_usec - tv1.tv_usec);
1074         fprintf(stderr, "elpased time: %lf secs, throughput: %lf reqs/sec\n", (double) t / 1000000.0, (double) nr_submitted / (t / 1000000.0));
1075
1076         return 0;
1077 }
1078
1079 static void lock_status(struct xlock *lock, char *buf, int len)
1080 {
1081         int r;
1082         if (lock->owner == Noone)
1083                 r = snprintf(buf, len, "Locked: No");
1084         else
1085                 r = snprintf(buf, len, "Locked: Yes (Owner: %lu)", lock->owner);
1086         if (r >= len)
1087                 buf[len-1] = 0;
1088 }
1089
1090 int cmd_report(uint32_t portno)
1091 {
1092         char fls[64], rls[64], pls[64]; // buffer to store lock status
1093         struct xseg_port *port = xseg_get_port(xseg, portno);
1094         if (!port) {
1095                 printf("port %u is not assigned\n", portno);
1096                 return 0;
1097         }
1098         struct xq *fq, *rq, *pq;
1099         fq = xseg_get_queue(xseg, port, free_queue);
1100         rq = xseg_get_queue(xseg, port, request_queue);
1101         pq = xseg_get_queue(xseg, port, reply_queue);
1102         lock_status(&port->fq_lock, fls, 64);
1103         lock_status(&port->rq_lock, rls, 64);
1104         lock_status(&port->pq_lock, pls, 64);
1105         fprintf(stderr, "port %u:\n"
1106                 "   requests: %llu/%llu  src gw: %u  dst gw: %u\n"
1107                 "       free_queue [%p] count : %4llu | %s\n"
1108                 "    request_queue [%p] count : %4llu | %s\n"
1109                 "      reply_queue [%p] count : %4llu | %s\n",
1110                 portno, (unsigned long long)port->alloc_reqs, 
1111                 (unsigned long long)port->max_alloc_reqs,
1112                 xseg->src_gw[portno],
1113                 xseg->dst_gw[portno],
1114                 (void *)fq, (unsigned long long)xq_count(fq), fls,
1115                 (void *)rq, (unsigned long long)xq_count(rq), rls,
1116                 (void *)pq, (unsigned long long)xq_count(pq), pls);
1117         return 0;
1118 }
1119
1120 int cmd_join(void)
1121 {
1122         if (xseg)
1123                 return 0;
1124
1125         xseg = xseg_join(cfg.type, cfg.name, "posix", NULL);
1126         if (!xseg) {
1127                 fprintf(stderr, "cannot join segment!\n");
1128                 return -1;
1129         }
1130         return 0;
1131 }
1132 static void print_hanlder(char *name, struct xobject_h *obj_h)
1133 {
1134         char ls[64];
1135         lock_status(&obj_h->lock, ls, 64);
1136         fprintf(stderr, "%20s: free: %4llu, allocated: %4llu, allocated space: %7llu (object size: %llu), Lock %s\n",
1137                         name,
1138                         (unsigned long long) obj_h->nr_free,
1139                         (unsigned long long) obj_h->nr_allocated,
1140                         (unsigned long long) obj_h->allocated_space,
1141                         (unsigned long long) obj_h->obj_size, ls);
1142 }
1143
1144 //FIXME ugly
1145 static void print_heap(struct xseg *xseg)
1146 {
1147         char *UNIT[4];
1148         UNIT[0] = "B";
1149         UNIT[1] = "KiB";
1150         UNIT[2] = "MiB";
1151         UNIT[3] = "GiB";
1152         uint64_t MULT[4];
1153         MULT[0] = 1;
1154         MULT[1] = 1024;
1155         MULT[2] = 1024*1024;
1156         MULT[3] = 1024*1024*1024;
1157
1158         int u;
1159         uint64_t t;
1160         fprintf(stderr, "Heap usage: ");
1161         u = 0;
1162         t = xseg->heap->cur;
1163         while (t > 0) {
1164                 t /= 1024;
1165                 u++;
1166         }
1167         if (!t)
1168                 u--;
1169         t = xseg->heap->cur / MULT[u];
1170         if (t < 10){
1171                 float tf = ((float)(xseg->heap->cur))/((float)MULT[u]);
1172                 fprintf(stderr, "%2.1f %s/", tf, UNIT[u]);
1173         }
1174         else {
1175                 unsigned int tu = xseg->heap->cur / MULT[u];
1176                 fprintf(stderr, "%3u %s/", tu, UNIT[u]);
1177         }
1178
1179         u = 0;
1180         t = xseg->config.heap_size;
1181         while (t > 0) {
1182                 t /= 1024;
1183                 u++;
1184         }
1185         if (!t)
1186                 u--;
1187         t = xseg->config.heap_size/MULT[u];
1188         if (t < 10){
1189                 float tf = ((float)(xseg->config.heap_size))/(float)MULT[u];
1190                 fprintf(stderr, "%2.1f %s ", tf, UNIT[u]);
1191         }
1192         else {
1193                 unsigned int tu = xseg->config.heap_size / MULT[u];
1194                 fprintf(stderr, "%3u %s ", tu, UNIT[u]);
1195         }
1196         char ls[64];
1197         lock_status(&xseg->heap->lock, ls, 64);
1198         fprintf(stderr, "(%llu / %llu), %s\n",
1199                         (unsigned long long)xseg->heap->cur,
1200                         (unsigned long long)xseg->config.heap_size,
1201                         ls);
1202 }
1203
1204 int cmd_reportall(void)
1205 {
1206         uint32_t t;
1207
1208         if (cmd_join())
1209                 return -1;
1210
1211         fprintf(stderr, "Segment lock: %s\n",
1212                 (xseg->shared->flags & XSEG_F_LOCK) ? "Locked" : "Unlocked");
1213         print_heap(xseg);
1214         /* fprintf(stderr, "Heap usage: %llu / %llu\n", */
1215         /*              (unsigned long long)xseg->heap->cur, */
1216         /*              (unsigned long long)xseg->config.heap_size); */
1217         fprintf(stderr, "Handlers: \n");
1218         print_hanlder("Requests handler", xseg->request_h);
1219         print_hanlder("Ports handler", xseg->port_h);
1220         print_hanlder("Objects handler", xseg->object_handlers);
1221         fprintf(stderr, "\n");
1222
1223         for (t = 0; t < xseg->config.nr_ports; t++)
1224                 cmd_report(t);
1225
1226         return 0;
1227 }
1228
1229
1230 int finish_req(struct xseg_request *req, enum req_action action)
1231 {
1232         if (action == COMPLETE){
1233                 req->state &= ~XS_FAILED;
1234                 req->state |= XS_SERVED;
1235         } else {
1236                 req->state |= XS_FAILED;
1237                 req->state &= ~XS_SERVED;
1238         }
1239         req->serviced = 0;
1240         xport p = xseg_respond(xseg, req, srcport, X_ALLOC);
1241         if (p == NoPort)
1242                 xseg_put_request(xseg, req, srcport);
1243         else
1244                 xseg_signal(xseg, p);
1245         return 0;
1246 }
1247
1248 //FIXME this should be in xseg lib?
1249 static int isDangling(struct xseg_request *req)
1250 {
1251         xport i;
1252         struct xseg_port *port;
1253         for (i = 0; i < xseg->config.nr_ports; i++) {
1254                 if (xseg->ports[i]){
1255                         port = xseg_get_port(xseg, i);
1256                         if (!port){
1257                                 fprintf(stderr, "Inconsisten port <-> portno mapping %u", i);
1258                                 continue;
1259                         }
1260                         struct xq *fq, *rq, *pq;
1261                         fq = xseg_get_queue(xseg, port, free_queue);
1262                         rq = xseg_get_queue(xseg, port, request_queue);
1263                         pq = xseg_get_queue(xseg, port, reply_queue);
1264                         xlock_acquire(&port->fq_lock, srcport);
1265                         if (__xq_check(fq, XPTR_MAKE(req, xseg->segment))){
1266                                         xlock_release(&port->fq_lock);
1267                                         return 0;
1268                         }
1269                         xlock_release(&port->fq_lock);
1270                         xlock_acquire(&port->rq_lock, srcport);
1271                         if (__xq_check(rq, XPTR_MAKE(req, xseg->segment))){
1272                                         xlock_release(&port->rq_lock);
1273                                         return 0;
1274                         }
1275                         xlock_release(&port->rq_lock);
1276                         xlock_acquire(&port->pq_lock, srcport);
1277                         if (__xq_check(pq, XPTR_MAKE(req, xseg->segment))){
1278                                         xlock_release(&port->pq_lock);
1279                                         return 0;
1280                         }
1281                         xlock_release(&port->pq_lock);
1282                 }
1283         }
1284         return 1;
1285 }
1286
1287 int prompt_user(char *msg)
1288 {
1289         int c = 0, r = -1;
1290         printf("%s [y/n]: ", msg);
1291         while (1) {
1292                 c = fgetc(stdin);
1293                 if (c == 'y' || c == 'Y')
1294                         r = 1;
1295                 else if (c == 'n' || c == 'N')
1296                         r = 0;
1297                 else if (c == '\n'){
1298                         if (r == -1)
1299                                 printf("%s [y/n]: ", msg);
1300                         else
1301                                 break;
1302                 }
1303         }
1304         return r;
1305 }
1306
1307 //FIXME this should be in xseg lib?
1308 int cmd_verify(int fix)
1309 {
1310         if (cmd_join())
1311                 return -1;
1312         //segment lock
1313         if (xseg->shared->flags & XSEG_F_LOCK){
1314                 fprintf(stderr, "Segment lock: Locked\n");
1315                 if (fix && prompt_user("Unlock it ?"))
1316                         xseg->shared->flags &= ~XSEG_F_LOCK;
1317         }
1318         //heap lock
1319         if (xseg->heap->lock.owner != Noone){
1320                 fprintf(stderr, "Heap lock: Locked (Owner: %llu)\n",
1321                         (unsigned long long)xseg->heap->lock.owner);
1322                 if (fix && prompt_user("Unlock it ?"))
1323                         xlock_release(&xseg->heap->lock);
1324         }
1325         //obj_h locks
1326         if (xseg->request_h->lock.owner != Noone){
1327                 fprintf(stderr, "Requests handler lock: Locked (Owner: %llu)\n",
1328                         (unsigned long long)xseg->request_h->lock.owner);
1329                 if (fix && prompt_user("Unlock it ?"))
1330                         xlock_release(&xseg->request_h->lock);
1331         }
1332         if (xseg->port_h->lock.owner != Noone){
1333                 fprintf(stderr, "Ports handler lock: Locked (Owner: %llu)\n",
1334                         (unsigned long long)xseg->port_h->lock.owner);
1335                 if (fix && prompt_user("Unlock it ?"))
1336                         xlock_release(&xseg->port_h->lock);
1337         }
1338         if (xseg->object_handlers->lock.owner != Noone){
1339                 fprintf(stderr, "Objects handler lock: Locked (Owner: %llu)\n",
1340                         (unsigned long long)xseg->object_handlers->lock.owner);
1341                 if (fix && prompt_user("Unlock it ?"))
1342                         xlock_release(&xseg->object_handlers->lock);
1343         }
1344         //take segment lock?
1345         xport i;
1346         struct xseg_port *port;
1347         for (i = 0; i < xseg->config.nr_ports; i++) {
1348                 if (xseg->ports[i]){
1349                         port = xseg_get_port(xseg, i);
1350                         if (!port){
1351                                 fprintf(stderr, "Inconsisten port <-> portno mapping %u", i);
1352                                 continue;
1353                         }
1354                         if (port->fq_lock.owner != Noone) {
1355                                 fprintf(stderr, "Free queue lock of port %u locked (Owner %llu)\n",
1356                                                 i, (unsigned long long)port->fq_lock.owner);
1357                                 if (fix && prompt_user("Unlock it ?"))
1358                                         xlock_release(&port->fq_lock);
1359                         }
1360                         if (port->rq_lock.owner != Noone) {
1361                                 fprintf(stderr, "Request queue lock of port %u locked (Owner %llu)\n",
1362                                                 i, (unsigned long long)port->rq_lock.owner);
1363                                 if (fix && prompt_user("Unlock it ?"))
1364                                         xlock_release(&port->rq_lock);
1365                         }
1366                         if (port->pq_lock.owner != Noone) {
1367                                 fprintf(stderr, "Reply queue lock of port %u locked (Owner %llu)\n",
1368                                                 i, (unsigned long long)port->pq_lock.owner);
1369                                 if (fix && prompt_user("Unlock it ?"))
1370                                         xlock_release(&port->pq_lock);
1371                         }
1372                 }
1373         }
1374
1375         struct xobject_h *obj_h = xseg->request_h;
1376         struct xobject_iter it;
1377
1378         struct xseg_request *req;
1379         xlock_acquire(&obj_h->lock, srcport);
1380         xobj_iter_init(obj_h, &it);
1381         while (xobj_iterate(obj_h, &it, (void **)&req)){
1382                 //FIXME this will not work cause obj->magic - req->serial is not
1383                 //touched when a request is get
1384                 /* if (obj->magic != MAGIC_REQ && t->src_portno == portno){ */
1385                 if (isDangling(req) && !__xobj_isFree(obj_h, req)){
1386                         report_request(req);
1387                         if (fix && prompt_user("Fail it ?")){
1388                                 printf("Finishing ...\n");
1389                                 finish_req(req, FAIL);
1390                         }
1391                 }
1392         }
1393         xlock_release(&obj_h->lock);
1394         return 0;
1395 }
1396
1397 int cmd_inspectq(xport portno, enum queue qt)
1398 {
1399         if (cmd_join())
1400                 return -1;
1401
1402         struct xq *q;
1403         struct xlock *l;
1404         struct xseg_port *port = xseg_get_port(xseg, portno);
1405         if (!port)
1406                 return -1;
1407         if (qt == FREE_QUEUE){
1408                 q = xseg_get_queue(xseg, port, free_queue);
1409                 l = &port->fq_lock;
1410         }
1411         else if (qt == REQUEST_QUEUE){
1412                 q = xseg_get_queue(xseg, port, request_queue);
1413                 l = &port->rq_lock;
1414         }
1415         else if (qt == REPLY_QUEUE) {
1416                 q = xseg_get_queue(xseg, port, reply_queue);
1417                 l = &port->rq_lock;
1418         }
1419         else
1420                 return -1;
1421         xlock_acquire(l, srcport);
1422         xqindex i,c = xq_count(q);
1423         if (c) {
1424                 struct xseg_request *req;
1425                 xptr xqi;
1426                 for (i = 0; i < c; i++) {
1427                         xqi = __xq_pop_head(q);
1428                         req = XPTR_TAKE(xqi, xseg->segment);
1429                         report_request(req);
1430                         __xq_append_tail(q, xqi);
1431                 }
1432         }
1433         else {
1434                 fprintf(stderr, "Queue is empty\n\n");
1435         }
1436         xlock_release(l);
1437         return 0;
1438 }
1439
1440
1441 int cmd_request(struct xseg_request *req, enum req_action action)
1442 {
1443         if (cmd_join())
1444                 return -1;
1445
1446         struct xobject_h *obj_h = xseg->request_h;
1447         if (!xobj_check(obj_h, req))
1448                 return -1;
1449
1450         if (action == REPORT)
1451                 report_request(req);
1452         else if (action == FAIL){
1453                 report_request(req);
1454                 if (prompt_user("fail it ?")){
1455                         printf("Finishing ...\n");
1456                         finish_req(req, FAIL);
1457                 }
1458         }
1459         else if (action == COMPLETE){
1460                 report_request(req);
1461                 if (prompt_user("Complete it ?")){
1462                         printf("Finishing ...\n");
1463                         finish_req(req, COMPLETE);
1464                 }
1465         }
1466         return 0;
1467 }
1468
1469 int cmd_create(void)
1470 {
1471         int r = xseg_create(&cfg);
1472         if (r) {
1473                 fprintf(stderr, "cannot create segment!\n");
1474                 return -1;
1475         }
1476
1477         fprintf(stderr, "Segment initialized.\n");
1478         return 0;
1479 }
1480
1481 int cmd_destroy(void)
1482 {
1483         if (!xseg && cmd_join())
1484                 return -1;
1485         xseg_leave(xseg);
1486         xseg_destroy(xseg);
1487         xseg = NULL;
1488         fprintf(stderr, "Segment destroyed.\n");
1489         return 0;
1490 }
1491
1492 int cmd_alloc_requests(unsigned long nr)
1493 {
1494         return xseg_alloc_requests(xseg, srcport, nr);
1495 }
1496
1497 int cmd_free_requests(unsigned long nr)
1498 {
1499         return xseg_free_requests(xseg, srcport, nr);
1500 }
1501
1502 int cmd_put_requests(void)
1503 {
1504         struct xseg_request *req;
1505
1506         for (;;) {
1507                 req = xseg_accept(xseg, dstport, 0);
1508                 if (!req)
1509                         break;
1510                 if (xseg_put_request(xseg, req, srcport))
1511                         fprintf(stderr, "Cannot put request at port %u\n", req->src_portno);
1512         }
1513
1514         return 0;
1515 }
1516
1517 int cmd_finish(unsigned long nr, int fail)
1518 {
1519         struct xseg_request *req;
1520         char *buf = malloc(sizeof(char) * 8128);
1521         char *req_target, *req_data;
1522         xseg_bind_port(xseg, srcport, NULL);
1523         xport p;
1524
1525         for (; nr--;) {
1526                 xseg_prepare_wait(xseg, srcport);
1527                 req = xseg_accept(xseg, srcport, 0);
1528                 if (req) {
1529                         req_target = xseg_get_target(xseg, req);
1530                         req_data = xseg_get_data(xseg, req);
1531                         xseg_cancel_wait(xseg, srcport);
1532                         if (fail == 1)
1533                                 req->state &= ~XS_SERVED;
1534                         else {
1535                                 if (req->op == X_READ)
1536                                         mkchunk(req_data, req->datalen, req_target, req->targetlen, req->offset);
1537                                 else if (req->op == X_WRITE) 
1538                                         memcpy(buf, req_data, (sizeof(*buf) > req->datalen) ? req->datalen : sizeof(*buf));
1539                                 else if (req->op == X_INFO)
1540                                         *((uint64_t *) req->data) = 4294967296;
1541                                 
1542                                 req->state |= XS_SERVED;
1543                                 req->serviced = req->size;
1544                         }
1545
1546                         p = xseg_respond(xseg, req, srcport, X_ALLOC);
1547                         xseg_signal(xseg, p);
1548                         continue;
1549                 }
1550                 ++nr;
1551                 xseg_wait_signal(xseg, 10000000L);
1552         }
1553
1554         free(buf);
1555
1556         return 0;
1557 }
1558
1559 void handle_reply(struct xseg_request *req)
1560 {
1561         char *req_data = xseg_get_data(xseg, req);
1562         char *req_target = xseg_get_target(xseg, req);
1563         if (!(req->state & XS_SERVED)) {
1564                 report_request(req);
1565                 goto put;
1566         }
1567
1568         switch (req->op) {
1569         case X_READ:
1570                 fwrite(req_data, 1, req->datalen, stdout);
1571                 break;
1572
1573         case X_WRITE:
1574                 fprintf(stdout, "wrote: ");
1575                 fwrite(req_data, 1, req->datalen, stdout);
1576                 break;
1577         case X_SYNC:
1578         case X_DELETE:
1579                 fprintf(stderr, "deleted %s\n", req_target);
1580                 break;
1581         case X_TRUNCATE:
1582         case X_COMMIT:
1583         case X_CLONE:
1584                 fprintf(stderr, "cloned %s\n", ((struct xseg_request_clone *)req_data)->target);
1585                 break;
1586         case X_INFO:
1587                 fprintf(stderr, "size: %llu\n", (unsigned long long)*((uint64_t *)req_data));
1588                 break;
1589         case X_COPY:
1590                 fprintf(stderr, "copied %s\n", ((struct xseg_request_copy *)req_data)->target);
1591                 break;
1592         case X_CLOSE:
1593                 fprintf(stderr, "Closed %s\n", req_target);
1594         case X_OPEN:
1595                 fprintf(stderr, "Opened %s\n", req_target);
1596
1597         default:
1598                 break;
1599         }
1600
1601 put:
1602         if (xseg_put_request(xseg, req, srcport))
1603                 fprintf(stderr, "Cannot put reply at port %u\n", req->src_portno);
1604 }
1605
1606 int cmd_wait(uint32_t nr)
1607 {
1608         struct xseg_request *req;
1609         long ret;
1610         init_local_signal(); 
1611
1612         for (;;) {
1613                 req = xseg_receive(xseg, srcport, 0);
1614                 if (req) {
1615                         handle_reply(req);
1616                         nr--;
1617                         if (nr == 0)
1618                                 break;
1619                         continue;
1620                 }
1621
1622                 ret = xseg_prepare_wait(xseg, srcport);
1623                 if (ret)
1624                         return -1;
1625
1626                 ret = xseg_wait_signal(xseg, 1000000);
1627                 ret = xseg_cancel_wait(xseg, srcport);
1628                 if (ret)
1629                         return -1;
1630         }
1631
1632         return 0;
1633 }
1634
1635 int cmd_put_replies(void)
1636 {
1637         struct xseg_request *req;
1638
1639         for (;;) {
1640                 req = xseg_receive(xseg, dstport, 0);
1641                 if (!req)
1642                         break;
1643                 fprintf(stderr, "request: %08llx%08llx\n"
1644                         "     op: %u\n"
1645                         "  state: %u\n",
1646                         0LL, (unsigned long long)req->serial,
1647                         req->op,
1648                         req->state);
1649                 report_request(req);
1650
1651                 //fwrite(req->buffer, 1, req->bufferlen, stdout);
1652
1653                 if (xseg_put_request(xseg, req, srcport))
1654                         fprintf(stderr, "Cannot put reply\n");
1655         }
1656
1657         return 0;
1658 }
1659
1660 int cmd_bind(long portno)
1661 {
1662         struct xseg_port *port = xseg_bind_port(xseg, portno, NULL);
1663         if (!port) {
1664                 fprintf(stderr, "failed to bind port %ld\n", portno);
1665                 return 1;
1666         }
1667
1668         fprintf(stderr, "bound port %u\n", xseg_portno(xseg, port));
1669         return 0;
1670 }
1671
1672 int cmd_signal(uint32_t portno)
1673 {
1674         return xseg_signal(xseg, portno);
1675 }
1676
1677 int parse_ports(char *str)
1678 {
1679         int ret = 0;
1680         char *s = str;
1681
1682         for (;;) {
1683                 if (*s == 0)
1684                         return 0;
1685
1686                 if (*s == ':') {
1687                         *s = 0;
1688                         if ((s > str) && isdigit(str[0])) {
1689                                 srcport = atol(str);
1690                                 ret ++;
1691                         }
1692                         break;
1693                 }
1694                 s ++;
1695         }
1696
1697         s += 1;
1698         str = s;
1699
1700         for (;;) {
1701                 if (*s == 0) {
1702                         if ((s > str) && isdigit(str[0])) {
1703                                 dstport = atol(str);
1704                                 ret ++;
1705                         }
1706                         break;
1707                 }
1708                 s ++;
1709         }
1710
1711         return ret;
1712 }
1713
1714 int main(int argc, char **argv)
1715 {
1716         int i, ret = 0;
1717         char *spec;
1718
1719         if (argc < 3)
1720                 return help();
1721
1722         srcport = -1;
1723         dstport = -1;
1724         spec = argv[1];
1725
1726         if (xseg_parse_spec(spec, &cfg)) {
1727                 fprintf(stderr, "Cannot parse spec\n");
1728                 return -1;
1729         }
1730
1731         if (xseg_initialize()) {
1732                 fprintf(stderr, "cannot initialize!\n");
1733                 return -1;
1734         }
1735
1736         for (i = 2; i < argc; i++) {
1737
1738                 if (!strcmp(argv[i], "create")) {
1739                         ret = cmd_create();
1740                         continue;
1741                 }
1742
1743                 if (!strcmp(argv[i], "join")) {
1744                         ret = cmd_join();
1745                         if (!ret)
1746                                 fprintf(stderr, "Segment joined.\n");
1747                         continue;
1748                 }
1749
1750                 if (!strcmp(argv[i], "destroy")) {
1751                         ret = cmd_destroy();
1752                         continue;
1753                 }
1754
1755                 if (cmd_join())
1756                         return -1;
1757
1758                 if (!strcmp(argv[i], "reportall")) {
1759                         ret = cmd_reportall();
1760                         continue;
1761                 }
1762
1763                 if (!strcmp(argv[i], "bind") && (i + 1 < argc)) {
1764                         ret = cmd_bind(atol(argv[i+1]));
1765                         i += 1;
1766                         continue;
1767                 }
1768
1769                 if (!strcmp(argv[i], "signal") && (i + 1 < argc)) {
1770                         ret = cmd_signal(atol(argv[i+1]));
1771                         i += 1;
1772                         continue;
1773                 }
1774
1775                 if (!strcmp(argv[i], "bridge") && (i + 4 < argc)) {
1776                         ret = cmd_bridge(atol(argv[i+1]),
1777                                          atol(argv[i+2]),
1778                                          argv[i+3],
1779                                          argv[i+4]);
1780                         i += 4;
1781                         continue;
1782                 }
1783
1784                 if (srcport == -1) {
1785                         if (!parse_ports(argv[i]))
1786                                 fprintf(stderr, "source port undefined: %s\n", argv[i]);
1787                         continue;
1788                 }
1789
1790                 if (dstport == -1) {
1791                         if (!parse_ports(argv[i]))
1792                                 fprintf(stderr, "destination port undefined: %s\n", argv[i]);
1793                         continue;
1794                 }
1795
1796                 if (!strcmp(argv[i], "verify")) {
1797                         ret = cmd_verify(0);
1798                         continue;
1799                 }
1800
1801                 if (!strcmp(argv[i], "verify-fix")) {
1802                         ret = cmd_verify(1);
1803                         continue;
1804                 }
1805
1806                 if (!strcmp(argv[i], "failreq") && (i + 1 < argc)) {
1807                         struct xseg_request *req;
1808                         sscanf(argv[i+1], "%lx", &req);
1809                         ret = cmd_request(req, FAIL);
1810                         i += 1;
1811                         continue;
1812                 }
1813
1814                 if (!strcmp(argv[i], "inspect-freeq") && (i + 1 < argc)) {
1815                         ret = cmd_inspectq(atol(argv[i+1]), FREE_QUEUE);
1816                         i += 1;
1817                         continue;
1818                 }
1819
1820                 if (!strcmp(argv[i], "inspect-requestq") && (i + 1 < argc)) {
1821                         ret = cmd_inspectq(atol(argv[i+1]), REQUEST_QUEUE);
1822                         i += 1;
1823                         continue;
1824                 }
1825
1826                 if (!strcmp(argv[i], "inspect-replyq") && (i + 1 < argc)) {
1827                         ret = cmd_inspectq(atol(argv[i+1]), REPLY_QUEUE);
1828                         i += 1;
1829                         continue;
1830                 }
1831
1832                 if (!strcmp(argv[i], "report")) {
1833                         ret = cmd_report(dstport);
1834                         continue;
1835                 }
1836
1837                 if (!strcmp(argv[i], "alloc_requests") && (i + 1 < argc)) {
1838                         ret = cmd_alloc_requests(atol(argv[i+1]));
1839                         i += 1;
1840                         continue;
1841                 }
1842
1843                 if (!strcmp(argv[i], "free_requests") && (i + 1 < argc)) {
1844                         ret = cmd_free_requests(atol(argv[i+1]));
1845                         i += 1;
1846                         continue;
1847                 }
1848
1849                 if (!strcmp(argv[i], "put_requests")) {
1850                         ret = cmd_put_requests();
1851                         continue;
1852                 }
1853
1854                 if (!strcmp(argv[i], "put_replies")) {
1855                         ret = cmd_put_replies();
1856                         continue;
1857                 }
1858
1859                 if (!strcmp(argv[i], "complete") && (i + 1 < argc)) {
1860                         ret = cmd_finish(atol(argv[i+1]), 0);
1861                         i += 1;
1862                         continue;
1863                 }
1864
1865                 if (!strcmp(argv[i], "fail") && (i + 1 < argc)) {
1866                         ret = cmd_finish(atol(argv[i+1]), 1);
1867                         i += 1;
1868                         continue;
1869                 }
1870
1871                 if (!strcmp(argv[i], "wait") && (i + 1 < argc)) {
1872                         ret = cmd_wait(atol(argv[i+1]));
1873                         i += 1;
1874                         continue;
1875                 }
1876
1877                 if (!strcmp(argv[i], "rndwrite") && (i + 5 < argc)) {
1878                         long nr_loops = atol(argv[i+1]);
1879                         unsigned int seed = atoi(argv[i+2]);
1880                         unsigned int targetlen = atoi(argv[i+3]);
1881                         unsigned int chunksize = atoi(argv[i+4]);
1882                         unsigned long objectsize = atol(argv[i+5]);
1883                         ret = cmd_rndwrite(nr_loops, seed, targetlen, chunksize, objectsize);
1884                         i += 5;
1885                         continue;
1886                 }
1887                 
1888                 if (!strcmp(argv[i], "rnddelete") && (i + 3 < argc)) {
1889                         long nr_loops = atol(argv[i+1]);
1890                         unsigned int seed = atoi(argv[i+2]);
1891                         unsigned int targetlen = atoi(argv[i+3]);
1892                         ret = cmd_rnddelete(nr_loops, seed, targetlen);
1893                         i += 3;
1894                         continue;
1895                 }
1896
1897                 if (!strcmp(argv[i], "rndread") && (i + 5 < argc)) {
1898                         long nr_loops = atol(argv[i+1]);
1899                         unsigned int seed = atoi(argv[i+2]);
1900                         unsigned int targetlen = atoi(argv[i+3]);
1901                         unsigned int chunksize = atoi(argv[i+4]);
1902                         unsigned long objectsize = atol(argv[i+5]);
1903                         ret = cmd_rndread(nr_loops, seed, targetlen, chunksize, objectsize);
1904                         i += 5;
1905                         continue;
1906                 }
1907
1908                 if (!strcmp(argv[i], "submit_reqs") && (i + 3 < argc)) {
1909                         long nr_loops = atol(argv[i+1]);
1910                         long concurrent_reqs = atol(argv[i+2]);
1911                         int op = atoi(argv[i+3]);
1912                         ret = cmd_submit_reqs(nr_loops, concurrent_reqs, op);
1913                         i += 3;
1914                         continue;
1915                 }
1916
1917                 if (!strcmp(argv[i], "read") && (i + 3 < argc)) {
1918                         char *target = argv[i+1];
1919                         uint64_t offset = atol(argv[i+2]);
1920                         uint64_t size   = atol(argv[i+3]);
1921                         ret = cmd_read(target, offset, size);
1922                         i += 3;
1923                         continue;
1924                 }
1925
1926                 if (!strcmp(argv[i], "write") && (i + 2 < argc)) {
1927                         char *target = argv[i+1];
1928                         uint64_t offset = atol(argv[i+2]);
1929                         ret = cmd_write(target, offset);
1930                         i += 2;
1931                         continue;
1932                 }
1933
1934                 if (!strcmp(argv[i], "truncate") && (i + 2 < argc)) {
1935                         char *target = argv[i+1];
1936                         uint64_t offset = atol(argv[i+2]);
1937                         ret = cmd_truncate(target, offset);
1938                         i += 2;
1939                         continue;
1940                 }
1941
1942                 if (!strcmp(argv[i], "delete") && (i + 1 < argc)) {
1943                         char *target = argv[i+1];
1944                         ret = cmd_delete(target);
1945                         i += 1;
1946                         continue;
1947                 }
1948
1949                 if (!strcmp(argv[i], "acquire") && (i + 1 < argc)) {
1950                         char *target = argv[i+1];
1951                         ret = cmd_acquire(target);
1952                         i += 1;
1953                         continue;
1954                 }
1955
1956                 if (!strcmp(argv[i], "release") && (i + 1 < argc)) {
1957                         char *target = argv[i+1];
1958                         ret = cmd_release(target);
1959                         i += 1;
1960                         continue;
1961                 }
1962
1963                 if (!strcmp(argv[i], "copy") && (i + 2) < argc) {
1964                         char *src = argv[i+1];
1965                         char *dst = argv[i+2];
1966                         ret = cmd_copy(src, dst);
1967                         i += 2;
1968                         continue;
1969                 }
1970
1971                 if (!strcmp(argv[i], "clone") && (i + 2 < argc)) {
1972                         char *src = argv[i+1];
1973                         char *dst = argv[i+2];
1974                         ret = cmd_clone(src, dst);
1975                         i += 2;
1976                         continue;
1977                 }
1978
1979                 if (!strcmp(argv[i], "info") && (i + 1 < argc)) {
1980                         char *target = argv[i+1];
1981                         ret = cmd_info(target);
1982                         i += 1;
1983                         continue;
1984                 }
1985
1986
1987                 if (!parse_ports(argv[i]))
1988                         fprintf(stderr, "invalid argument: %s\n", argv[i]);
1989         }
1990
1991         /* xseg_leave(); */
1992         return ret;
1993 }