Fix licence headers in multiple files
[archipelago] / xseg / peers / user / xseg-tool.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <string.h>
39 #include <stdint.h>
40 #include <ctype.h>
41 #include <sys/stat.h>
42 #include <sys/types.h>
43 #include <fcntl.h>
44 #include <unistd.h>
45
46 #include <xtypes/xhash.h>
47 #include <xtypes/xobj.h>
48 #include <xseg/xseg.h>
49 #include <xseg/protocol.h>
/*
 * Print the command-line usage summary on stdout.
 *
 * Always returns 1, so a command handler can bail out with "return help();".
 */
int help(void)
{
        static const char usage[] =
                "xseg <spec> [[[<src_port>]:[<dst_port>]] [<command> <arg>*] ]*\n"
                "spec:\n"
                "    <type:name:nr_ports:nr_requests:request_size:extra_size:page_shift>\n"
                "global commands:\n"
                "    reportall\n"
                "    create\n"
                "    destroy\n"
                "    bind <portno>\n"
                "    signal <portno>\n"
                "    bridge <portno1> <portno2> <logfile> {full|summary|stats}\n"
                "port commands:\n"
                "    report\n"
                "    alloc_requests (to source) <nr>\n"
                "    free_requests (from source) <nr>\n"
                "    put_requests (all from dest)\n"
                "    put_replies (all from dest)\n"
                "    wait        <nr_replies>\n"
                "    complete    <nr_requests>\n"
                "    fail        <nr_requests>\n"
                "    rndwrite    <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
                "    rndread     <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
                "    submit_reqs <nr_loops> <concurrent_reqs>\n"
                "    info        <target>\n"
                "    read        <target> <offset> <size>\n"
                "    write       <target> <offset> < data\n"
                "    truncate    <target> <size>\n"
                "    delete      <target>\n"
                "    acquire     <target>\n"
                "    release     <target>\n"
                "    copy        <src>  <dst>\n"
                "    clone       <src>  <dst>\n";

        /* The text contains no conversion specifiers, so it is emitted verbatim. */
        fputs(usage, stdout);
        return 1;
}
86
87
/*
 * Actions that can be applied to a request.
 * Numbering starts at 1, so the values are REPORT = 1, FAIL = 2, COMPLETE = 3.
 */
enum req_action {
        REPORT = 1,
        FAIL,
        COMPLETE
};
93
/*
 * Identifiers for the queues of a port.
 * Values are FREE_QUEUE = 0, REQUEST_QUEUE = 1, REPLY_QUEUE = 2.
 */
enum queue {
        FREE_QUEUE,
        REQUEST_QUEUE,
        REPLY_QUEUE
};
99
/* Scratch buffer for generated target names; resized with realloc() by the rnd* commands. */
char *namebuf;
/* Scratch buffer for one data chunk; resized with realloc() by cmd_rndwrite(). */
char *chunk;
/* Segment configuration; its users are not visible in this part of the file. */
struct xseg_config cfg;
/* The segment that every command operates on. */
struct xseg *xseg;
/* Source and destination ports used when requests are obtained and submitted. */
uint32_t srcport, dstport;
/* Running count of requests, incremented by log_req() when called with method 2. */
uint64_t reqs;
/* Name generator used by the rnd* commands. */
#define mkname mkname_heavy
/* heavy distributes duplicates much more widely than light
 * ./xseg-tool random 100000 | cut -d' ' -f2- | sort | uniq -d -c |wc -l
 */
110
111 xport sport = NoPort;
112 static void init_local_signal() 
113 {
114         if (xseg && sport != srcport){
115                 xseg_init_local_signal(xseg, srcport);
116                 sport = srcport;
117         }
118 }
119
/*
 * Fill name[0..namelen) with pseudo-random lowercase hex digits derived from seed.
 *
 * name    - output buffer of at least namelen bytes; it is NOT NUL-terminated here
 * namelen - number of characters to generate
 * seed    - starting state; the same seed always yields the same name
 *
 * The arithmetic is done entirely on unsigned values so the result does not
 * depend on whether plain char is signed, and the index has the same type as
 * namelen so the loop bound comparison is not mixed-sign.
 */
void mkname_heavy(char *name, uint32_t namelen, uint32_t seed)
{
        static const char hexdigits[] = "0123456789abcdef";
        uint32_t i;

        for (i = 0; i < namelen; i++) {
                /* fold the four bytes of the seed together; only the low byte matters below */
                uint32_t mix = seed + (seed >> 8) + (seed >> 16) + (seed >> 24);

                /* low nibble + high nibble of the folded byte, reduced to one hex digit */
                name[i] = hexdigits[((mix & 0xf) + ((mix >> 4) & 0xf)) & 0xf];

                /* multiplier is forced odd; wraps modulo 2^32 */
                seed *= ((seed % 137911) | 1) * 137911;
        }
}
133
/*
 * Fill name[0..namelen) with letters 'A'..'P' chosen by the low four bits of
 * a counter that starts at seed and increases by one per character.
 *
 * The output is not NUL-terminated.
 */
void mkname_light(char *name, uint32_t namelen, uint32_t seed)
{
        uint32_t pos = 0;

        while (pos < namelen) {
                name[pos] = (char)('A' + (seed & 0xf));
                seed++;
                pos++;
        }
}
144
/*
 * Pick a pseudo-random value in [0, size) using random().
 *
 * Returns 0 when size is 0. The previous body ignored size and computed
 * RAND_MAX / random(), which divides by zero whenever random() returns 0
 * and is not bounded by size.
 */
uint64_t pick(uint64_t size)
{
        /* random() yields values in [0, 2^31 - 1]; dividing by 2^31 keeps the fraction below 1 */
        double fraction;

        if (size == 0)
                return 0;

        fraction = (double)random() / 2147483648.0;
        return (uint64_t)(fraction * (double)size);
}
149
/*
 * Fill chunk[0..datalen) with a repeating pattern derived from offset and target.
 *
 * The pattern is targetlen + 16 bytes long: the offset as 16 hex digits
 * followed by the target. Because snprintf() reserves the last byte for the
 * terminating NUL, the pattern ends with that NUL in place of the final
 * target character. Whole copies of the pattern are laid down back to back
 * and the remaining bytes receive a prefix of the pattern.
 */
void mkchunk(   char *chunk, uint32_t datalen,
                char *target, uint32_t targetlen, uint64_t offset)
{
        long patlen = targetlen + 16;
        char pattern[patlen];
        long tail = datalen % patlen;
        long last_full = (long)datalen - patlen;
        long pos = 0;

        snprintf(pattern, patlen, "%016llx%s", (unsigned long long)offset, target);

        /* whole copies of the pattern */
        while (pos <= last_full) {
                memcpy(chunk + pos, pattern, patlen);
                pos += patlen;
        }

        /* partial copy for whatever is left at the end */
        memcpy(chunk + datalen - tail, pattern, tail);
}
163
/*
 * Verify that chunk[0..datalen) holds the pattern written by mkchunk() for
 * the given target and offset.
 *
 * The pattern is targetlen + 16 bytes long (offset as 16 hex digits followed
 * by the target, with the last byte taken by snprintf()'s terminating NUL).
 *
 * Returns 1 when every whole copy and the trailing partial copy match,
 * 0 on the first mismatch.
 */
int chkchunk(   char *chunk, uint32_t datalen,
                char *target, uint32_t targetlen, uint64_t offset)
{
        long i, r;
        long bufsize = targetlen + 16;
        char buf[bufsize];

        /*
         * The tail is what remains after whole patterns, so it must be taken
         * modulo the pattern length (as mkchunk() does), not modulo targetlen.
         */
        r = datalen % bufsize;
        snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);

        for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
                if (memcmp(chunk + i, buf, bufsize))
                        return 0;

        if (memcmp(chunk + datalen - r, buf, r))
                return 0;

        return 1;
}
186
187
/* Bounds applied to the starting size of inputbuf()'s buffer on every call. */
#define ALLOC_MIN 4096
#define ALLOC_MAX 1048576

/*
 * Read everything that is left on fp into a buffer owned by this function.
 *
 * fp      - stream to drain
 * retbuf  - receives the buffer, or NULL if memory could not be obtained
 * retsize - receives the number of bytes read (0 on allocation failure)
 *
 * The buffer is static: it is reused by the next call, so the caller must
 * not free it and must consume it before calling again. It starts at the
 * size left by the previous call, clamped to [ALLOC_MIN, ALLOC_MAX], and
 * doubles whenever it fills up.
 */
