add support for configurable max allocated requests and local req cache
[archipelago] / xseg / peers / user / xseg-tool.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <string.h>
5 #include <stdint.h>
6 #include <ctype.h>
7 #include <sys/stat.h>
8 #include <sys/types.h>
9 #include <fcntl.h>
10 #include <unistd.h>
11
12 #include <xtypes/xhash.h>
13 #include <xtypes/xobj.h>
14 #include <xseg/xseg.h>
15 #include <xseg/protocol.h>
/*
 * Print the command-line usage summary on stdout.
 *
 * Always returns 1, which lets command handlers bail out with
 * `return help();` when they are given invalid arguments.
 */
int help(void)
{
	static const char usage[] =
		"xseg <spec> [[[<src_port>]:[<dst_port>]] [<command> <arg>*] ]*\n"
		"spec:\n"
		"    <type:name:nr_ports:nr_requests:request_size:extra_size:page_shift>\n"
		"global commands:\n"
		"    reportall\n"
		"    create\n"
		"    destroy\n"
		"    bind <portno>\n"
		"    signal <portno>\n"
		"    bridge <portno1> <portno2> <logfile> {full|summary|stats}\n"
		"port commands:\n"
		"    report\n"
		"    alloc_requests (to source) <nr>\n"
		"    free_requests (from source) <nr>\n"
		"    put_requests (all from dest)\n"
		"    put_replies (all from dest)\n"
		"    wait        <nr_replies>\n"
		"    complete    <nr_requests>\n"
		"    fail        <nr_requests>\n"
		"    rndwrite    <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
		"    rndread     <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
		"    submit_reqs <nr_loops> <concurrent_reqs>\n"
		"    info        <target>\n"
		"    read        <target> <offset> <size>\n"
		"    write       <target> <offset> < data\n"
		"    truncate    <target> <size>\n"
		"    delete      <target>\n"
		"    acquire     <target>\n"
		"    release     <target>\n"
		"    copy        <src>  <dst>\n"
		"    clone       <src>  <dst>\n";

	/* The text contains no conversion specifiers, so fputs emits it verbatim. */
	fputs(usage, stdout);
	return 1;
}
52
53
/* Numeric codes for the three request actions; numbering starts at 1. */
enum req_action {
	REPORT = 1,
	FAIL,		/* 2 */
	COMPLETE	/* 3 */
};
59
/* Identifiers for the three per-port queues; numbering starts at 0. */
enum queue {
	FREE_QUEUE,	/* 0 */
	REQUEST_QUEUE,	/* 1 */
	REPLY_QUEUE	/* 2 */
};
65
66 char *namebuf;
67 char *chunk;
68 struct xseg_config cfg;
69 struct xseg *xseg;
70 uint32_t srcport, dstport;
71 uint64_t reqs;
72 #define mkname mkname_heavy
73 /* heavy distributes duplicates much more widely than light
74  * ./xseg-tool random 100000 | cut -d' ' -f2- | sort | uniq -d -c |wc -l
75  */
76
77 xport sport = NoPort;
78 static void init_local_signal() 
79 {
80         if (xseg && sport != srcport){
81                 xseg_init_local_signal(xseg, srcport);
82                 sport = srcport;
83         }
84 }
85
/*
 * Fill name[0..namelen) with lowercase hexadecimal digits derived from seed.
 *
 * Each character is obtained by summing the four bytes of the current seed,
 * folding the two nibbles of that sum together, and mapping the resulting
 * 4-bit value to '0'-'9' / 'a'-'f'.  The seed is then scrambled with a
 * multiplicative step before the next character.  The output is NOT
 * NUL-terminated.
 */
void mkname_heavy(char *name, uint32_t namelen, uint32_t seed)
{
	static const char hexdigits[] = "0123456789abcdef";
	uint32_t pos;

	for (pos = 0; pos < namelen; pos++) {
		/* Only the low byte of the sum matters, exactly as with a char. */
		unsigned int folded = (seed + (seed >> 8) + (seed >> 16) + (seed >> 24)) & 0xff;
		unsigned int nibble = (folded + (folded >> 4)) & 0xf;

		name[pos] = hexdigits[nibble];
		seed *= ((seed % 137911) | 1) * 137911;
	}
}
99
/*
 * Fill name[0..namelen) with letters 'A'..'P': character i encodes the low
 * four bits of (seed + i).  The output is NOT NUL-terminated.
 */
void mkname_light(char *name, uint32_t namelen, uint32_t seed)
{
	uint32_t pos;

	for (pos = 0; pos < namelen; pos++)
		name[pos] = (char)('A' + ((seed + pos) & 0xf));
}
110
/*
 * Pick a pseudo-random value in [0, size) using random().
 *
 * Returns 0 when size is 0.  The previous implementation ignored `size`,
 * divided by the result of random() (which may be 0) and could never
 * return a value inside the requested range.
 *
 * Only 31 bits of randomness are available per call, so for very large
 * sizes the possible results are spread out rather than contiguous.
 */
uint64_t pick(uint64_t size)
{
	/* POSIX random() returns values in [0, 2^31 - 1]. */
	const double random_span = 2147483648.0;
	double fraction;

	if (size == 0)
		return 0;

	fraction = (double)random() / random_span;	/* in [0, 1) */
	return (uint64_t)(fraction * (double)size);
}
115
/*
 * Fill chunk[0..datalen) with a repeating, verifiable pattern.
 *
 * The pattern is (targetlen + 16) bytes long: the 16-digit hexadecimal
 * rendering of `offset` followed by `target`, formatted with snprintf into
 * a buffer of exactly that size (so at most targetlen + 15 characters are
 * kept and the last pattern byte is always NUL).  Whole copies of the
 * pattern are laid down back to back and the remainder of the chunk gets a
 * prefix of the pattern.
 *
 * The pattern buffer is zeroed first so that a `target` shorter than
 * `targetlen` yields deterministic padding instead of uninitialised stack
 * bytes.
 */
void mkchunk(	char *chunk, uint32_t datalen,
		char *target, uint32_t targetlen, uint64_t offset)
{
	long bufsize = (long)targetlen + 16;
	long pos, tail;
	char buf[bufsize];

	memset(buf, 0, bufsize);
	snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);

	/* Whole pattern repetitions. */
	for (pos = 0; pos + bufsize <= (long)datalen; pos += bufsize)
		memcpy(chunk + pos, buf, bufsize);

	/* Partial repetition covering the last (datalen % bufsize) bytes. */
	tail = (long)datalen % bufsize;
	memcpy(chunk + datalen - tail, buf, tail);
}
129
/*
 * Verify that chunk[0..datalen) holds the pattern produced for the given
 * target and offset: repeated copies of a (targetlen + 16)-byte pattern
 * (16 hex digits of `offset` followed by `target`, truncated by snprintf to
 * fit, last byte NUL) followed by a prefix of the pattern covering the
 * remaining datalen % (targetlen + 16) bytes.
 *
 * Returns 1 when every byte matches, 0 otherwise.
 *
 * The tail length is taken modulo the pattern size; the earlier code used
 * `datalen % targetlen`, which checked the wrong tail region and divided by
 * zero for targetlen == 0.  The pattern buffer is zeroed before formatting
 * so a target shorter than targetlen compares against deterministic
 * padding.
 */
int chkchunk(	char *chunk, uint32_t datalen,
		char *target, uint32_t targetlen, uint64_t offset)
{
	long bufsize = (long)targetlen + 16;
	long pos, tail;
	char buf[bufsize];

	memset(buf, 0, bufsize);
	snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);

	/* Whole pattern repetitions. */
	for (pos = 0; pos + bufsize <= (long)datalen; pos += bufsize)
		if (memcmp(chunk + pos, buf, bufsize))
			return 0;

	/* Trailing partial repetition. */
	tail = (long)datalen % bufsize;
	if (memcmp(chunk + datalen - tail, buf, tail))
		return 0;

	return 1;
}
152
153
#define ALLOC_MIN 4096
#define ALLOC_MAX 1048576

/*
 * Read everything that remains on fp into a buffer owned by this function.
 *
 * The buffer and its capacity are kept in static storage and reused across
 * calls, so the pointer stored in *retbuf is only valid until the next call
 * and must not be freed by the caller.  On entry the capacity is clamped to
 * [ALLOC_MIN, ALLOC_MAX]; while reading it doubles whenever the buffer
 * becomes full.
 *
 * On return *retbuf points at the data and *retsize is the number of bytes
 * read.  If memory cannot be obtained the buffer is released, *retbuf is
 * NULL and *retsize is 0.
 */