void inputbuf(FILE *fp, char **retbuf, uint64_t *retsize)
{
        static uint64_t alloc_size;
        static char *buf;
        uint64_t filled = 0;
        char *grown;
        size_t got;

        if (alloc_size < ALLOC_MIN)
                alloc_size = ALLOC_MIN;
        else if (alloc_size > ALLOC_MAX)
                alloc_size = ALLOC_MAX;

        grown = realloc(buf, alloc_size);
        if (!grown) {
                free(buf);
                buf = NULL;
                goto out;
        }
        buf = grown;

        for (;;) {
                if (feof(fp))
                        break;

                got = fread(buf + filled, 1, alloc_size - filled, fp);
                if (got == 0)
                        break;

                filled += got;
                if (filled < alloc_size)
                        continue;

                /* buffer is full: double it before reading more */
                grown = realloc(buf, alloc_size * 2);
                if (!grown) {
                        free(buf);
                        buf = NULL;
                        filled = 0;
                        goto out;
                }
                buf = grown;
                alloc_size *= 2;
        }

out:
        *retbuf = buf;
        *retsize = filled;
}
238
239 void report_request(struct xseg_request *req)
240 {
241         char target[64], data[64];
242         char *req_target, *req_data;
243         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
244         req_target = xseg_get_target(xseg, req);
245         req_data = xseg_get_data(xseg, req);
246
247         strncpy(target, req_target, end);
248         target[end] = 0;
249         strncpy(data, req_data, 63);
250         data[63] = 0;
251         fprintf(stderr,
252                 "Request %lx: target[%u](xptr: %llu): %s, data[%llu](xptr: %llu): %s \n\t"
253                 "offset: %llu, size: %llu, serviced; %llu, op: %u, state: %u, flags: %u \n\t"
254                 "src: %u, transit: %u, dst: %u, effective dst: %u\n",
255                 (unsigned long) req, req->targetlen, (unsigned long long)req->target,
256                 target,
257                 (unsigned long long) req->datalen, (unsigned long long) req->data,
258                 data,
259                 (unsigned long long) req->offset, (unsigned long long) req->size,
260                 (unsigned long long) req->serviced, req->op, req->state, req->flags,
261                 (unsigned int) req->src_portno, (unsigned int) req->transit_portno,
262                 (unsigned int) req->dst_portno, (unsigned int) req->effective_dst_portno);
263
264
265 }
266
267 int cmd_info(char *target)
268 {
269         uint32_t targetlen = strlen(target);
270         size_t size = sizeof(uint64_t);
271         int r;
272         xport p;
273         struct xseg_request *req;
274         char *req_target;
275
276         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
277         if (!req) {
278                 fprintf(stderr, "No request!\n");
279                 return -1;
280         }
281
282         r = xseg_prep_request(xseg, req, targetlen, size);
283         if (r < 0) {
284                 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
285                         (unsigned long) targetlen, (unsigned long) size);
286                 xseg_put_request(xseg, req, srcport);
287                 return -1;
288         }
289
290         req_target = xseg_get_target(xseg, req);
291         strncpy(req_target, target, targetlen);
292         req->offset = 0;
293         req->size = size;
294         req->op = X_INFO;
295
296         p = xseg_submit(xseg, req, srcport, X_ALLOC);
297         if (p == NoPort)
298                 return -1;
299
300         xseg_signal(xseg, p);
301
302         return 0;
303 }
304
305 int cmd_read(char *target, uint64_t offset, uint64_t size)
306 {
307         uint32_t targetlen = strlen(target);
308         int r;
309         xport p;
310         char *req_target;
311         struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
312         if (!req) {
313                 fprintf(stderr, "No request\n");
314                 return -1;
315         }
316
317         r = xseg_prep_request(xseg, req, targetlen, size);
318         if (r < 0) {
319                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
320                         (unsigned long)targetlen, (unsigned long long)size);
321                 xseg_put_request(xseg, req, srcport);
322                 return -1;
323         }
324
325         req_target = xseg_get_target(xseg, req);
326         strncpy(req_target, target, targetlen);
327         req->offset = offset;
328         req->size = size;
329         req->op = X_READ;
330         report_request(req);
331         p = xseg_submit(xseg, req, srcport, X_ALLOC);
332         if (p == NoPort)
333                 return -1;
334
335         xseg_signal(xseg, p);
336         return 0;
337 }
338
339 int cmd_write(char *target, uint64_t offset)
340 {
341         char *buf = NULL;
342         int r;
343         xport p;
344         uint64_t size = 0;
345         char *req_target, *req_data;
346         uint32_t targetlen = strlen(target);
347         struct xseg_request *req;
348
349         inputbuf(stdin, &buf, &size);
350         if (!size) {
351                 fprintf(stderr, "No input\n");
352                 return -1;
353         }
354
355         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
356         if (!req) {
357                 fprintf(stderr, "No request\n");
358                 return -1;
359         }
360
361         r = xseg_prep_request(xseg, req, targetlen, size);
362         if (r < 0) {
363                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
364                         (unsigned long)targetlen, (unsigned long long)size);
365                 xseg_put_request(xseg, req, srcport);
366                 return -1;
367         }
368
369         req_target = xseg_get_target(xseg, req);
370         strncpy(req_target, target, targetlen);
371         
372         req_data = xseg_get_data(xseg, req);
373         memcpy(req_data, buf, size);
374         req->offset = offset;
375         req->size = size;
376         req->op = X_WRITE;
377
378         p = xseg_submit(xseg, req, srcport, X_ALLOC);
379         if (p == NoPort) {
380                 fprintf(stderr, "Cannot submit\n");
381                 return -1;
382         }
383         xseg_signal(xseg, p);
384
385         return 0;
386 }
387
/*
 * Placeholder for the "truncate" command: no request is issued.
 *
 * Both arguments are ignored and 0 is always returned.
 */
int cmd_truncate(char *target, uint64_t offset)
{
        (void)target;
        (void)offset;

        return 0;
}
392
393 int cmd_delete(char *target)
394 {
395         uint32_t targetlen = strlen(target);
396         int r;
397         struct xseg_request *req;
398         init_local_signal();
399         xseg_bind_port(xseg, srcport, NULL);
400
401         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
402         if (!req) {
403                 fprintf(stderr, "No request!\n");
404                 return -1;
405         }
406
407         r = xseg_prep_request(xseg, req, targetlen, 0);
408         if (r < 0) {
409                 fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
410                         (unsigned long) targetlen, (unsigned long) req->bufferlen - targetlen);
411                 xseg_put_request(xseg, req, srcport);
412                 return -1;
413         }
414
415         char *reqtarget = xseg_get_target(xseg, req);
416         strncpy(reqtarget, target, targetlen);
417         req->op = X_DELETE;
418
419         xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
420         if (p == NoPort){
421                 fprintf(stderr, "Couldn't submit request\n");
422                 xseg_put_request(xseg, req, srcport);
423                 return -1;
424         }
425
426         xseg_signal(xseg, p);
427
428         return 0;
429 }
430
431 int cmd_acquire(char *target)
432 {
433         uint32_t targetlen = strlen(target);
434         int r;
435         xport p;
436         char *req_target;
437         struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
438         if (!req) {
439                 fprintf(stderr, "No request\n");
440                 return -1;
441         }
442
443         r = xseg_prep_request(xseg, req, targetlen, 0);
444         if (r < 0) {
445                 fprintf(stderr, "Cannot prepare request! (%lu, 0)\n",
446                         (unsigned long)targetlen);
447                 xseg_put_request(xseg, req, srcport);
448                 return -1;
449         }
450
451         req_target = xseg_get_target(xseg, req);
452         strncpy(req_target, target, targetlen);
453         req->offset = 0;
454         req->size = 0;
455         req->op = X_OPEN;
456         p = xseg_submit(xseg, req, srcport, X_ALLOC);
457         if (p == NoPort)
458                 return -1;
459
460         xseg_signal(xseg, p);
461         return 0;
462 }
463
464 int cmd_release(char *target)
465 {
466         uint32_t targetlen = strlen(target);
467         int r;
468         xport p;
469         char *req_target;
470         struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
471         if (!req) {
472                 fprintf(stderr, "No request\n");
473                 return -1;
474         }
475
476         r = xseg_prep_request(xseg, req, targetlen, 0);
477         if (r < 0) {
478                 fprintf(stderr, "Cannot prepare request! (%lu, 0)\n",
479                         (unsigned long)targetlen);
480                 xseg_put_request(xseg, req, srcport);
481                 return -1;
482         }
483
484         req_target = xseg_get_target(xseg, req);
485         strncpy(req_target, target, targetlen);
486         req->offset = 0;
487         req->size = 0;
488         req->op = X_CLOSE;
489         req->flags = XF_FORCE;
490         p = xseg_submit(xseg, req, srcport, X_ALLOC);
491         if (p == NoPort)
492                 return -1;
493
494         xseg_signal(xseg, p);
495         return 0;
496         return 0;
497 }
498
499 int cmd_copy(char *src, char *dst)
500 {
501         uint32_t targetlen = strlen(dst);
502         uint32_t parentlen = strlen(src);
503         struct xseg_request *req;
504         struct xseg_request_copy *xcopy;
505         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
506         if (!req) {
507                 fprintf(stderr, "No request\n");
508                 return -1;
509         }
510
511         int r = xseg_prep_request(xseg, req, targetlen,
512                         sizeof(struct xseg_request_copy));
513         if (r < 0) {
514                 fprintf(stderr, "Cannot prepare request!\n");
515                 xseg_put_request(xseg, req, srcport);
516                 return -1;
517         }
518
519         char *target = xseg_get_target(xseg, req);
520         char *data = xseg_get_data(xseg, req);
521
522         strncpy(target, dst, targetlen);
523         xcopy = (struct xseg_request_copy *) data;
524         strncpy(xcopy->target, src, parentlen);
525         xcopy->targetlen = parentlen;
526         req->offset = 0;
527         req->size = sizeof(struct xseg_request_copy);
528         req->op = X_COPY;
529
530         xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
531         if (p == NoPort){
532                 fprintf(stderr, "Cannot submit request\n");
533                 return -1;
534         }
535         xseg_signal(xseg, p);
536
537         return 0;
538         return 0;
539 }
540
541 int cmd_clone(char *src, char *dst)
542 {
543
544         uint32_t targetlen = strlen(dst);
545         uint32_t parentlen = strlen(src);
546         struct xseg_request *req;
547         struct xseg_request_clone *xclone;
548         xseg_bind_port(xseg, srcport, NULL);
549         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
550         if (!req) {
551                 fprintf(stderr, "No request\n");
552                 return -1;
553         }
554
555         int r = xseg_prep_request(xseg, req, targetlen, sizeof(struct xseg_request_clone));
556         if (r < 0) {
557                 fprintf(stderr, "Cannot prepare request!\n");
558                 xseg_put_request(xseg, req, srcport);
559                 return -1;
560         }
561
562         char *target = xseg_get_target(xseg, req);
563         char *data = xseg_get_data(xseg, req);
564
565         strncpy(target, dst, targetlen);
566         xclone = (struct xseg_request_clone *) data;
567         strncpy(xclone->target, src, parentlen);
568         xclone->targetlen = parentlen;
569         xclone->size = -1;
570         req->offset = 0;
571         req->size = sizeof(struct xseg_request_clone);
572         req->op = X_CLONE;
573
574         xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
575         if (p == NoPort){
576                 fprintf(stderr, "Cannot submit request\n");
577                 return -1;
578         }
579         xseg_signal(xseg, p);
580
581         return 0;
582 }
583
584 int cmd_snapshot(char *src, char *dst, long block_size)
585 {
586
587         uint32_t targetlen = strlen(src);
588         uint32_t parentlen = strlen(dst);
589         struct xseg_request *req;
590         struct xseg_request_snapshot *xsnapshot;
591         xseg_bind_port(xseg, srcport, NULL);
592         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
593         if (!req) {
594                 fprintf(stderr, "No request\n");
595                 return -1;
596         }
597
598         int r = xseg_prep_request(xseg, req, targetlen, sizeof(struct xseg_request_snapshot));
599         if (r < 0) {
600                 fprintf(stderr, "Cannot prepare request!\n");
601                 xseg_put_request(xseg, req, srcport);
602                 return -1;
603         }
604
605         char *target = xseg_get_target(xseg, req);
606         char *data = xseg_get_data(xseg, req);
607
608         strncpy(target, src, targetlen);
609         xsnapshot = (struct xseg_request_snapshot *) data;
610         strncpy(xsnapshot->target, dst, parentlen);
611         xsnapshot->targetlen = parentlen;
612         req->offset = 0;
613         req->size = (uint64_t) block_size;
614         req->op = X_SNAPSHOT;
615
616         xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
617         if (p == NoPort){
618                 fprintf(stderr, "Cannot submit request\n");
619                 return -1;
620         }
621         xseg_signal(xseg, p);
622
623         return 0;
624 }
625
626 void log_req(int logfd, uint32_t portno2, uint32_t portno1, int op, int method,
627                 struct xseg_request *req)
628 {
629         FILE *logfp;
630         char target[64], data[64];
631         char *req_target, *req_data;
632         /* null terminate name in case of req->target is less than 63 characters,
633          * and next character after name (aka first byte of next buffer) is not
634          * null
635          */
636         unsigned int end = (req->targetlen > 63) ? 63 : req->targetlen;
637         
638         req_target = xseg_get_target(xseg, req);
639         req_data = xseg_get_data(xseg, req);
640
641         logfp = fdopen(logfd, "a");
642         if (!logfp)
643                 return;
644
645         switch(method) {
646         case 0:
647                 strncpy(target, req_target, end);
648                 target[end] = 0;
649                 strncpy(data, req_data, 63);
650                 data[63] = 0;
651
652                 fprintf(logfp,
653                         "src port: %u, dst port: %u,  op:%u offset: %llu size: %lu, reqstate: %u\n"
654                         "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
655                         (unsigned int)portno1,
656                         (unsigned int)portno2,
657                         (unsigned int)req->op,
658                         (unsigned long long)req->offset,
659                         (unsigned long)req->size,
660                         (unsigned int)req->state,
661                         (unsigned int)req->targetlen, target,
662                         (unsigned long long)req->datalen, data);
663                 break;
664         case 1:
665                 fprintf(logfp,
666                         "src port: %u, dst port: %u, op: %u\n",
667                         (unsigned int)portno1,
668                         (unsigned int)portno2,
669                         (unsigned int)req->op);
670                 break;
671         case 2:
672                 fprintf(logfp, "src port: %u, dst port: %u, reqs: %llu\n",
673                         (unsigned int)portno1,
674                         (unsigned int)portno2,
675                         (unsigned long long)++reqs);
676         }
677
678         fclose(logfp);
679         return;
680 }
681
682 #define LOG_ACCEPT  0
683 #define LOG_RECEIVE 1
684
685 int cmd_bridge(uint32_t portno1, uint32_t portno2, char *logfile, char *how)
686 {
687         struct xseg_request *req;
688         int logfd, method;
689         if (!strcmp(logfile, "-"))
690                 logfd = 1;
691         else {
692                 logfd = open(logfile, O_WRONLY|O_APPEND|O_CREAT, 0600);
693                 if (logfd < 0) {
694                         perror(logfile);
695                         return -1;
696                 }
697         }
698
699         if (!strcmp(how, "full"))
700                 method = 0;
701         else if (!strcmp(how, "summary"))
702                 method = 1;
703         else
704                 method = 2;
705
706         for (;;) {
707                 int reloop = 0, active;
708                 xseg_prepare_wait(xseg, portno1);
709                 xseg_prepare_wait(xseg, portno2);
710                 req = NULL;
711
712                 for (;;) {
713                         active = 0;
714
715                         //FIXME
716                         req = xseg_accept(xseg, portno1, 0);
717                         if (req) {
718                                 xseg_submit(xseg, req, portno2, X_ALLOC);
719                                 log_req(logfd, portno1, portno2, LOG_ACCEPT, method, req);
720                                 active += 1;
721                         }
722
723                         req = xseg_accept(xseg, portno2, 0);
724                         if (req) {
725                                 xseg_submit(xseg, req, portno1, X_ALLOC);
726                                 log_req(logfd, portno2, portno1, LOG_ACCEPT, method, req);
727                                 active += 1;
728                         }
729
730                         req = xseg_receive(xseg, portno1, 0);
731                         if (req) {
732                                 xseg_respond(xseg, req, portno2, X_ALLOC);
733                                 log_req(logfd, portno1, portno2, LOG_RECEIVE, method, req);
734                                 active += 1;
735                         }
736
737                         req = xseg_receive(xseg, portno2, 0);
738                         if (req) {
739                                 xseg_respond(xseg, req, portno1, X_ALLOC);
740                                 log_req(logfd, portno2, portno1, LOG_RECEIVE, method, req);
741                                 active += 1;
742                         }
743
744                         if (active == 0) {
745                                 if (reloop)
746                                         break;
747                                 /* wait on multiple queues? */
748                                 xseg_wait_signal(xseg, 100000);
749                                 break;
750                         } else {
751                                 xseg_cancel_wait(xseg, portno1);        
752                                 xseg_cancel_wait(xseg, portno2);        
753                                 reloop = 1;
754                         }
755                 }
756         }
757
758         close(logfd);
759
760         return 0;
761 }
762
763 int cmd_rndwrite(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
764 {
765         if (loops < 0)
766                 return help();
767
768         if (targetlen >= chunksize) {
769                 fprintf(stderr, "targetlen >= chunksize\n");
770                 return -1;
771         }
772
773         char *p = realloc(namebuf, targetlen+1);
774         if (!p) {
775                 fprintf(stderr, "Cannot allocate memory\n");
776                 return -1;
777         }
778         namebuf = p;
779
780         p = realloc(chunk, chunksize);
781         if (!p) {
782                 fprintf(stderr, "Cannot allocate memory\n");
783                 return -1;
784         }
785         chunk = p;
786         memset(chunk, 0, chunksize);
787
788         srandom(seed);
789
790         struct xseg_request *submitted = NULL, *received;
791         long nr_submitted = 0, nr_received = 0, nr_failed = 0;
792         int reported = 0, r;
793         uint64_t offset;
794         xport port;
795         char *req_data, *req_target;
796         seed = random();
797         init_local_signal();
798
799         for (;;) {
800                 xseg_prepare_wait(xseg, srcport);
801                 if (nr_submitted < loops &&
802                     (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
803                         xseg_cancel_wait(xseg, srcport);
804                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
805                         if (r < 0) {
806                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
807                                         targetlen, chunksize);
808                                 xseg_put_request(xseg, submitted, srcport);
809                                 return -1;
810                         }
811                         
812                         req_target = xseg_get_target(xseg, submitted);
813                         req_data = xseg_get_data(xseg, submitted);
814
815                         reported = 0;
816                         mkname(namebuf, targetlen, seed);
817                         namebuf[targetlen] = 0;
818                         //printf("%ld: %s\n", nr_submitted, namebuf);
819                         strncpy(req_target, namebuf, targetlen);
820                         offset = 0;// pick(size);
821                         mkchunk(req_data, chunksize, namebuf, targetlen, offset);
822
823                         submitted->offset = offset;
824                         submitted->size = chunksize;
825                         submitted->op = X_WRITE;
826                         submitted->flags |= XF_NOSYNC;
827
828                         port =  xseg_submit(xseg, submitted, srcport, X_ALLOC);
829                         if (port == NoPort) {
830                                 xseg_put_request(xseg, submitted, srcport);
831                         } else {
832                                 seed = random();
833                                 nr_submitted += 1;
834                                 xseg_signal(xseg, port);
835                         }
836                 }
837
838                 received = xseg_receive(xseg, srcport, 0);
839                 if (received) {
840                         xseg_cancel_wait(xseg, srcport);
841                         nr_received += 1;
842                         if (!(received->state & XS_SERVED)) {
843                                 nr_failed += 1;
844                                 report_request(received);
845                         }
846                         if (xseg_put_request(xseg, received, srcport))
847                                 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
848                 }
849
850                 if (!submitted && !received)
851                         xseg_wait_signal(xseg, 1000000);
852
853                         if (nr_submitted % 1000 == 0 && !reported) {
854                                 reported = 1;
855                                 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
856                                         nr_submitted, nr_received, nr_failed);
857                         }
858
859                         if (nr_received >= loops)
860                                 break;
861         }
862
863         fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
864                 nr_submitted, nr_received, nr_failed);
865         return 0;
866 }
867
868 int cmd_rnddelete(long loops, int32_t seed, uint32_t targetlen)
869 {
870         if (loops < 0)
871                 return help();
872         char *p = realloc(namebuf, targetlen+1);
873         if (!p) {
874                 fprintf(stderr, "Cannot allocate memory\n");
875                 return -1;
876         }
877         namebuf = p;
878
879         srandom(seed);
880
881         struct xseg_request *submitted = NULL, *received;
882         long nr_submitted = 0, nr_received = 0, nr_failed = 0;
883         int reported = 0, r;
884         xport port;
885         char *req_target;
886         seed = random();
887         init_local_signal();
888
889         for (;;) {
890                 xseg_prepare_wait(xseg, srcport);
891                 if (nr_submitted < loops &&
892                     (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
893                         xseg_cancel_wait(xseg, srcport);
894                         r = xseg_prep_request(xseg, submitted, targetlen, 0);
895                         if (r < 0) {
896                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
897                                         targetlen, 0);
898                                 xseg_put_request(xseg, submitted, srcport);
899                                 return -1;
900                         }
901                         
902                         req_target = xseg_get_target(xseg, submitted);
903
904                         reported = 0;
905                         mkname(namebuf, targetlen, seed);
906                         namebuf[targetlen] = 0;
907                         //printf("%ld: %s\n", nr_submitted, namebuf);
908                         strncpy(req_target, namebuf, targetlen);
909                         submitted->offset = 0;
910                         submitted->size = 0;
911                         submitted->op = X_DELETE;
912                         submitted->flags = 0;
913
914                         port =  xseg_submit(xseg, submitted, srcport, X_ALLOC);
915                         if (port == NoPort) {
916                                 xseg_put_request(xseg, submitted, srcport);
917                         } else {
918                                 seed = random();
919                                 nr_submitted += 1;
920                                 xseg_signal(xseg, port);
921                         }
922                 }
923
924                 received = xseg_receive(xseg, srcport, 0);
925                 if (received) {
926                         xseg_cancel_wait(xseg, srcport);
927                         nr_received += 1;
928                         if (!(received->state & XS_SERVED)) {
929                                 nr_failed += 1;
930                                 report_request(received);
931                         }
932                         if (xseg_put_request(xseg, received, srcport))
933                                 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
934                 }
935
936                 if (!submitted && !received)
937                         xseg_wait_signal(xseg, 1000000);
938
939                         if (nr_submitted % 1000 == 0 && !reported) {
940                                 reported = 1;
941                                 fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
942                                         nr_submitted, nr_received, nr_failed);
943                         }
944
945                         if (nr_received >= loops)
946                                 break;
947         }
948
949         fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
950                 nr_submitted, nr_received, nr_failed);
951         return 0;
952 }
953 /* note:
954  * prepare/wait rhythm,
955  * files are converted to independent chunk access patterns,
956 */
957
958 int cmd_rndread(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
959 {
960         if (loops < 0)
961                 return help();
962
963         if (targetlen >= chunksize) {
964                 fprintf(stderr, "targetlen >= chunksize\n");
965                 return -1;
966         }
967
968         char *p = realloc(namebuf, targetlen+1);
969         if (!p) {
970                 fprintf(stderr, "Cannot allocate memory\n");
971                 return -1;
972         }
973         namebuf = p;
974
975         p = realloc(chunk, chunksize);
976         if (!p) {
977                 fprintf(stderr, "Cannot allocate memory\n");
978                 return -1;
979         }
980         chunk = p;
981         memset(chunk, 0, chunksize);
982
983         srandom(seed);
984
985         struct xseg_request *submitted = NULL, *received;
986         long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0;
987         int reported = 0, r;
988         uint64_t offset;
989         xport port;
990         char *req_data, *req_target;
991         init_local_signal();
992
993         seed = random();
994         for (;;) {
995                 submitted = NULL;
996                 xseg_prepare_wait(xseg, srcport);
997                 if (nr_submitted < loops &&
998                     (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
999                         xseg_cancel_wait(xseg, srcport);
1000                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
1001                         if (r < 0) {
1002                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
1003                                         targetlen, chunksize);
1004                                 xseg_put_request(xseg, submitted, srcport);
1005                                 return -1;
1006                         }
1007
1008                         req_target = xseg_get_target(xseg, submitted);
1009                         reported = 0;
1010                         mkname(namebuf, targetlen, seed);
1011                         namebuf[targetlen] = 0;
1012                         //printf("%ld: %s\n", nr_submitted, namebuf);
1013                         offset = 0;//pick(size);
1014
1015                         strncpy(req_target, namebuf, targetlen);
1016                         submitted->offset = offset;
1017                         submitted->size = chunksize;
1018                         submitted->op = X_READ;
1019                         port = xseg_submit(xseg, submitted, srcport, X_ALLOC);
1020                         if (port == NoPort) {
1021                                 xseg_put_request(xseg, submitted, srcport);
1022                         } else {
1023                                 seed = random();
1024                                 nr_submitted += 1;
1025                                 xseg_signal(xseg, port);
1026                         }
1027                 }
1028
1029                 received = xseg_receive(xseg, srcport, 0);
1030                 if (received) {
1031                         xseg_cancel_wait(xseg, srcport);
1032                         nr_received += 1;
1033                         req_target = xseg_get_target(xseg, received);
1034                         req_data = xseg_get_data(xseg, received);
1035                         if (!(received->state & XS_SERVED)) {
1036                                 nr_failed += 1;
1037                                 report_request(received);
1038                         } else if (!chkchunk(req_data, received->datalen,
1039                                         req_target, received->targetlen, received->offset)) {
1040         //                      report_request(received);
1041                                 nr_mismatch += 1;
1042                         }
1043
1044                         if (xseg_put_request(xseg, received, srcport))
1045                                 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
1046                 }
1047
1048                 if (!submitted && !received)
1049                         xseg_wait_signal(xseg, 1000000);
1050
1051                 if (nr_submitted % 1000 == 0 && !reported) {
1052                         reported = 1;
1053                         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
1054                         nr_submitted, nr_received, nr_failed, nr_mismatch);
1055                 }
1056
1057                 if (nr_received >= loops)
1058                         break;
1059         }
1060
1061         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
1062                 nr_submitted, nr_received, nr_failed, nr_mismatch);
1063         return 0;
1064 }
1065
1066 int cmd_submit_reqs(long loops, long concurrent_reqs, int op)
1067 {
1068         if (loops < 0)
1069                 return help();
1070
1071         struct xseg_request *submitted = NULL, *received;
1072         long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0, nr_flying = 0;
1073         int r;
1074         uint64_t offset;
1075         uint32_t targetlen = 10, chunksize = 4096;
1076         struct timeval tv1, tv2;
1077         xport p;
1078         char *req_data, *req_target;
1079
1080         xseg_bind_port(xseg, srcport, NULL);
1081
1082         gettimeofday(&tv1, NULL);
1083         for (;;) {
1084                 submitted = NULL;
1085                 xseg_prepare_wait(xseg, srcport);
1086                 if (nr_submitted < loops &&  nr_flying < concurrent_reqs &&
1087                     (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
1088                         xseg_cancel_wait(xseg, srcport);
1089                         r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
1090                         if (r < 0) {
1091                                 fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
1092                                         targetlen, chunksize);
1093                                 xseg_put_request(xseg, submitted, srcport);
1094                                 return -1;
1095                         }
1096                         
1097                         //FIXME
1098                         ++nr_flying;
1099                         nr_submitted += 1;
1100                         offset = 0;//pick(size);
1101
1102                         submitted->offset = offset;
1103                         submitted->size = chunksize;
1104                         req_target = xseg_get_target(xseg, submitted);
1105                         req_data = xseg_get_data(xseg, submitted);
1106
1107                         if (op == 0)
1108                                 submitted->op = X_INFO;
1109                         else if (op == 1)
1110                                 submitted->op = X_READ;
1111                         else if (op == 2) {
1112                                 submitted->op = X_WRITE;
1113                                 mkchunk(req_data, submitted->datalen, req_target, submitted->targetlen, submitted->offset);
1114                         }
1115
1116                         p = xseg_submit(xseg, submitted, srcport, X_ALLOC);
1117                         if ( p != NoPort){
1118                                 if (xseg_signal(xseg, p) < 0)
1119                                         perror("Cannot signal peer");
1120                         }
1121                 }
1122                 received = xseg_receive(xseg, srcport, 0);
1123                 if (received) {
1124                         xseg_cancel_wait(xseg, srcport);
1125                         --nr_flying;
1126                         if (nr_received == 0)
1127                                 fprintf(stderr, "latency (time for the first req to complete): %llu usecs\n",
1128                                         (unsigned long long)received->elapsed);
1129                         nr_received += 1;
1130                         if (!(received->state & XS_SERVED)) {
1131                                 nr_failed += 1;
1132                                 //report_request(received);
1133                         }
1134
1135                         if (xseg_put_request(xseg, received, srcport))
1136                                 fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
1137                 }
1138
1139                 if (!submitted && !received)
1140                         xseg_wait_signal(xseg, 10000000L);
1141
1142                 if (nr_received >= loops)
1143                         break;
1144         }
1145         gettimeofday(&tv2, NULL);
1146
1147         fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
1148                 nr_submitted, nr_received, nr_failed, nr_mismatch);
1149         long t = (tv2.tv_sec - tv1.tv_sec)*1000000 + (tv2.tv_usec - tv1.tv_usec);
1150         fprintf(stderr, "elpased time: %lf secs, throughput: %lf reqs/sec\n", (double) t / 1000000.0, (double) nr_submitted / (t / 1000000.0));
1151
1152         return 0;
1153 }
1154
1155 static void lock_status(struct xlock *lock, char *buf, int len)
1156 {
1157         int r;
1158         if (lock->owner == Noone)
1159                 r = snprintf(buf, len, "Locked: No");
1160         else
1161                 r = snprintf(buf, len, "Locked: Yes (Owner: %lu)", lock->owner);
1162         if (r >= len)
1163                 buf[len-1] = 0;
1164 }
1165
1166 int cmd_report(uint32_t portno)
1167 {
1168         char fls[64], rls[64], pls[64]; // buffer to store lock status
1169         struct xseg_port *port = xseg_get_port(xseg, portno);
1170         if (!port) {
1171                 printf("port %u is not assigned\n", portno);
1172                 return 0;
1173         }
1174         struct xq *fq, *rq, *pq;
1175         fq = xseg_get_queue(xseg, port, free_queue);
1176         rq = xseg_get_queue(xseg, port, request_queue);
1177         pq = xseg_get_queue(xseg, port, reply_queue);
1178         lock_status(&port->fq_lock, fls, 64);
1179         lock_status(&port->rq_lock, rls, 64);
1180         lock_status(&port->pq_lock, pls, 64);
1181         fprintf(stderr, "port %u:\n"
1182                 "   requests: %llu/%llu  next: %u  dst gw: %u\n"
1183                 "       free_queue [%p] count : %4llu | %s\n"
1184                 "    request_queue [%p] count : %4llu | %s\n"
1185                 "      reply_queue [%p] count : %4llu | %s\n",
1186                 portno, (unsigned long long)port->alloc_reqs, 
1187                 (unsigned long long)port->max_alloc_reqs,
1188                 xseg->path_next[portno],
1189                 xseg->dst_gw[portno],
1190                 (void *)fq, (unsigned long long)xq_count(fq), fls,
1191                 (void *)rq, (unsigned long long)xq_count(rq), rls,
1192                 (void *)pq, (unsigned long long)xq_count(pq), pls);
1193         return 0;
1194 }
1195
1196 int cmd_join(void)
1197 {
1198         if (xseg)
1199                 return 0;
1200
1201         xseg = xseg_join(cfg.type, cfg.name, "posix", NULL);
1202         if (!xseg) {
1203                 fprintf(stderr, "cannot join segment!\n");
1204                 return -1;
1205         }
1206         return 0;
1207 }
1208 static void print_hanlder(char *name, struct xobject_h *obj_h)
1209 {
1210         char ls[64];
1211         lock_status(&obj_h->lock, ls, 64);
1212         fprintf(stderr, "%20s: free: %4llu, allocated: %4llu, allocated space: %7llu (object size: %llu), Lock %s\n",
1213                         name,
1214                         (unsigned long long) obj_h->nr_free,
1215                         (unsigned long long) obj_h->nr_allocated,
1216                         (unsigned long long) obj_h->allocated_space,
1217                         (unsigned long long) obj_h->obj_size, ls);
1218 }
1219
1220 //FIXME ugly
1221 static void print_heap(struct xseg *xseg)
1222 {
1223         char *UNIT[4];
1224         UNIT[0] = "B";
1225         UNIT[1] = "KiB";
1226         UNIT[2] = "MiB";
1227         UNIT[3] = "GiB";
1228         uint64_t MULT[4];
1229         MULT[0] = 1;
1230         MULT[1] = 1024;
1231         MULT[2] = 1024*1024;
1232         MULT[3] = 1024*1024*1024;
1233
1234         int u;
1235         uint64_t t;
1236         fprintf(stderr, "Heap usage: ");
1237         u = 0;
1238         t = xseg->heap->cur;
1239         while (t > 0) {
1240                 t /= 1024;
1241                 u++;
1242         }
1243         if (!t)
1244                 u--;
1245         t = xseg->heap->cur / MULT[u];
1246         if (t < 10){
1247                 float tf = ((float)(xseg->heap->cur))/((float)MULT[u]);
1248                 fprintf(stderr, "%2.1f %s/", tf, UNIT[u]);
1249         }
1250         else {
1251                 unsigned int tu = xseg->heap->cur / MULT[u];
1252                 fprintf(stderr, "%3u %s/", tu, UNIT[u]);
1253         }
1254
1255         u = 0;
1256         t = xseg->config.heap_size;
1257         while (t > 0) {
1258                 t /= 1024;
1259                 u++;
1260         }
1261         if (!t)
1262                 u--;
1263         t = xseg->config.heap_size/MULT[u];
1264         if (t < 10){
1265                 float tf = ((float)(xseg->config.heap_size))/(float)MULT[u];
1266                 fprintf(stderr, "%2.1f %s ", tf, UNIT[u]);
1267         }
1268         else {
1269                 unsigned int tu = xseg->config.heap_size / MULT[u];
1270                 fprintf(stderr, "%3u %s ", tu, UNIT[u]);
1271         }
1272         char ls[64];
1273         lock_status(&xseg->heap->lock, ls, 64);
1274         fprintf(stderr, "(%llu / %llu), %s\n",
1275                         (unsigned long long)xseg->heap->cur,
1276                         (unsigned long long)xseg->config.heap_size,
1277                         ls);
1278 }
1279
1280 int cmd_reportall(void)
1281 {
1282         uint32_t t;
1283
1284         if (cmd_join())
1285                 return -1;
1286
1287         fprintf(stderr, "Segment lock: %s\n",
1288                 (xseg->shared->flags & XSEG_F_LOCK) ? "Locked" : "Unlocked");
1289         print_heap(xseg);
1290         /* fprintf(stderr, "Heap usage: %llu / %llu\n", */
1291         /*              (unsigned long long)xseg->heap->cur, */
1292         /*              (unsigned long long)xseg->config.heap_size); */
1293         fprintf(stderr, "Handlers: \n");
1294         print_hanlder("Requests handler", xseg->request_h);
1295         print_hanlder("Ports handler", xseg->port_h);
1296         print_hanlder("Objects handler", xseg->object_handlers);
1297         fprintf(stderr, "\n");
1298
1299         for (t = 0; t < xseg->config.nr_ports; t++)
1300                 cmd_report(t);
1301
1302         return 0;
1303 }
1304
1305
1306 int finish_req(struct xseg_request *req, enum req_action action)
1307 {
1308         if (action == COMPLETE){
1309                 req->state &= ~XS_FAILED;
1310                 req->state |= XS_SERVED;
1311         } else {
1312                 req->state |= XS_FAILED;
1313                 req->state &= ~XS_SERVED;
1314         }
1315         req->serviced = 0;
1316         xport p = xseg_respond(xseg, req, srcport, X_ALLOC);
1317         if (p == NoPort)
1318                 xseg_put_request(xseg, req, srcport);
1319         else
1320                 xseg_signal(xseg, p);
1321         return 0;
1322 }
1323
1324 //FIXME this should be in xseg lib?
1325 static int isDangling(struct xseg_request *req)
1326 {
1327         xport i;
1328         struct xseg_port *port;
1329         for (i = 0; i < xseg->config.nr_ports; i++) {
1330                 if (xseg->ports[i]){
1331                         port = xseg_get_port(xseg, i);
1332                         if (!port){
1333                                 fprintf(stderr, "Inconsisten port <-> portno mapping %u", i);
1334                                 continue;
1335                         }
1336                         struct xq *fq, *rq, *pq;
1337                         fq = xseg_get_queue(xseg, port, free_queue);
1338                         rq = xseg_get_queue(xseg, port, request_queue);
1339                         pq = xseg_get_queue(xseg, port, reply_queue);
1340                         xlock_acquire(&port->fq_lock, srcport);
1341                         if (__xq_check(fq, XPTR_MAKE(req, xseg->segment))){
1342                                         xlock_release(&port->fq_lock);
1343                                         return 0;
1344                         }
1345                         xlock_release(&port->fq_lock);
1346                         xlock_acquire(&port->rq_lock, srcport);
1347                         if (__xq_check(rq, XPTR_MAKE(req, xseg->segment))){
1348                                         xlock_release(&port->rq_lock);
1349                                         return 0;
1350                         }
1351                         xlock_release(&port->rq_lock);
1352                         xlock_acquire(&port->pq_lock, srcport);
1353                         if (__xq_check(pq, XPTR_MAKE(req, xseg->segment))){
1354                                         xlock_release(&port->pq_lock);
1355                                         return 0;
1356                         }
1357                         xlock_release(&port->pq_lock);
1358                 }
1359         }
1360         return 1;
1361 }
1362
/*
 * Ask a yes/no question on stdout and read the answer from stdin.
 *
 * The last 'y'/'Y' or 'n'/'N' seen on a line decides the answer once the
 * newline arrives; a line containing neither makes the prompt repeat.
 *
 * Returns 1 for yes and 0 for no.  If stdin reaches end-of-file (or a read
 * error) the answer typed so far is returned, or 0 ("no") when there is
 * none, so a closed stdin can never approve an action nor hang the tool.
 */