void inputbuf(FILE *fp, char **retbuf, uint64_t *retsize)
{
	static uint64_t alloc_size;
	static char *buf;
	uint64_t used = 0;
	char *grown;

	if (alloc_size < ALLOC_MIN)
		alloc_size = ALLOC_MIN;
	else if (alloc_size > ALLOC_MAX)
		alloc_size = ALLOC_MAX;

	grown = realloc(buf, alloc_size);
	if (grown == NULL) {
		free(buf);
		buf = NULL;
		goto out;
	}
	buf = grown;

	while (!feof(fp)) {
		size_t got = fread(buf + used, 1, alloc_size - used, fp);

		if (got == 0)
			break;
		used += got;

		/* Still room left: keep reading into the same buffer. */
		if (used < alloc_size)
			continue;

		grown = realloc(buf, alloc_size * 2);
		if (grown == NULL) {
			free(buf);
			buf = NULL;
			used = 0;
			goto out;
		}
		buf = grown;
		alloc_size *= 2;
	}

out:
	*retbuf = buf;
	*retsize = used;
}
204
205 void report_request(struct xseg_request *req)
206 {
207         char target[64], data[64];
208         char *req_target, *req_data;
209         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
210         req_target = xseg_get_target(xseg, req);
211         req_data = xseg_get_data(xseg, req);
212
213         strncpy(target, req_target, end);
214         target[end] = 0;
215         strncpy(data, req_data, 63);
216         data[63] = 0;
217         fprintf(stderr,
218                 "Request %lx: target[%u](xptr: %llu): %s, data[%llu](xptr: %llu): %s \n\t"
219                 "offset: %llu, size: %llu, serviced; %llu, op: %u, state: %u, flags: %u \n\t"
220                 "src: %u, src_transit: %u, dst: %u, dst_transit: %u\n",
221                 (unsigned long) req, req->targetlen, (unsigned long long)req->target,
222                 target,
223                 (unsigned long long) req->datalen, (unsigned long long) req->data,
224                 data,
225                 (unsigned long long) req->offset, (unsigned long long) req->size,
226                 (unsigned long long) req->serviced, req->op, req->state, req->flags,
227                 (unsigned int) req->src_portno, (unsigned int) req->src_transit_portno,
228                 (unsigned int) req->dst_portno, (unsigned int) req->dst_transit_portno);
229
230
231 }
232
233 int cmd_info(char *target)
234 {
235         uint32_t targetlen = strlen(target);
236         size_t size = sizeof(uint64_t);
237         int r;
238         xport p;
239         struct xseg_request *req;
240         char *req_target;
241
242         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
243         if (!req) {
244                 fprintf(stderr, "No request!\n");
245                 return -1;
246         }
247
248         r = xseg_prep_request(xseg, req, targetlen, size);
249         if (r < 0) {
250                 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
251                         (unsigned long) targetlen, (unsigned long) size);
252                 xseg_put_request(xseg, req, srcport);
253                 return -1;
254         }
255
256         req_target = xseg_get_target(xseg, req);
257         strncpy(req_target, target, targetlen);
258         req->offset = 0;
259         req->size = size;
260         req->op = X_INFO;
261
262         p = xseg_submit(xseg, req, srcport, X_ALLOC);
263         if (p == NoPort)
264                 return -1;
265
266         xseg_signal(xseg, p);
267
268         return 0;
269 }
270
271 int cmd_read(char *target, uint64_t offset, uint64_t size)
272 {
273         uint32_t targetlen = strlen(target);
274         int r;
275         xport p;
276         char *req_target;
277         struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
278         if (!req) {
279                 fprintf(stderr, "No request\n");
280                 return -1;
281         }
282
283         r = xseg_prep_request(xseg, req, targetlen, size);
284         if (r < 0) {
285                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
286                         (unsigned long)targetlen, (unsigned long long)size);
287                 xseg_put_request(xseg, req, srcport);
288                 return -1;
289         }
290
291         req_target = xseg_get_target(xseg, req);
292         strncpy(req_target, target, targetlen);
293         req->offset = offset;
294         req->size = size;
295         req->op = X_READ;
296         report_request(req);
297         p = xseg_submit(xseg, req, srcport, X_ALLOC);
298         if (p == NoPort)
299                 return -1;
300
301         xseg_signal(xseg, p);
302         return 0;
303 }
304
305 int cmd_write(char *target, uint64_t offset)
306 {
307         char *buf = NULL;
308         int r;
309         xport p;
310         uint64_t size = 0;
311         char *req_target, *req_data;
312         uint32_t targetlen = strlen(target);
313         struct xseg_request *req;
314
315         inputbuf(stdin, &buf, &size);
316         if (!size) {
317                 fprintf(stderr, "No input\n");
318                 return -1;
319         }
320
321         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
322         if (!req) {
323                 fprintf(stderr, "No request\n");
324                 return -1;
325         }
326
327         r = xseg_prep_request(xseg, req, targetlen, size);
328         if (r < 0) {
329                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
330                         (unsigned long)targetlen, (unsigned long long)size);
331                 xseg_put_request(xseg, req, srcport);
332                 return -1;
333         }
334
335         req_target = xseg_get_target(xseg, req);
336         strncpy(req_target, target, targetlen);
337         
338         req_data = xseg_get_data(xseg, req);
339         memcpy(req_data, buf, size);
340         req->offset = offset;
341         req->size = size;
342         req->op = X_WRITE;
343
344         p = xseg_submit(xseg, req, srcport, X_ALLOC);
345         if (p == NoPort) {
346                 fprintf(stderr, "Cannot submit\n");
347                 return -1;
348         }
349         xseg_signal(xseg, p);
350
351         return 0;
352 }
353
/*
 * Placeholder for the "truncate" command: no request is issued and both
 * arguments are ignored.  Always returns 0.
 */
int cmd_truncate(char *target, uint64_t offset)
{
	(void)target;
	(void)offset;

	return 0;
}
358
359 int cmd_delete(char *target)
360 {
361         uint32_t targetlen = strlen(target);
362         int r;
363         struct xseg_request *req;
364         init_local_signal();
365         xseg_bind_port(xseg, srcport, NULL);
366
367         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
368         if (!req) {
369                 fprintf(stderr, "No request!\n");
370                 return -1;
371         }
372
373         r = xseg_prep_request(xseg, req, targetlen, 0);
374         if (r < 0) {
375                 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
376                         (unsigned long) targetlen, (unsigned long) req->bufferlen - targetlen);
377                 xseg_put_request(xseg, req, srcport);
378                 return -1;
379         }
380
381         char *reqtarget = xseg_get_target(xseg, req);
382         strncpy(reqtarget, target, targetlen);
383         req->op = X_DELETE;
384
385         xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
386         if (p == NoPort){
387                 fprintf(stderr, "Couldn't submit request\n");
388                 xseg_put_request(xseg, req, srcport);
389                 return -1;
390         }
391
392         xseg_signal(xseg, p);
393
394         return 0;
395 }
396
397 int cmd_acquire(char *target)
398 {
399         uint32_t targetlen = strlen(target);
400         int r;
401         xport p;
402         char *req_target;
403         struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
404         if (!req) {
405                 fprintf(stderr, "No request\n");
406                 return -1;
407         }
408
409         r = xseg_prep_request(xseg, req, targetlen, 0);
410         if (r < 0) {
411                 fprintf(stderr, "Cannot prepare request! (%lu, 0)\n",
412                         (unsigned long)targetlen);
413                 xseg_put_request(xseg, req, srcport);
414                 return -1;
415         }
416
417         req_target = xseg_get_target(xseg, req);
418         strncpy(req_target, target, targetlen);
419         req->offset = 0;
420         req->size = 0;
421         req->op = X_OPEN;
422         p = xseg_submit(xseg, req, srcport, X_ALLOC);
423         if (p == NoPort)
424                 return -1;
425
426         xseg_signal(xseg, p);
427         return 0;
428 }
429
430 int cmd_release(char *target)
431 {
432         uint32_t targetlen = strlen(target);
433         int r;
434         xport p;
435         char *req_target;
436         struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
437         if (!req) {
438                 fprintf(stderr, "No request\n");
439                 return -1;
440         }
441
442         r = xseg_prep_request(xseg, req, targetlen, 0);
443         if (r < 0) {
444                 fprintf(stderr, "Cannot prepare request! (%lu, 0)\n",
445                         (unsigned long)targetlen);
446                 xseg_put_request(xseg, req, srcport);
447                 return -1;
448         }
449
450         req_target = xseg_get_target(xseg, req);
451         strncpy(req_target, target, targetlen);
452         req->offset = 0;
453         req->size = 0;
454         req->op = X_CLOSE;
455         req->flags = XF_FORCE;
456         p = xseg_submit(xseg, req, srcport, X_ALLOC);
457         if (p == NoPort)
458                 return -1;
459
460         xseg_signal(xseg, p);
461         return 0;
462         return 0;
463 }
464
465 int cmd_copy(char *src, char *dst)
466 {
467         uint32_t targetlen = strlen(dst);
468         uint32_t parentlen = strlen(src);
469         struct xseg_request *req;
470         struct xseg_request_copy *xcopy;
471         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
472         if (!req) {
473                 fprintf(stderr, "No request\n");
474                 return -1;
475         }
476
477         int r = xseg_prep_request(xseg, req, targetlen,
478                         sizeof(struct xseg_request_copy));
479         if (r < 0) {
480                 fprintf(stderr, "Cannot prepare request!\n");
481                 xseg_put_request(xseg, req, srcport);
482                 return -1;
483         }
484
485         char *target = xseg_get_target(xseg, req);
486         char *data = xseg_get_data(xseg, req);
487
488         strncpy(target, dst, targetlen);
489         xcopy = (struct xseg_request_copy *) data;
490         strncpy(xcopy->target, src, parentlen);
491         xcopy->targetlen = parentlen;
492         req->offset = 0;
493         req->size = sizeof(struct xseg_request_copy);
494         req->op = X_COPY;
495
496         xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
497         if (p == NoPort){
498                 fprintf(stderr, "Cannot submit request\n");
499                 return -1;
500         }
501         xseg_signal(xseg, p);
502
503         return 0;
504         return 0;
505 }
506
507 int cmd_clone(char *src, char *dst)
508 {
509
510         uint32_t targetlen = strlen(dst);
511         uint32_t parentlen = strlen(src);
512         struct xseg_request *req;
513         struct xseg_request_clone *xclone;
514         xseg_bind_port(xseg, srcport, NULL);
515         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
516         if (!req) {
517                 fprintf(stderr, "No request\n");
518                 return -1;
519         }
520
521         int r = xseg_prep_request(xseg, req, targetlen, sizeof(struct xseg_request_clone));
522         if (r < 0) {
523                 fprintf(stderr, "Cannot prepare request!\n");
524                 xseg_put_request(xseg, req, srcport);
525                 return -1;
526         }
527
528         char *target = xseg_get_target(xseg, req);
529         char *data = xseg_get_data(xseg, req);
530
531         strncpy(target, dst, targetlen);
532         xclone = (struct xseg_request_clone *) data;
533         strncpy(xclone->target, src, parentlen);
534         xclone->targetlen = parentlen;
535         xclone->size = -1;
536         req->offset = 0;
537         req->size = sizeof(struct xseg_request_clone);
538         req->op = X_CLONE;
539
540         xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
541         if (p == NoPort){
542                 fprintf(stderr, "Cannot submit request\n");
543                 return -1;
544         }
545         xseg_signal(xseg, p);
546
547         return 0;
548 }
549
550 void log_req(int logfd, uint32_t portno2, uint32_t portno1, int op, int method,
551                 struct xseg_request *req)
552 {
553         FILE *logfp;
554         char target[64], data[64];
555         char *req_target, *req_data;
556         /* null terminate name in case of req->target is less than 63 characters,
557          * and next character after name (aka first byte of next buffer) is not
558          * null
559          */
560         unsigned int end = (req->targetlen > 63) ? 63 : req->targetlen;
561         
562         req_target = xseg_get_target(xseg, req);
563         req_data = xseg_get_data(xseg, req);
564
565         logfp = fdopen(logfd, "a");
566         if (!logfp)
567                 return;
568
569         switch(method) {
570         case 0:
571                 strncpy(target, req_target, end);
572                 target[end] = 0;
573                 strncpy(data, req_data, 63);
574                 data[63] = 0;
575
576                 fprintf(logfp,
577                         "src port: %u, dst port: %u,  op:%u offset: %llu size: %lu, reqstate: %u\n"
578                         "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
579                         (unsigned int)portno1,
580                         (unsigned int)portno2,
581                         (unsigned int)req->op,
582                         (unsigned long long)req->offset,
583                         (unsigned long)req->size,
584                         (unsigned int)req->state,
585                         (unsigned int)req->targetlen, target,
586                         (unsigned long long)req->datalen, data);
587                 break;
588         case 1:
589                 fprintf(logfp,
590                         "src port: %u, dst port: %u, op: %u\n",
591                         (unsigned int)portno1,
592                         (unsigned int)portno2,
593                         (unsigned int)req->op);
594                 break;
595         case 2:
596                 fprintf(logfp, "src port: %u, dst port: %u, reqs: %llu\n",
597                         (unsigned int)portno1,
598                         (unsigned int)portno2,
599                         (unsigned long long)++reqs);
600         }
601
602         fclose(logfp);
603         return;
604 }
605
606 #define LOG_ACCEPT  0
607 #define LOG_RECEIVE 1
608
609 int cmd_bridge(uint32_t portno1, uint32_t portno2, char *logfile, char *how)
610 {
611         struct xseg_request *req;
612         int logfd, method;
613         if (!strcmp(logfile, "-"))
614                 logfd = 1;
615         else {
616                 logfd = open(logfile, O_WRONLY|O_APPEND|O_CREAT, 0600);
617                 if (logfd < 0) {
618                         perror(logfile);
619                         return -1;
620                 }
621         }
622
623         if (!strcmp(how, "full"))
624                 method = 0;
625         else if (!strcmp(how, "summary"))
626                 method = 1;
627         else
628                 method = 2;
629
630         for (;;) {
631                 int reloop = 0, active;
632                 xseg_prepare_wait(xseg, portno1);
633                 xseg_prepare_wait(xseg, portno2);
634                 req = NULL;
635
636                 for (;;) {
637                         active = 0;
638
639                         //FIXME
640                         req = xseg_accept(xseg, portno1, 0);
641                         if (req) {
642                                 xseg_submit(xseg, req, portno2, X_ALLOC);
643                                 log_req(logfd, portno1, portno2, LOG_ACCEPT, method, req);
644                                 active += 1;
645                         }
646
647                         req = xseg_accept(xseg, portno2, 0);
648                         if (req) {
649                                 xseg_submit(xseg, req, portno1, X_ALLOC);
650                                 log_req(logfd, portno2, portno1, LOG_ACCEPT, method, req);
651                                 active += 1;
652                         }
653
654                         req = xseg_receive(xseg, portno1, 0);
655                         if (req) {
656                                 xseg_respond(xseg, req, portno2, X_ALLOC);
657                                 log_req(logfd, portno1, portno2, LOG_RECEIVE, method, req);
658                                 active += 1;
659                         }
660
661                         req = xseg_receive(xseg, portno2, 0);
662                         if (req) {
663                                 xseg_respond(xseg, req, portno1, X_ALLOC);
664                                 log_req(logfd, portno2, portno1, LOG_RECEIVE, method, req);
665                                 active += 1;
666                         }
667
668                         if (active == 0) {
669                                 if (reloop)
670                                         break;
671                                 /* wait on multiple queues? */
672                                 xseg_wait_signal(xseg, 100000);
673                                 break;
674                         } else {
675                                 xseg_cancel_wait(xseg, portno1);        
676                                 xseg_cancel_wait(xseg, portno2);        
677                                 reloop = 1;
678                         }
679                 }
680         }
681
682         close(logfd);
683
684         return 0;
685 }
686
687 int cmd_rndwrite(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
688 {
689         if (loops < 0)
690                 return help();
691
692         if (targetlen >= chunksize) {
693                 fprintf(stderr, "targetlen >= chunksize\n");
694                 return -1;
695         }
696
697         char *p = realloc(namebuf, targetlen+1);
698         if (!p) {
699                 fprintf(stderr, "Cannot allocate memory\n");
700                 return -1;
701         }
702         namebuf = p;
703
704         p = realloc(chunk, chunksize);
705         if (!p) {
706                 fprintf(stderr, "Cannot allocate memory\n");
707                 return -1;
708         }
709         chunk = p;
710         memset(chunk, 0, chunksize);
711
712         srandom(seed);
713
714         struct xseg_request *submitted = NULL, *received;
715         long nr_submitted = 0, nr_received = 0, nr_failed = 0;
716         int reported = 0, r;
717         uint64_t offset;
718         xport port;
719         char *req_data, *req_target;
720         seed = random();
721         init_local_signal();
722
723         for (;;) {
724                 xseg_prepare_wait(xseg, srcport);
725                 if (nr_submitted < loops &&
726                     (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
727                         xseg_cancel_wait(xseg, srcport);
728                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
729                         if (r < 0) {
730                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
731                                         targetlen, chunksize);
732                                 xseg_put_request(xseg, submitted, srcport);
733                                 return -1;
734                         }
735                         
736                         req_target = xseg_get_target(xseg, submitted);
737                         req_data = xseg_get_data(xseg, submitted);
738
739                         reported = 0;
740                         mkname(namebuf, targetlen, seed);
741                         namebuf[targetlen] = 0;
742                         //printf("%ld: %s\n", nr_submitted, namebuf);
743                         strncpy(req_target, namebuf, targetlen);
744                         offset = 0;// pick(size);
745                         mkchunk(req_data, chunksize, namebuf, targetlen, offset);
746
747                         submitted->offset = offset;
748                         submitted->size = chunksize;
749                         submitted->op = X_WRITE;
750                         submitted->flags |= XF_NOSYNC;
751
752                         port =  xseg_submit(xseg, submitted, srcport, X_ALLOC);
753                         if (port == NoPort) {
754                                 xseg_put_request(xseg, submitted, srcport);
755                         } else {
756                                 seed = random();
757                                 nr_submitted += 1;
758                                 xseg_signal(xseg, port);
759                         }
760                 }
761
762                 received = xseg_receive(xseg, srcport, 0);
763                 if (received) {
764                         xseg_cancel_wait(xseg, srcport);
765                         nr_received += 1;
766                         if (!(received->state & XS_SERVED)) {
767                                 nr_failed += 1;
768                                 report_request(received);
769                         }
770                         if (xseg_put_request(xseg, received, srcport))
771                                 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
772                 }
773
774                 if (!submitted && !received)
775                         xseg_wait_signal(xseg, 1000000);
776
777                         if (nr_submitted % 1000 == 0 && !reported) {
778                                 reported = 1;
779                                 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
780                                         nr_submitted, nr_received, nr_failed);
781                         }
782
783                         if (nr_received >= loops)
784                                 break;
785         }
786
787         fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
788                 nr_submitted, nr_received, nr_failed);
789         return 0;
790 }
791
792 int cmd_rnddelete(long loops, int32_t seed, uint32_t targetlen)
793 {
794         if (loops < 0)
795                 return help();
796         char *p = realloc(namebuf, targetlen+1);
797         if (!p) {
798                 fprintf(stderr, "Cannot allocate memory\n");
799                 return -1;
800         }
801         namebuf = p;
802
803         srandom(seed);
804
805         struct xseg_request *submitted = NULL, *received;
806         long nr_submitted = 0, nr_received = 0, nr_failed = 0;
807         int reported = 0, r;
808         xport port;
809         char *req_target;
810         seed = random();
811         init_local_signal();
812
813         for (;;) {
814                 xseg_prepare_wait(xseg, srcport);
815                 if (nr_submitted < loops &&
816                     (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
817                         xseg_cancel_wait(xseg, srcport);
818                         r = xseg_prep_request(xseg, submitted, targetlen, 0);
819                         if (r < 0) {
820                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
821                                         targetlen, 0);
822                                 xseg_put_request(xseg, submitted, srcport);
823                                 return -1;
824                         }
825                         
826                         req_target = xseg_get_target(xseg, submitted);
827
828                         reported = 0;
829                         mkname(namebuf, targetlen, seed);
830                         namebuf[targetlen] = 0;
831                         //printf("%ld: %s\n", nr_submitted, namebuf);
832                         strncpy(req_target, namebuf, targetlen);
833                         submitted->offset = 0;
834                         submitted->size = 0;
835                         submitted->op = X_DELETE;
836                         submitted->flags = 0;
837
838                         port =  xseg_submit(xseg, submitted, srcport, X_ALLOC);
839                         if (port == NoPort) {
840                                 xseg_put_request(xseg, submitted, srcport);
841                         } else {
842                                 seed = random();
843                                 nr_submitted += 1;
844                                 xseg_signal(xseg, port);
845                         }
846                 }
847
848                 received = xseg_receive(xseg, srcport, 0);
849                 if (received) {
850                         xseg_cancel_wait(xseg, srcport);
851                         nr_received += 1;
852                         if (!(received->state & XS_SERVED)) {
853                                 nr_failed += 1;
854                                 report_request(received);
855                         }
856                         if (xseg_put_request(xseg, received, srcport))
857                                 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
858                 }
859
860                 if (!submitted && !received)
861                         xseg_wait_signal(xseg, 1000000);
862
863                         if (nr_submitted % 1000 == 0 && !reported) {
864                                 reported = 1;
865                                 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
866                                         nr_submitted, nr_received, nr_failed);
867                         }
868
869                         if (nr_received >= loops)
870                                 break;
871         }
872
873         fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
874                 nr_submitted, nr_received, nr_failed);
875         return 0;
876 }
877 /* note:
878  * prepare/wait rhythm,
879  * files are converted to independent chunk access patterns,
880 */
881
882 int cmd_rndread(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
883 {
884         if (loops < 0)
885                 return help();
886
887         if (targetlen >= chunksize) {
888                 fprintf(stderr, "targetlen >= chunksize\n");
889                 return -1;
890         }
891
892         char *p = realloc(namebuf, targetlen+1);
893         if (!p) {
894                 fprintf(stderr, "Cannot allocate memory\n");
895                 return -1;
896         }
897         namebuf = p;
898
899         p = realloc(chunk, chunksize);
900         if (!p) {
901                 fprintf(stderr, "Cannot allocate memory\n");
902                 return -1;
903         }
904         chunk = p;
905         memset(chunk, 0, chunksize);
906
907         srandom(seed);
908
909         struct xseg_request *submitted = NULL, *received;
910         long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0;
911         int reported = 0, r;
912         uint64_t offset;
913         xport port;
914         char *req_data, *req_target;
915         init_local_signal();
916
917         seed = random();
918         for (;;) {
919                 submitted = NULL;
920                 xseg_prepare_wait(xseg, srcport);
921                 if (nr_submitted < loops &&
922                     (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
923                         xseg_cancel_wait(xseg, srcport);
924                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
925                         if (r < 0) {
926                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
927                                         targetlen, chunksize);
928                                 xseg_put_request(xseg, submitted, srcport);
929                                 return -1;
930                         }
931
932                         req_target = xseg_get_target(xseg, submitted);
933                         reported = 0;
934                         mkname(namebuf, targetlen, seed);
935                         namebuf[targetlen] = 0;
936                         //printf("%ld: %s\n", nr_submitted, namebuf);
937                         offset = 0;//pick(size);
938
939                         strncpy(req_target, namebuf, targetlen);
940                         submitted->offset = offset;
941                         submitted->size = chunksize;
942                         submitted->op = X_READ;
943                         port = xseg_submit(xseg, submitted, srcport, X_ALLOC);
944                         if (port == NoPort) {
945                                 xseg_put_request(xseg, submitted, srcport);
946                         } else {
947                                 seed = random();
948                                 nr_submitted += 1;
949                                 xseg_signal(xseg, port);
950                         }
951                 }
952
953                 received = xseg_receive(xseg, srcport, 0);
954                 if (received) {
955                         xseg_cancel_wait(xseg, srcport);
956                         nr_received += 1;
957                         req_target = xseg_get_target(xseg, received);
958                         req_data = xseg_get_data(xseg, received);
959                         if (!(received->state & XS_SERVED)) {
960                                 nr_failed += 1;
961                                 report_request(received);
962                         } else if (!chkchunk(req_data, received->datalen,
963                                         req_target, received->targetlen, received->offset)) {
964         //                      report_request(received);
965                                 nr_mismatch += 1;
966                         }
967
968                         if (xseg_put_request(xseg, received, srcport))
969                                 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
970                 }
971
972                 if (!submitted && !received)
973                         xseg_wait_signal(xseg, 1000000);
974
975                 if (nr_submitted % 1000 == 0 && !reported) {
976                         reported = 1;
977                         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
978                         nr_submitted, nr_received, nr_failed, nr_mismatch);
979                 }
980
981                 if (nr_received >= loops)
982                         break;
983         }
984
985         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
986                 nr_submitted, nr_received, nr_failed, nr_mismatch);
987         return 0;
988 }
989
990 int cmd_submit_reqs(long loops, long concurrent_reqs, int op)
991 {
992         if (loops < 0)
993                 return help();
994
995         struct xseg_request *submitted = NULL, *received;
996         long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0, nr_flying = 0;
997         int reported = 0, r;
998         uint64_t offset;
999         uint32_t targetlen = 10, chunksize = 4096;
1000         struct timeval tv1, tv2;
1001         xport p;
1002         char *req_data, *req_target;
1003
1004         xseg_bind_port(xseg, srcport, NULL);
1005
1006         gettimeofday(&tv1, NULL);
1007         for (;;) {
1008                 submitted = NULL;
1009                 xseg_prepare_wait(xseg, srcport);
1010                 if (nr_submitted < loops &&  nr_flying < concurrent_reqs &&
1011                     (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
1012                         xseg_cancel_wait(xseg, srcport);
1013                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
1014                         if (r < 0) {
1015                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
1016                                         targetlen, chunksize);
1017                                 xseg_put_request(xseg, submitted, srcport);
1018                                 return -1;
1019                         }
1020                         
1021                         //FIXME
1022                         ++nr_flying;
1023                         nr_submitted += 1;
1024                         reported = 0;
1025                         offset = 0;//pick(size);
1026
1027                         submitted->offset = offset;
1028                         submitted->size = chunksize;
1029                         req_target = xseg_get_target(xseg, submitted);
1030                         req_data = xseg_get_data(xseg, submitted);
1031
1032                         if (op == 0)
1033                                 submitted->op = X_INFO;
1034                         else if (op == 1)
1035                                 submitted->op = X_READ;
1036                         else if (op == 2) {
1037                                 submitted->op = X_WRITE;
1038                                 mkchunk(req_data, submitted->datalen, req_target, submitted->targetlen, submitted->offset);
1039                         }
1040
1041                         p = xseg_submit(xseg, submitted, srcport, X_ALLOC);
1042                         if ( p != NoPort){
1043                                 if (xseg_signal(xseg, p) < 0)
1044                                         perror("Cannot signal peer");
1045                         }
1046                 }
1047                 received = xseg_receive(xseg, srcport, 0);
1048                 if (received) {
1049                         xseg_cancel_wait(xseg, srcport);
1050                         --nr_flying;
1051                         if (nr_received == 0)
1052                                 fprintf(stderr, "latency (time for the first req to complete): %llu usecs\n",
1053                                         (unsigned long long)received->elapsed);
1054                         nr_received += 1;
1055                         if (!(received->state & XS_SERVED)) {
1056                                 nr_failed += 1;
1057                                 //report_request(received);
1058                         }
1059
1060                         if (xseg_put_request(xseg, received, srcport))
1061                                 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
1062                 }
1063
1064                 if (!submitted && !received)
1065                         xseg_wait_signal(xseg, 10000000L);
1066
1067                 if (nr_received >= loops)
1068                         break;
1069         }
1070         gettimeofday(&tv2, NULL);
1071
1072         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
1073                 nr_submitted, nr_received, nr_failed, nr_mismatch);
1074         long t = (tv2.tv_sec - tv1.tv_sec)*1000000 + (tv2.tv_usec - tv1.tv_usec);
1075         fprintf(stderr, "elpased time: %lf secs, throughput: %lf reqs/sec\n", (double) t / 1000000.0, (double) nr_submitted / (t / 1000000.0));
1076
1077         return 0;
1078 }
1079
1080 static void lock_status(struct xlock *lock, char *buf, int len)
1081 {
1082         int r;
1083         if (lock->owner == Noone)
1084                 r = snprintf(buf, len, "Locked: No");
1085         else
1086                 r = snprintf(buf, len, "Locked: Yes (Owner: %lu)", lock->owner);
1087         if (r >= len)
1088                 buf[len-1] = 0;
1089 }
1090
1091 int cmd_report(uint32_t portno)
1092 {
1093         char fls[64], rls[64], pls[64]; // buffer to store lock status
1094         struct xseg_port *port = xseg_get_port(xseg, portno);
1095         if (!port) {
1096                 printf("port %u is not assigned\n", portno);
1097                 return 0;
1098         }
1099         struct xq *fq, *rq, *pq;
1100         fq = xseg_get_queue(xseg, port, free_queue);
1101         rq = xseg_get_queue(xseg, port, request_queue);
1102         pq = xseg_get_queue(xseg, port, reply_queue);
1103         lock_status(&port->fq_lock, fls, 64);
1104         lock_status(&port->rq_lock, rls, 64);
1105         lock_status(&port->pq_lock, pls, 64);
1106         fprintf(stderr, "port %u:\n"
1107                 "   requests: %llu/%llu  src gw: %u  dst gw: %u\n"
1108                 "       free_queue [%p] count : %4llu | %s\n"
1109                 "    request_queue [%p] count : %4llu | %s\n"
1110                 "      reply_queue [%p] count : %4llu | %s\n",
1111                 portno, (unsigned long long)port->alloc_reqs, 
1112                 (unsigned long long)port->max_alloc_reqs,
1113                 xseg->src_gw[portno],
1114                 xseg->dst_gw[portno],
1115                 (void *)fq, (unsigned long long)xq_count(fq), fls,
1116                 (void *)rq, (unsigned long long)xq_count(rq), rls,
1117                 (void *)pq, (unsigned long long)xq_count(pq), pls);
1118         return 0;
1119 }
1120
1121 int cmd_join(void)
1122 {
1123         if (xseg)
1124                 return 0;
1125
1126         xseg = xseg_join(cfg.type, cfg.name, "posix", NULL);
1127         if (!xseg) {
1128                 fprintf(stderr, "cannot join segment!\n");
1129                 return -1;
1130         }
1131         return 0;
1132 }
1133 static void print_hanlder(char *name, struct xobject_h *obj_h)
1134 {
1135         char ls[64];
1136         lock_status(&obj_h->lock, ls, 64);
1137         fprintf(stderr, "%20s: free: %4llu, allocated: %4llu, allocated space: %7llu (object size: %llu), Lock %s\n",
1138                         name,
1139                         (unsigned long long) obj_h->nr_free,
1140                         (unsigned long long) obj_h->nr_allocated,
1141                         (unsigned long long) obj_h->allocated_space,
1142                         (unsigned long long) obj_h->obj_size, ls);
1143 }
1144
1145 //FIXME ugly
1146 static void print_heap(struct xseg *xseg)
1147 {
1148         char *UNIT[4];
1149         UNIT[0] = "B";
1150         UNIT[1] = "KiB";
1151         UNIT[2] = "MiB";
1152         UNIT[3] = "GiB";
1153         uint64_t MULT[4];
1154         MULT[0] = 1;
1155         MULT[1] = 1024;
1156         MULT[2] = 1024*1024;
1157         MULT[3] = 1024*1024*1024;
1158
1159         int u;
1160         uint64_t t;
1161         fprintf(stderr, "Heap usage: ");
1162         u = 0;
1163         t = xseg->heap->cur;
1164         while (t > 0) {
1165                 t /= 1024;
1166                 u++;
1167         }
1168         if (!t)
1169                 u--;
1170         t = xseg->heap->cur / MULT[u];
1171         if (t < 10){
1172                 float tf = ((float)(xseg->heap->cur))/((float)MULT[u]);
1173                 fprintf(stderr, "%2.1f %s/", tf, UNIT[u]);
1174         }
1175         else {
1176                 unsigned int tu = xseg->heap->cur / MULT[u];
1177                 fprintf(stderr, "%3u %s/", tu, UNIT[u]);
1178         }
1179
1180         u = 0;
1181         t = xseg->config.heap_size;
1182         while (t > 0) {
1183                 t /= 1024;
1184                 u++;
1185         }
1186         if (!t)
1187                 u--;
1188         t = xseg->config.heap_size/MULT[u];
1189         if (t < 10){
1190                 float tf = ((float)(xseg->config.heap_size))/(float)MULT[u];
1191                 fprintf(stderr, "%2.1f %s ", tf, UNIT[u]);
1192         }
1193         else {
1194                 unsigned int tu = xseg->config.heap_size / MULT[u];
1195                 fprintf(stderr, "%3u %s ", tu, UNIT[u]);
1196         }
1197         char ls[64];
1198         lock_status(&xseg->heap->lock, ls, 64);
1199         fprintf(stderr, "(%llu / %llu), %s\n",
1200                         (unsigned long long)xseg->heap->cur,
1201                         (unsigned long long)xseg->config.heap_size,
1202                         ls);
1203 }
1204
1205 int cmd_reportall(void)
1206 {
1207         uint32_t t;
1208
1209         if (cmd_join())
1210                 return -1;
1211
1212         fprintf(stderr, "Segment lock: %s\n",
1213                 (xseg->shared->flags & XSEG_F_LOCK) ? "Locked" : "Unlocked");
1214         print_heap(xseg);
1215         /* fprintf(stderr, "Heap usage: %llu / %llu\n", */
1216         /*              (unsigned long long)xseg->heap->cur, */
1217         /*              (unsigned long long)xseg->config.heap_size); */
1218         fprintf(stderr, "Handlers: \n");
1219         print_hanlder("Requests handler", xseg->request_h);
1220         print_hanlder("Ports handler", xseg->port_h);
1221         print_hanlder("Objects handler", xseg->object_handlers);
1222         fprintf(stderr, "\n");
1223
1224         for (t = 0; t < xseg->config.nr_ports; t++)
1225                 cmd_report(t);
1226
1227         return 0;
1228 }
1229
1230
1231 int finish_req(struct xseg_request *req, enum req_action action)
1232 {
1233         if (action == COMPLETE){
1234                 req->state &= ~XS_FAILED;
1235                 req->state |= XS_SERVED;
1236         } else {
1237                 req->state |= XS_FAILED;
1238                 req->state &= ~XS_SERVED;
1239         }
1240         req->serviced = 0;
1241         xport p = xseg_respond(xseg, req, srcport, X_ALLOC);
1242         if (p == NoPort)
1243                 xseg_put_request(xseg, req, srcport);
1244         else
1245                 xseg_signal(xseg, p);
1246         return 0;
1247 }
1248
1249 //FIXME this should be in xseg lib?
1250 static int isDangling(struct xseg_request *req)
1251 {
1252         xport i;
1253         struct xseg_port *port;
1254         for (i = 0; i < xseg->config.nr_ports; i++) {
1255                 if (xseg->ports[i]){
1256                         port = xseg_get_port(xseg, i);
1257                         if (!port){
1258                                 fprintf(stderr, "Inconsisten port <-> portno mapping %u", i);
1259                                 continue;
1260                         }
1261                         struct xq *fq, *rq, *pq;
1262                         fq = xseg_get_queue(xseg, port, free_queue);
1263                         rq = xseg_get_queue(xseg, port, request_queue);
1264                         pq = xseg_get_queue(xseg, port, reply_queue);
1265                         xlock_acquire(&port->fq_lock, srcport);
1266                         if (__xq_check(fq, XPTR_MAKE(req, xseg->segment))){
1267                                         xlock_release(&port->fq_lock);
1268                                         return 0;
1269                         }
1270                         xlock_release(&port->fq_lock);
1271                         xlock_acquire(&port->rq_lock, srcport);
1272                         if (__xq_check(rq, XPTR_MAKE(req, xseg->segment))){
1273                                         xlock_release(&port->rq_lock);
1274                                         return 0;
1275                         }
1276                         xlock_release(&port->rq_lock);
1277                         xlock_acquire(&port->pq_lock, srcport);
1278                         if (__xq_check(pq, XPTR_MAKE(req, xseg->segment))){
1279                                         xlock_release(&port->pq_lock);
1280                                         return 0;
1281                         }
1282                         xlock_release(&port->pq_lock);
1283                 }
1284         }
1285         return 1;
1286 }
1287
/*
 * Ask a yes/no question on stdout and read the answer from stdin.
 *
 * The last 'y'/'Y' or 'n'/'N' seen before a newline decides the answer;
 * a line without either re-issues the prompt.
 *
 * Returns 1 for yes and 0 for no.  If stdin reaches end-of-file (or a
 * read error) the answer recorded so far is returned, or 0 ("no") when
 * there is none, instead of spinning forever.
 */