int prompt_user(char *msg)
{
	int c, r = -1;

	printf("%s [y/n]: ", msg);
	/* the prompt has no newline: flush so it is visible before blocking */
	fflush(stdout);

	for (;;) {
		c = fgetc(stdin);
		if (c == EOF)
			return (r == -1) ? 0 : r;

		if (c == 'y' || c == 'Y') {
			r = 1;
		} else if (c == 'n' || c == 'N') {
			r = 0;
		} else if (c == '\n') {
			if (r != -1)
				break;
			printf("%s [y/n]: ", msg);
			fflush(stdout);
		}
	}
	return r;
}
1382
1383 //FIXME this should be in xseg lib?
1384 int cmd_verify(int fix)
1385 {
1386         if (cmd_join())
1387                 return -1;
1388         //segment lock
1389         if (xseg->shared->flags & XSEG_F_LOCK){
1390                 fprintf(stderr, "Segment lock: Locked\n");
1391                 if (fix && prompt_user("Unlock it ?"))
1392                         xseg->shared->flags &= ~XSEG_F_LOCK;
1393         }
1394         //heap lock
1395         if (xseg->heap->lock.owner != Noone){
1396                 fprintf(stderr, "Heap lock: Locked (Owner: %llu)\n",
1397                         (unsigned long long)xseg->heap->lock.owner);
1398                 if (fix && prompt_user("Unlock it ?"))
1399                         xlock_release(&xseg->heap->lock);
1400         }
1401         //obj_h locks
1402         if (xseg->request_h->lock.owner != Noone){
1403                 fprintf(stderr, "Requests handler lock: Locked (Owner: %llu)\n",
1404                         (unsigned long long)xseg->request_h->lock.owner);
1405                 if (fix && prompt_user("Unlock it ?"))
1406                         xlock_release(&xseg->request_h->lock);
1407         }
1408         if (xseg->port_h->lock.owner != Noone){
1409                 fprintf(stderr, "Ports handler lock: Locked (Owner: %llu)\n",
1410                         (unsigned long long)xseg->port_h->lock.owner);
1411                 if (fix && prompt_user("Unlock it ?"))
1412                         xlock_release(&xseg->port_h->lock);
1413         }
1414         if (xseg->object_handlers->lock.owner != Noone){
1415                 fprintf(stderr, "Objects handler lock: Locked (Owner: %llu)\n",
1416                         (unsigned long long)xseg->object_handlers->lock.owner);
1417                 if (fix && prompt_user("Unlock it ?"))
1418                         xlock_release(&xseg->object_handlers->lock);
1419         }
1420         //take segment lock?
1421         xport i;
1422         struct xseg_port *port;
1423         for (i = 0; i < xseg->config.nr_ports; i++) {
1424                 if (xseg->ports[i]){
1425                         port = xseg_get_port(xseg, i);
1426                         if (!port){
1427                                 fprintf(stderr, "Inconsisten port <-> portno mapping %u", i);
1428                                 continue;
1429                         }
1430                         if (port->fq_lock.owner != Noone) {
1431                                 fprintf(stderr, "Free queue lock of port %u locked (Owner %llu)\n",
1432                                                 i, (unsigned long long)port->fq_lock.owner);
1433                                 if (fix && prompt_user("Unlock it ?"))
1434                                         xlock_release(&port->fq_lock);
1435                         }
1436                         if (port->rq_lock.owner != Noone) {
1437                                 fprintf(stderr, "Request queue lock of port %u locked (Owner %llu)\n",
1438                                                 i, (unsigned long long)port->rq_lock.owner);
1439                                 if (fix && prompt_user("Unlock it ?"))
1440                                         xlock_release(&port->rq_lock);
1441                         }
1442                         if (port->pq_lock.owner != Noone) {
1443                                 fprintf(stderr, "Reply queue lock of port %u locked (Owner %llu)\n",
1444                                                 i, (unsigned long long)port->pq_lock.owner);
1445                                 if (fix && prompt_user("Unlock it ?"))
1446                                         xlock_release(&port->pq_lock);
1447                         }
1448                 }
1449         }
1450
1451         struct xobject_h *obj_h = xseg->request_h;
1452         struct xobject_iter it;
1453
1454         struct xseg_request *req;
1455         xlock_acquire(&obj_h->lock, srcport);
1456         xobj_iter_init(obj_h, &it);
1457         while (xobj_iterate(obj_h, &it, (void **)&req)){
1458                 //FIXME this will not work cause obj->magic - req->serial is not
1459                 //touched when a request is get
1460                 /* if (obj->magic != MAGIC_REQ && t->src_portno == portno){ */
1461                 if (isDangling(req) && !__xobj_isFree(obj_h, req)){
1462                         report_request(req);
1463                         if (fix && prompt_user("Fail it ?")){
1464                                 printf("Finishing ...\n");
1465                                 finish_req(req, FAIL);
1466                         }
1467                 }
1468         }
1469         xlock_release(&obj_h->lock);
1470         return 0;
1471 }
1472
1473 int cmd_inspectq(xport portno, enum queue qt)
1474 {
1475         if (cmd_join())
1476                 return -1;
1477
1478         struct xq *q;
1479         struct xlock *l;
1480         struct xseg_port *port = xseg_get_port(xseg, portno);
1481         if (!port)
1482                 return -1;
1483         if (qt == FREE_QUEUE){
1484                 q = xseg_get_queue(xseg, port, free_queue);
1485                 l = &port->fq_lock;
1486         }
1487         else if (qt == REQUEST_QUEUE){
1488                 q = xseg_get_queue(xseg, port, request_queue);
1489                 l = &port->rq_lock;
1490         }
1491         else if (qt == REPLY_QUEUE) {
1492                 q = xseg_get_queue(xseg, port, reply_queue);
1493                 l = &port->rq_lock;
1494         }
1495         else
1496                 return -1;
1497         xlock_acquire(l, srcport);
1498         xqindex i,c = xq_count(q);
1499         if (c) {
1500                 struct xseg_request *req;
1501                 xptr xqi;
1502                 for (i = 0; i < c; i++) {
1503                         xqi = __xq_pop_head(q);
1504                         req = XPTR_TAKE(xqi, xseg->segment);
1505                         report_request(req);
1506                         __xq_append_tail(q, xqi);
1507                 }
1508         }
1509         else {
1510                 fprintf(stderr, "Queue is empty\n\n");
1511         }
1512         xlock_release(l);
1513         return 0;
1514 }
1515
1516
1517 int cmd_request(struct xseg_request *req, enum req_action action)
1518 {
1519         if (cmd_join())
1520                 return -1;
1521
1522         struct xobject_h *obj_h = xseg->request_h;
1523         if (!xobj_check(obj_h, req))
1524                 return -1;
1525
1526         if (action == REPORT)
1527                 report_request(req);
1528         else if (action == FAIL){
1529                 report_request(req);
1530                 if (prompt_user("fail it ?")){
1531                         printf("Finishing ...\n");
1532                         finish_req(req, FAIL);
1533                 }
1534         }
1535         else if (action == COMPLETE){
1536                 report_request(req);
1537                 if (prompt_user("Complete it ?")){
1538                         printf("Finishing ...\n");
1539                         finish_req(req, COMPLETE);
1540                 }
1541         }
1542         return 0;
1543 }
1544
1545 int cmd_create(void)
1546 {
1547         int r = xseg_create(&cfg);
1548         if (r) {
1549                 fprintf(stderr, "cannot create segment!\n");
1550                 return -1;
1551         }
1552
1553         fprintf(stderr, "Segment initialized.\n");
1554         return 0;
1555 }
1556
1557 int cmd_destroy(void)
1558 {
1559         if (!xseg && cmd_join())
1560                 return -1;
1561         xseg_leave(xseg);
1562         xseg_destroy(xseg);
1563         xseg = NULL;
1564         fprintf(stderr, "Segment destroyed.\n");
1565         return 0;
1566 }
1567
1568 int cmd_alloc_requests(unsigned long nr)
1569 {
1570         return xseg_alloc_requests(xseg, srcport, nr);
1571 }
1572
1573 int cmd_free_requests(unsigned long nr)
1574 {
1575         return xseg_free_requests(xseg, srcport, nr);
1576 }
1577
1578 int cmd_put_requests(void)
1579 {
1580         struct xseg_request *req;
1581
1582         for (;;) {
1583                 req = xseg_accept(xseg, dstport, 0);
1584                 if (!req)
1585                         break;
1586                 if (xseg_put_request(xseg, req, srcport))
1587                         fprintf(stderr, "Cannot put request at port %u\n", req->src_portno);
1588         }
1589
1590         return 0;
1591 }
1592
1593 int cmd_finish(unsigned long nr, int fail)
1594 {
1595         struct xseg_request *req;
1596         char *buf = malloc(sizeof(char) * 8128);
1597         char *req_target, *req_data;
1598         xseg_bind_port(xseg, srcport, NULL);
1599         xport p;
1600
1601         for (; nr--;) {
1602                 xseg_prepare_wait(xseg, srcport);
1603                 req = xseg_accept(xseg, srcport, 0);
1604                 if (req) {
1605                         req_target = xseg_get_target(xseg, req);
1606                         req_data = xseg_get_data(xseg, req);
1607                         xseg_cancel_wait(xseg, srcport);
1608                         if (fail == 1)
1609                                 req->state &= ~XS_SERVED;
1610                         else {
1611                                 if (req->op == X_READ)
1612                                         mkchunk(req_data, req->datalen, req_target, req->targetlen, req->offset);
1613                                 else if (req->op == X_WRITE) 
1614                                         memcpy(buf, req_data, (sizeof(*buf) > req->datalen) ? req->datalen : sizeof(*buf));
1615                                 else if (req->op == X_INFO)
1616                                         *((uint64_t *) req->data) = 4294967296;
1617                                 
1618                                 req->state |= XS_SERVED;
1619                                 req->serviced = req->size;
1620                         }
1621
1622                         p = xseg_respond(xseg, req, srcport, X_ALLOC);
1623                         xseg_signal(xseg, p);
1624                         continue;
1625                 }
1626                 ++nr;
1627                 xseg_wait_signal(xseg, 10000000L);
1628         }
1629
1630         free(buf);
1631
1632         return 0;
1633 }
1634
1635 void handle_reply(struct xseg_request *req)
1636 {
1637         char *req_data = xseg_get_data(xseg, req);
1638         char *req_target = xseg_get_target(xseg, req);
1639         if (!(req->state & XS_SERVED)) {
1640                 report_request(req);
1641                 goto put;
1642         }
1643
1644         switch (req->op) {
1645         case X_READ:
1646                 fwrite(req_data, 1, req->datalen, stdout);
1647                 break;
1648
1649         case X_WRITE:
1650                 fprintf(stdout, "wrote: ");
1651                 fwrite(req_data, 1, req->datalen, stdout);
1652                 break;
1653         case X_SYNC:
1654         case X_DELETE:
1655                 fprintf(stderr, "deleted %s\n", req_target);
1656                 break;
1657         case X_TRUNCATE:
1658         case X_COMMIT:
1659         case X_CLONE:
1660                 fprintf(stderr, "cloned %s\n", ((struct xseg_request_clone *)req_data)->target);
1661                 break;
1662         case X_INFO:
1663                 fprintf(stderr, "size: %llu\n", (unsigned long long)*((uint64_t *)req_data));
1664                 break;
1665         case X_COPY:
1666                 fprintf(stderr, "copied %s\n", ((struct xseg_request_copy *)req_data)->target);
1667                 break;
1668         case X_CLOSE:
1669                 fprintf(stderr, "Closed %s\n", req_target);
1670                 break;
1671         case X_OPEN:
1672                 fprintf(stderr, "Opened %s\n", req_target);
1673                 break;
1674         case X_SNAPSHOT:
1675                 fprintf(stderr, "Snapshotted %s\n", req_target);
1676                 break;
1677         default:
1678                 break;
1679         }
1680
1681 put:
1682         if (xseg_put_request(xseg, req, srcport))
1683                 fprintf(stderr, "Cannot put reply at port %u\n", req->src_portno);
1684 }
1685
1686 int cmd_wait(uint32_t nr)
1687 {
1688         struct xseg_request *req;
1689         long ret;
1690         init_local_signal(); 
1691
1692         for (;;) {
1693                 req = xseg_receive(xseg, srcport, 0);
1694                 if (req) {
1695                         handle_reply(req);
1696                         nr--;
1697                         if (nr == 0)
1698                                 break;
1699                         continue;
1700                 }
1701
1702                 ret = xseg_prepare_wait(xseg, srcport);
1703                 if (ret)
1704                         return -1;
1705
1706                 ret = xseg_wait_signal(xseg, 1000000);
1707                 ret = xseg_cancel_wait(xseg, srcport);
1708                 if (ret)
1709                         return -1;
1710         }
1711
1712         return 0;
1713 }
1714
1715 int cmd_put_replies(void)
1716 {
1717         struct xseg_request *req;
1718
1719         for (;;) {
1720                 req = xseg_receive(xseg, dstport, 0);
1721                 if (!req)
1722                         break;
1723                 fprintf(stderr, "request: %08llx%08llx\n"
1724                         "     op: %u\n"
1725                         "  state: %u\n",
1726                         0LL, (unsigned long long)req->serial,
1727                         req->op,
1728                         req->state);
1729                 report_request(req);
1730
1731                 //fwrite(req->buffer, 1, req->bufferlen, stdout);
1732
1733                 if (xseg_put_request(xseg, req, srcport))
1734                         fprintf(stderr, "Cannot put reply\n");
1735         }
1736
1737         return 0;
1738 }
1739
1740 int cmd_bind(long portno)
1741 {
1742         struct xseg_port *port = xseg_bind_port(xseg, portno, NULL);
1743         if (!port) {
1744                 fprintf(stderr, "failed to bind port %ld\n", portno);
1745                 return 1;
1746         }
1747
1748         fprintf(stderr, "bound port %u\n", xseg_portno(xseg, port));
1749         return 0;
1750 }
1751
1752 int cmd_signal(uint32_t portno)
1753 {
1754         return xseg_signal(xseg, portno);
1755 }
1756
1757 int cmd_set_next(xport portno, xport next)
1758 {
1759         xseg->path_next[portno] = next;
1760         return 0;
1761 }
1762
1763 int parse_ports(char *str)
1764 {
1765         int ret = 0;
1766         char *s = str;
1767
1768         for (;;) {
1769                 if (*s == 0)
1770                         return 0;
1771
1772                 if (*s == ':') {
1773                         *s = 0;
1774                         if ((s > str) && isdigit(str[0])) {
1775                                 srcport = atol(str);
1776                                 ret ++;
1777                         }
1778                         break;
1779                 }
1780                 s ++;
1781         }
1782
1783         s += 1;
1784         str = s;
1785
1786         for (;;) {
1787                 if (*s == 0) {
1788                         if ((s > str) && isdigit(str[0])) {
1789                                 dstport = atol(str);
1790                                 ret ++;
1791                         }
1792                         break;
1793                 }
1794                 s ++;
1795         }
1796
1797         return ret;
1798 }
1799
1800 int main(int argc, char **argv)
1801 {
1802         int i, ret = 0;
1803         char *spec;
1804
1805         if (argc < 3)
1806                 return help();
1807
1808         srcport = -1;
1809         dstport = -1;
1810         spec = argv[1];
1811
1812         if (xseg_parse_spec(spec, &cfg)) {
1813                 fprintf(stderr, "Cannot parse spec\n");
1814                 return -1;
1815         }
1816
1817         if (xseg_initialize()) {
1818                 fprintf(stderr, "cannot initialize!\n");
1819                 return -1;
1820         }
1821
1822         for (i = 2; i < argc; i++) {
1823
1824                 if (!strcmp(argv[i], "create")) {
1825                         ret = cmd_create();
1826                         continue;
1827                 }
1828
1829                 if (!strcmp(argv[i], "join")) {
1830                         ret = cmd_join();
1831                         if (!ret)
1832                                 fprintf(stderr, "Segment joined.\n");
1833                         continue;
1834                 }
1835
1836                 if (!strcmp(argv[i], "destroy")) {
1837                         ret = cmd_destroy();
1838                         continue;
1839                 }
1840
1841                 if (cmd_join())
1842                         return -1;
1843
1844                 if (!strcmp(argv[i], "reportall")) {
1845                         ret = cmd_reportall();
1846                         continue;
1847                 }
1848
1849                 if (!strcmp(argv[i], "bind") && (i + 1 < argc)) {
1850                         ret = cmd_bind(atol(argv[i+1]));
1851                         i += 1;
1852                         continue;
1853                 }
1854
1855                 if (!strcmp(argv[i], "set-next") && (i + 2 < argc)) {
1856                         ret = cmd_set_next(atol(argv[i+1]), atol(argv[i+2]));
1857                         i += 2;
1858                         continue;
1859                 }
1860
1861                 if (!strcmp(argv[i], "signal") && (i + 1 < argc)) {
1862                         ret = cmd_signal(atol(argv[i+1]));
1863                         i += 1;
1864                         continue;
1865                 }
1866
1867                 if (!strcmp(argv[i], "bridge") && (i + 4 < argc)) {
1868                         ret = cmd_bridge(atol(argv[i+1]),
1869                                          atol(argv[i+2]),
1870                                          argv[i+3],
1871                                          argv[i+4]);
1872                         i += 4;
1873                         continue;
1874                 }
1875
1876                 if (srcport == -1) {
1877                         if (!parse_ports(argv[i]))
1878                                 fprintf(stderr, "source port undefined: %s\n", argv[i]);
1879                         continue;
1880                 }
1881
1882                 if (dstport == -1) {
1883                         if (!parse_ports(argv[i]))
1884                                 fprintf(stderr, "destination port undefined: %s\n", argv[i]);
1885                         continue;
1886                 }
1887
1888                 if (!strcmp(argv[i], "verify")) {
1889                         ret = cmd_verify(0);
1890                         continue;
1891                 }
1892
1893                 if (!strcmp(argv[i], "verify-fix")) {
1894                         ret = cmd_verify(1);
1895                         continue;
1896                 }
1897
1898                 if (!strcmp(argv[i], "failreq") && (i + 1 < argc)) {
1899                         struct xseg_request *req;
1900                         sscanf(argv[i+1], "%lx", (unsigned long *)&req);
1901                         ret = cmd_request(req, FAIL);
1902                         i += 1;
1903                         continue;
1904                 }
1905
1906                 if (!strcmp(argv[i], "inspect-freeq") && (i + 1 < argc)) {
1907                         ret = cmd_inspectq(atol(argv[i+1]), FREE_QUEUE);
1908                         i += 1;
1909                         continue;
1910                 }
1911
1912                 if (!strcmp(argv[i], "inspect-requestq") && (i + 1 < argc)) {
1913                         ret = cmd_inspectq(atol(argv[i+1]), REQUEST_QUEUE);
1914                         i += 1;
1915                         continue;
1916                 }
1917
1918                 if (!strcmp(argv[i], "inspect-replyq") && (i + 1 < argc)) {
1919                         ret = cmd_inspectq(atol(argv[i+1]), REPLY_QUEUE);
1920                         i += 1;
1921                         continue;
1922                 }
1923
1924                 if (!strcmp(argv[i], "report")) {
1925                         ret = cmd_report(dstport);
1926                         continue;
1927                 }
1928
1929                 if (!strcmp(argv[i], "alloc_requests") && (i + 1 < argc)) {
1930                         ret = cmd_alloc_requests(atol(argv[i+1]));
1931                         i += 1;
1932                         continue;
1933                 }
1934
1935                 if (!strcmp(argv[i], "free_requests") && (i + 1 < argc)) {
1936                         ret = cmd_free_requests(atol(argv[i+1]));
1937                         i += 1;
1938                         continue;
1939                 }
1940
1941                 if (!strcmp(argv[i], "put_requests")) {
1942                         ret = cmd_put_requests();
1943                         continue;
1944                 }
1945
1946                 if (!strcmp(argv[i], "put_replies")) {
1947                         ret = cmd_put_replies();
1948                         continue;
1949                 }
1950
1951                 if (!strcmp(argv[i], "complete") && (i + 1 < argc)) {
1952                         ret = cmd_finish(atol(argv[i+1]), 0);
1953                         i += 1;
1954                         continue;
1955                 }
1956
1957                 if (!strcmp(argv[i], "fail") && (i + 1 < argc)) {
1958                         ret = cmd_finish(atol(argv[i+1]), 1);
1959                         i += 1;
1960                         continue;
1961                 }
1962
1963                 if (!strcmp(argv[i], "wait") && (i + 1 < argc)) {
1964                         ret = cmd_wait(atol(argv[i+1]));
1965                         i += 1;
1966                         continue;
1967                 }
1968
1969                 if (!strcmp(argv[i], "rndwrite") && (i + 5 < argc)) {
1970                         long nr_loops = atol(argv[i+1]);
1971                         unsigned int seed = atoi(argv[i+2]);
1972                         unsigned int targetlen = atoi(argv[i+3]);
1973                         unsigned int chunksize = atoi(argv[i+4]);
1974                         unsigned long objectsize = atol(argv[i+5]);
1975                         ret = cmd_rndwrite(nr_loops, seed, targetlen, chunksize, objectsize);
1976                         i += 5;
1977                         continue;
1978                 }
1979                 
1980                 if (!strcmp(argv[i], "rnddelete") && (i + 3 < argc)) {
1981                         long nr_loops = atol(argv[i+1]);
1982                         unsigned int seed = atoi(argv[i+2]);
1983                         unsigned int targetlen = atoi(argv[i+3]);
1984                         ret = cmd_rnddelete(nr_loops, seed, targetlen);
1985                         i += 3;
1986                         continue;
1987                 }
1988
1989                 if (!strcmp(argv[i], "rndread") && (i + 5 < argc)) {
1990                         long nr_loops = atol(argv[i+1]);
1991                         unsigned int seed = atoi(argv[i+2]);
1992                         unsigned int targetlen = atoi(argv[i+3]);
1993                         unsigned int chunksize = atoi(argv[i+4]);
1994                         unsigned long objectsize = atol(argv[i+5]);
1995                         ret = cmd_rndread(nr_loops, seed, targetlen, chunksize, objectsize);
1996                         i += 5;
1997                         continue;
1998                 }
1999
2000                 if (!strcmp(argv[i], "submit_reqs") && (i + 3 < argc)) {
2001                         long nr_loops = atol(argv[i+1]);
2002                         long concurrent_reqs = atol(argv[i+2]);
2003                         int op = atoi(argv[i+3]);
2004                         ret = cmd_submit_reqs(nr_loops, concurrent_reqs, op);
2005                         i += 3;
2006                         continue;
2007                 }
2008
2009                 if (!strcmp(argv[i], "read") && (i + 3 < argc)) {
2010                         char *target = argv[i+1];
2011                         uint64_t offset = atol(argv[i+2]);
2012                         uint64_t size   = atol(argv[i+3]);
2013                         ret = cmd_read(target, offset, size);
2014                         i += 3;
2015                         continue;
2016                 }
2017
2018                 if (!strcmp(argv[i], "write") && (i + 2 < argc)) {
2019                         char *target = argv[i+1];
2020                         uint64_t offset = atol(argv[i+2]);
2021                         ret = cmd_write(target, offset);
2022                         i += 2;
2023                         continue;
2024                 }
2025
2026                 if (!strcmp(argv[i], "truncate") && (i + 2 < argc)) {
2027                         char *target = argv[i+1];
2028                         uint64_t offset = atol(argv[i+2]);
2029                         ret = cmd_truncate(target, offset);
2030                         i += 2;
2031                         continue;
2032                 }
2033
2034                 if (!strcmp(argv[i], "delete") && (i + 1 < argc)) {
2035                         char *target = argv[i+1];
2036                         ret = cmd_delete(target);
2037                         i += 1;
2038                         continue;
2039                 }
2040
2041                 if (!strcmp(argv[i], "acquire") && (i + 1 < argc)) {
2042                         char *target = argv[i+1];
2043                         ret = cmd_acquire(target);
2044                         i += 1;
2045                         continue;
2046                 }
2047
2048                 if (!strcmp(argv[i], "release") && (i + 1 < argc)) {
2049                         char *target = argv[i+1];
2050                         ret = cmd_release(target);
2051                         i += 1;
2052                         continue;
2053                 }
2054
2055                 if (!strcmp(argv[i], "copy") && (i + 2) < argc) {
2056                         char *src = argv[i+1];
2057                         char *dst = argv[i+2];
2058                         ret = cmd_copy(src, dst);
2059                         i += 2;
2060                         continue;
2061                 }
2062
2063                 if (!strcmp(argv[i], "clone") && (i + 2 < argc)) {
2064                         char *src = argv[i+1];
2065                         char *dst = argv[i+2];
2066                         ret = cmd_clone(src, dst);
2067                         i += 2;
2068                         continue;
2069                 }
2070                 if (!strcmp(argv[i], "snapshot") && (i + 2 < argc)) {
2071                         char *src = argv[i+1];
2072                         char *dst = argv[i+2];
2073                         ret = cmd_snapshot(src, dst, 4096*1024);
2074                         i += 2;
2075                         continue;
2076                 }
2077
2078                 if (!strcmp(argv[i], "info") && (i + 1 < argc)) {
2079                         char *target = argv[i+1];
2080                         ret = cmd_info(target);
2081                         i += 1;
2082                         continue;
2083                 }
2084
2085
2086                 if (!parse_ports(argv[i]))
2087                         fprintf(stderr, "invalid argument: %s\n", argv[i]);
2088         }
2089
2090         /* xseg_leave(); */
2091         return ret;
2092 }