int prompt_user(char *msg)
{
        int c, r = -1;

        printf("%s [y/n]: ", msg);
        fflush(stdout);         /* prompt has no newline; make it visible */

        for (;;) {
                c = fgetc(stdin);
                if (c == EOF) {
                        /* no more input: default to the safe answer */
                        return (r == -1) ? 0 : r;
                }

                if (c == 'y' || c == 'Y') {
                        r = 1;
                } else if (c == 'n' || c == 'N') {
                        r = 0;
                } else if (c == '\n') {
                        if (r != -1)
                                break;
                        printf("%s [y/n]: ", msg);
                        fflush(stdout);
                }
        }
        return r;
}
1307
1308 //FIXME this should be in xseg lib?
1309 int cmd_verify(int fix)
1310 {
1311         if (cmd_join())
1312                 return -1;
1313         //segment lock
1314         if (xseg->shared->flags & XSEG_F_LOCK){
1315                 fprintf(stderr, "Segment lock: Locked\n");
1316                 if (fix && prompt_user("Unlock it ?"))
1317                         xseg->shared->flags &= ~XSEG_F_LOCK;
1318         }
1319         //heap lock
1320         if (xseg->heap->lock.owner != Noone){
1321                 fprintf(stderr, "Heap lock: Locked (Owner: %llu)\n",
1322                         (unsigned long long)xseg->heap->lock.owner);
1323                 if (fix && prompt_user("Unlock it ?"))
1324                         xlock_release(&xseg->heap->lock);
1325         }
1326         //obj_h locks
1327         if (xseg->request_h->lock.owner != Noone){
1328                 fprintf(stderr, "Requests handler lock: Locked (Owner: %llu)\n",
1329                         (unsigned long long)xseg->request_h->lock.owner);
1330                 if (fix && prompt_user("Unlock it ?"))
1331                         xlock_release(&xseg->request_h->lock);
1332         }
1333         if (xseg->port_h->lock.owner != Noone){
1334                 fprintf(stderr, "Ports handler lock: Locked (Owner: %llu)\n",
1335                         (unsigned long long)xseg->port_h->lock.owner);
1336                 if (fix && prompt_user("Unlock it ?"))
1337                         xlock_release(&xseg->port_h->lock);
1338         }
1339         if (xseg->object_handlers->lock.owner != Noone){
1340                 fprintf(stderr, "Objects handler lock: Locked (Owner: %llu)\n",
1341                         (unsigned long long)xseg->object_handlers->lock.owner);
1342                 if (fix && prompt_user("Unlock it ?"))
1343                         xlock_release(&xseg->object_handlers->lock);
1344         }
1345         //take segment lock?
1346         xport i;
1347         struct xseg_port *port;
1348         for (i = 0; i < xseg->config.nr_ports; i++) {
1349                 if (xseg->ports[i]){
1350                         port = xseg_get_port(xseg, i);
1351                         if (!port){
1352                                 fprintf(stderr, "Inconsisten port <-> portno mapping %u", i);
1353                                 continue;
1354                         }
1355                         if (port->fq_lock.owner != Noone) {
1356                                 fprintf(stderr, "Free queue lock of port %u locked (Owner %llu)\n",
1357                                                 i, (unsigned long long)port->fq_lock.owner);
1358                                 if (fix && prompt_user("Unlock it ?"))
1359                                         xlock_release(&port->fq_lock);
1360                         }
1361                         if (port->rq_lock.owner != Noone) {
1362                                 fprintf(stderr, "Request queue lock of port %u locked (Owner %llu)\n",
1363                                                 i, (unsigned long long)port->rq_lock.owner);
1364                                 if (fix && prompt_user("Unlock it ?"))
1365                                         xlock_release(&port->rq_lock);
1366                         }
1367                         if (port->pq_lock.owner != Noone) {
1368                                 fprintf(stderr, "Reply queue lock of port %u locked (Owner %llu)\n",
1369                                                 i, (unsigned long long)port->pq_lock.owner);
1370                                 if (fix && prompt_user("Unlock it ?"))
1371                                         xlock_release(&port->pq_lock);
1372                         }
1373                 }
1374         }
1375
1376         struct xobject_h *obj_h = xseg->request_h;
1377         struct xobject_iter it;
1378
1379         struct xseg_request *req;
1380         xlock_acquire(&obj_h->lock, srcport);
1381         xobj_iter_init(obj_h, &it);
1382         while (xobj_iterate(obj_h, &it, (void **)&req)){
1383                 //FIXME this will not work cause obj->magic - req->serial is not
1384                 //touched when a request is get
1385                 /* if (obj->magic != MAGIC_REQ && t->src_portno == portno){ */
1386                 if (isDangling(req) && !__xobj_isFree(obj_h, req)){
1387                         report_request(req);
1388                         if (fix && prompt_user("Fail it ?")){
1389                                 printf("Finishing ...\n");
1390                                 finish_req(req, FAIL);
1391                         }
1392                 }
1393         }
1394         xlock_release(&obj_h->lock);
1395         return 0;
1396 }
1397
1398 int cmd_inspectq(xport portno, enum queue qt)
1399 {
1400         if (cmd_join())
1401                 return -1;
1402
1403         struct xq *q;
1404         struct xlock *l;
1405         struct xseg_port *port = xseg_get_port(xseg, portno);
1406         if (!port)
1407                 return -1;
1408         if (qt == FREE_QUEUE){
1409                 q = xseg_get_queue(xseg, port, free_queue);
1410                 l = &port->fq_lock;
1411         }
1412         else if (qt == REQUEST_QUEUE){
1413                 q = xseg_get_queue(xseg, port, request_queue);
1414                 l = &port->rq_lock;
1415         }
1416         else if (qt == REPLY_QUEUE) {
1417                 q = xseg_get_queue(xseg, port, reply_queue);
1418                 l = &port->rq_lock;
1419         }
1420         else
1421                 return -1;
1422         xlock_acquire(l, srcport);
1423         xqindex i,c = xq_count(q);
1424         if (c) {
1425                 struct xseg_request *req;
1426                 xptr xqi;
1427                 for (i = 0; i < c; i++) {
1428                         xqi = __xq_pop_head(q);
1429                         req = XPTR_TAKE(xqi, xseg->segment);
1430                         report_request(req);
1431                         __xq_append_tail(q, xqi);
1432                 }
1433         }
1434         else {
1435                 fprintf(stderr, "Queue is empty\n\n");
1436         }
1437         xlock_release(l);
1438         return 0;
1439 }
1440
1441
1442 int cmd_request(struct xseg_request *req, enum req_action action)
1443 {
1444         if (cmd_join())
1445                 return -1;
1446
1447         struct xobject_h *obj_h = xseg->request_h;
1448         if (!xobj_check(obj_h, req))
1449                 return -1;
1450
1451         if (action == REPORT)
1452                 report_request(req);
1453         else if (action == FAIL){
1454                 report_request(req);
1455                 if (prompt_user("fail it ?")){
1456                         printf("Finishing ...\n");
1457                         finish_req(req, FAIL);
1458                 }
1459         }
1460         else if (action == COMPLETE){
1461                 report_request(req);
1462                 if (prompt_user("Complete it ?")){
1463                         printf("Finishing ...\n");
1464                         finish_req(req, COMPLETE);
1465                 }
1466         }
1467         return 0;
1468 }
1469
1470 int cmd_create(void)
1471 {
1472         int r = xseg_create(&cfg);
1473         if (r) {
1474                 fprintf(stderr, "cannot create segment!\n");
1475                 return -1;
1476         }
1477
1478         fprintf(stderr, "Segment initialized.\n");
1479         return 0;
1480 }
1481
1482 int cmd_destroy(void)
1483 {
1484         if (!xseg && cmd_join())
1485                 return -1;
1486         xseg_leave(xseg);
1487         xseg_destroy(xseg);
1488         xseg = NULL;
1489         fprintf(stderr, "Segment destroyed.\n");
1490         return 0;
1491 }
1492
1493 int cmd_alloc_requests(unsigned long nr)
1494 {
1495         return xseg_alloc_requests(xseg, srcport, nr);
1496 }
1497
1498 int cmd_free_requests(unsigned long nr)
1499 {
1500         return xseg_free_requests(xseg, srcport, nr);
1501 }
1502
1503 int cmd_put_requests(void)
1504 {
1505         struct xseg_request *req;
1506
1507         for (;;) {
1508                 req = xseg_accept(xseg, dstport, 0);
1509                 if (!req)
1510                         break;
1511                 if (xseg_put_request(xseg, req, srcport))
1512                         fprintf(stderr, "Cannot put request at port %u\n", req->src_portno);
1513         }
1514
1515         return 0;
1516 }
1517
1518 int cmd_finish(unsigned long nr, int fail)
1519 {
1520         struct xseg_request *req;
1521         char *buf = malloc(sizeof(char) * 8128);
1522         char *req_target, *req_data;
1523         xseg_bind_port(xseg, srcport, NULL);
1524         xport p;
1525
1526         for (; nr--;) {
1527                 xseg_prepare_wait(xseg, srcport);
1528                 req = xseg_accept(xseg, srcport, 0);
1529                 if (req) {
1530                         req_target = xseg_get_target(xseg, req);
1531                         req_data = xseg_get_data(xseg, req);
1532                         xseg_cancel_wait(xseg, srcport);
1533                         if (fail == 1)
1534                                 req->state &= ~XS_SERVED;
1535                         else {
1536                                 if (req->op == X_READ)
1537                                         mkchunk(req_data, req->datalen, req_target, req->targetlen, req->offset);
1538                                 else if (req->op == X_WRITE) 
1539                                         memcpy(buf, req_data, (sizeof(*buf) > req->datalen) ? req->datalen : sizeof(*buf));
1540                                 else if (req->op == X_INFO)
1541                                         *((uint64_t *) req->data) = 4294967296;
1542                                 
1543                                 req->state |= XS_SERVED;
1544                                 req->serviced = req->size;
1545                         }
1546
1547                         p = xseg_respond(xseg, req, srcport, X_ALLOC);
1548                         xseg_signal(xseg, p);
1549                         continue;
1550                 }
1551                 ++nr;
1552                 xseg_wait_signal(xseg, 10000000L);
1553         }
1554
1555         free(buf);
1556
1557         return 0;
1558 }
1559
1560 void handle_reply(struct xseg_request *req)
1561 {
1562         char *req_data = xseg_get_data(xseg, req);
1563         char *req_target = xseg_get_target(xseg, req);
1564         if (!(req->state & XS_SERVED)) {
1565                 report_request(req);
1566                 goto put;
1567         }
1568
1569         switch (req->op) {
1570         case X_READ:
1571                 fwrite(req_data, 1, req->datalen, stdout);
1572                 break;
1573
1574         case X_WRITE:
1575                 fprintf(stdout, "wrote: ");
1576                 fwrite(req_data, 1, req->datalen, stdout);
1577                 break;
1578         case X_SYNC:
1579         case X_DELETE:
1580                 fprintf(stderr, "deleted %s\n", req_target);
1581                 break;
1582         case X_TRUNCATE:
1583         case X_COMMIT:
1584         case X_CLONE:
1585                 fprintf(stderr, "cloned %s\n", ((struct xseg_request_clone *)req_data)->target);
1586                 break;
1587         case X_INFO:
1588                 fprintf(stderr, "size: %llu\n", (unsigned long long)*((uint64_t *)req_data));
1589                 break;
1590         case X_COPY:
1591                 fprintf(stderr, "copied %s\n", ((struct xseg_request_copy *)req_data)->target);
1592                 break;
1593         case X_CLOSE:
1594                 fprintf(stderr, "Closed %s\n", req_target);
1595         case X_OPEN:
1596                 fprintf(stderr, "Opened %s\n", req_target);
1597
1598         default:
1599                 break;
1600         }
1601
1602 put:
1603         if (xseg_put_request(xseg, req, srcport))
1604                 fprintf(stderr, "Cannot put reply at port %u\n", req->src_portno);
1605 }
1606
1607 int cmd_wait(uint32_t nr)
1608 {
1609         struct xseg_request *req;
1610         long ret;
1611         init_local_signal(); 
1612
1613         for (;;) {
1614                 req = xseg_receive(xseg, srcport, 0);
1615                 if (req) {
1616                         handle_reply(req);
1617                         nr--;
1618                         if (nr == 0)
1619                                 break;
1620                         continue;
1621                 }
1622
1623                 ret = xseg_prepare_wait(xseg, srcport);
1624                 if (ret)
1625                         return -1;
1626
1627                 ret = xseg_wait_signal(xseg, 1000000);
1628                 ret = xseg_cancel_wait(xseg, srcport);
1629                 if (ret)
1630                         return -1;
1631         }
1632
1633         return 0;
1634 }
1635
1636 int cmd_put_replies(void)
1637 {
1638         struct xseg_request *req;
1639
1640         for (;;) {
1641                 req = xseg_receive(xseg, dstport, 0);
1642                 if (!req)
1643                         break;
1644                 fprintf(stderr, "request: %08llx%08llx\n"
1645                         "     op: %u\n"
1646                         "  state: %u\n",
1647                         0LL, (unsigned long long)req->serial,
1648                         req->op,
1649                         req->state);
1650                 report_request(req);
1651
1652                 //fwrite(req->buffer, 1, req->bufferlen, stdout);
1653
1654                 if (xseg_put_request(xseg, req, srcport))
1655                         fprintf(stderr, "Cannot put reply\n");
1656         }
1657
1658         return 0;
1659 }
1660
1661 int cmd_bind(long portno)
1662 {
1663         struct xseg_port *port = xseg_bind_port(xseg, portno, NULL);
1664         if (!port) {
1665                 fprintf(stderr, "failed to bind port %ld\n", portno);
1666                 return 1;
1667         }
1668
1669         fprintf(stderr, "bound port %u\n", xseg_portno(xseg, port));
1670         return 0;
1671 }
1672
1673 int cmd_signal(uint32_t portno)
1674 {
1675         return xseg_signal(xseg, portno);
1676 }
1677
1678 int parse_ports(char *str)
1679 {
1680         int ret = 0;
1681         char *s = str;
1682
1683         for (;;) {
1684                 if (*s == 0)
1685                         return 0;
1686
1687                 if (*s == ':') {
1688                         *s = 0;
1689                         if ((s > str) && isdigit(str[0])) {
1690                                 srcport = atol(str);
1691                                 ret ++;
1692                         }
1693                         break;
1694                 }
1695                 s ++;
1696         }
1697
1698         s += 1;
1699         str = s;
1700
1701         for (;;) {
1702                 if (*s == 0) {
1703                         if ((s > str) && isdigit(str[0])) {
1704                                 dstport = atol(str);
1705                                 ret ++;
1706                         }
1707                         break;
1708                 }
1709                 s ++;
1710         }
1711
1712         return ret;
1713 }
1714
1715 int main(int argc, char **argv)
1716 {
1717         int i, ret = 0;
1718         char *spec;
1719
1720         if (argc < 3)
1721                 return help();
1722
1723         srcport = -1;
1724         dstport = -1;
1725         spec = argv[1];
1726
1727         if (xseg_parse_spec(spec, &cfg)) {
1728                 fprintf(stderr, "Cannot parse spec\n");
1729                 return -1;
1730         }
1731
1732         if (xseg_initialize()) {
1733                 fprintf(stderr, "cannot initialize!\n");
1734                 return -1;
1735         }
1736
1737         for (i = 2; i < argc; i++) {
1738
1739                 if (!strcmp(argv[i], "create")) {
1740                         ret = cmd_create();
1741                         continue;
1742                 }
1743
1744                 if (!strcmp(argv[i], "join")) {
1745                         ret = cmd_join();
1746                         if (!ret)
1747                                 fprintf(stderr, "Segment joined.\n");
1748                         continue;
1749                 }
1750
1751                 if (!strcmp(argv[i], "destroy")) {
1752                         ret = cmd_destroy();
1753                         continue;
1754                 }
1755
1756                 if (cmd_join())
1757                         return -1;
1758
1759                 if (!strcmp(argv[i], "reportall")) {
1760                         ret = cmd_reportall();
1761                         continue;
1762                 }
1763
1764                 if (!strcmp(argv[i], "bind") && (i + 1 < argc)) {
1765                         ret = cmd_bind(atol(argv[i+1]));
1766                         i += 1;
1767                         continue;
1768                 }
1769
1770                 if (!strcmp(argv[i], "signal") && (i + 1 < argc)) {
1771                         ret = cmd_signal(atol(argv[i+1]));
1772                         i += 1;
1773                         continue;
1774                 }
1775
1776                 if (!strcmp(argv[i], "bridge") && (i + 4 < argc)) {
1777                         ret = cmd_bridge(atol(argv[i+1]),
1778                                          atol(argv[i+2]),
1779                                          argv[i+3],
1780                                          argv[i+4]);
1781                         i += 4;
1782                         continue;
1783                 }
1784
1785                 if (srcport == -1) {
1786                         if (!parse_ports(argv[i]))
1787                                 fprintf(stderr, "source port undefined: %s\n", argv[i]);
1788                         continue;
1789                 }
1790
1791                 if (dstport == -1) {
1792                         if (!parse_ports(argv[i]))
1793                                 fprintf(stderr, "destination port undefined: %s\n", argv[i]);
1794                         continue;
1795                 }
1796
1797                 if (!strcmp(argv[i], "verify")) {
1798                         ret = cmd_verify(0);
1799                         continue;
1800                 }
1801
1802                 if (!strcmp(argv[i], "verify-fix")) {
1803                         ret = cmd_verify(1);
1804                         continue;
1805                 }
1806
1807                 if (!strcmp(argv[i], "failreq") && (i + 1 < argc)) {
1808                         struct xseg_request *req;
1809                         sscanf(argv[i+1], "%lx", &req);
1810                         ret = cmd_request(req, FAIL);
1811                         i += 1;
1812                         continue;
1813                 }
1814
1815                 if (!strcmp(argv[i], "inspect-freeq") && (i + 1 < argc)) {
1816                         ret = cmd_inspectq(atol(argv[i+1]), FREE_QUEUE);
1817                         i += 1;
1818                         continue;
1819                 }
1820
1821                 if (!strcmp(argv[i], "inspect-requestq") && (i + 1 < argc)) {
1822                         ret = cmd_inspectq(atol(argv[i+1]), REQUEST_QUEUE);
1823                         i += 1;
1824                         continue;
1825                 }
1826
1827                 if (!strcmp(argv[i], "inspect-replyq") && (i + 1 < argc)) {
1828                         ret = cmd_inspectq(atol(argv[i+1]), REPLY_QUEUE);
1829                         i += 1;
1830                         continue;
1831                 }
1832
1833                 if (!strcmp(argv[i], "report")) {
1834                         ret = cmd_report(dstport);
1835                         continue;
1836                 }
1837
1838                 if (!strcmp(argv[i], "alloc_requests") && (i + 1 < argc)) {
1839                         ret = cmd_alloc_requests(atol(argv[i+1]));
1840                         i += 1;
1841                         continue;
1842                 }
1843
1844                 if (!strcmp(argv[i], "free_requests") && (i + 1 < argc)) {
1845                         ret = cmd_free_requests(atol(argv[i+1]));
1846                         i += 1;
1847                         continue;
1848                 }
1849
1850                 if (!strcmp(argv[i], "put_requests")) {
1851                         ret = cmd_put_requests();
1852                         continue;
1853                 }
1854
1855                 if (!strcmp(argv[i], "put_replies")) {
1856                         ret = cmd_put_replies();
1857                         continue;
1858                 }
1859
1860                 if (!strcmp(argv[i], "complete") && (i + 1 < argc)) {
1861                         ret = cmd_finish(atol(argv[i+1]), 0);
1862                         i += 1;
1863                         continue;
1864                 }
1865
1866                 if (!strcmp(argv[i], "fail") && (i + 1 < argc)) {
1867                         ret = cmd_finish(atol(argv[i+1]), 1);
1868                         i += 1;
1869                         continue;
1870                 }
1871
1872                 if (!strcmp(argv[i], "wait") && (i + 1 < argc)) {
1873                         ret = cmd_wait(atol(argv[i+1]));
1874                         i += 1;
1875                         continue;
1876                 }
1877
1878                 if (!strcmp(argv[i], "rndwrite") && (i + 5 < argc)) {
1879                         long nr_loops = atol(argv[i+1]);
1880                         unsigned int seed = atoi(argv[i+2]);
1881                         unsigned int targetlen = atoi(argv[i+3]);
1882                         unsigned int chunksize = atoi(argv[i+4]);
1883                         unsigned long objectsize = atol(argv[i+5]);
1884                         ret = cmd_rndwrite(nr_loops, seed, targetlen, chunksize, objectsize);
1885                         i += 5;
1886                         continue;
1887                 }
1888                 
1889                 if (!strcmp(argv[i], "rnddelete") && (i + 3 < argc)) {
1890                         long nr_loops = atol(argv[i+1]);
1891                         unsigned int seed = atoi(argv[i+2]);
1892                         unsigned int targetlen = atoi(argv[i+3]);
1893                         ret = cmd_rnddelete(nr_loops, seed, targetlen);
1894                         i += 3;
1895                         continue;
1896                 }
1897
1898                 if (!strcmp(argv[i], "rndread") && (i + 5 < argc)) {
1899                         long nr_loops = atol(argv[i+1]);
1900                         unsigned int seed = atoi(argv[i+2]);
1901                         unsigned int targetlen = atoi(argv[i+3]);
1902                         unsigned int chunksize = atoi(argv[i+4]);
1903                         unsigned long objectsize = atol(argv[i+5]);
1904                         ret = cmd_rndread(nr_loops, seed, targetlen, chunksize, objectsize);
1905                         i += 5;
1906                         continue;
1907                 }
1908
1909                 if (!strcmp(argv[i], "submit_reqs") && (i + 3 < argc)) {
1910                         long nr_loops = atol(argv[i+1]);
1911                         long concurrent_reqs = atol(argv[i+2]);
1912                         int op = atoi(argv[i+3]);
1913                         ret = cmd_submit_reqs(nr_loops, concurrent_reqs, op);
1914                         i += 3;
1915                         continue;
1916                 }
1917
1918                 if (!strcmp(argv[i], "read") && (i + 3 < argc)) {
1919                         char *target = argv[i+1];
1920                         uint64_t offset = atol(argv[i+2]);
1921                         uint64_t size   = atol(argv[i+3]);
1922                         ret = cmd_read(target, offset, size);
1923                         i += 3;
1924                         continue;
1925                 }
1926
1927                 if (!strcmp(argv[i], "write") && (i + 2 < argc)) {
1928                         char *target = argv[i+1];
1929                         uint64_t offset = atol(argv[i+2]);
1930                         ret = cmd_write(target, offset);
1931                         i += 2;
1932                         continue;
1933                 }
1934
1935                 if (!strcmp(argv[i], "truncate") && (i + 2 < argc)) {
1936                         char *target = argv[i+1];
1937                         uint64_t offset = atol(argv[i+2]);
1938                         ret = cmd_truncate(target, offset);
1939                         i += 2;
1940                         continue;
1941                 }
1942
1943                 if (!strcmp(argv[i], "delete") && (i + 1 < argc)) {
1944                         char *target = argv[i+1];
1945                         ret = cmd_delete(target);
1946                         i += 1;
1947                         continue;
1948                 }
1949
1950                 if (!strcmp(argv[i], "acquire") && (i + 1 < argc)) {
1951                         char *target = argv[i+1];
1952                         ret = cmd_acquire(target);
1953                         i += 1;
1954                         continue;
1955                 }
1956
1957                 if (!strcmp(argv[i], "release") && (i + 1 < argc)) {
1958                         char *target = argv[i+1];
1959                         ret = cmd_release(target);
1960                         i += 1;
1961                         continue;
1962                 }
1963
1964                 if (!strcmp(argv[i], "copy") && (i + 2) < argc) {
1965                         char *src = argv[i+1];
1966                         char *dst = argv[i+2];
1967                         ret = cmd_copy(src, dst);
1968                         i += 2;
1969                         continue;
1970                 }
1971
1972                 if (!strcmp(argv[i], "clone") && (i + 2 < argc)) {
1973                         char *src = argv[i+1];
1974                         char *dst = argv[i+2];
1975                         ret = cmd_clone(src, dst);
1976                         i += 2;
1977                         continue;
1978                 }
1979
1980                 if (!strcmp(argv[i], "info") && (i + 1 < argc)) {
1981                         char *target = argv[i+1];
1982                         ret = cmd_info(target);
1983                         i += 1;
1984                         continue;
1985                 }
1986
1987
1988                 if (!parse_ports(argv[i]))
1989                         fprintf(stderr, "invalid argument: %s\n", argv[i]);
1990         }
1991
1992         /* xseg_leave(); */
1993         return ret;
1994 }