Statistics
| Branch: | Revision:

root / xseg / peers / user / xseg-tool.c @ 26e49332

History | View | Annotate | Download (45.7 kB)

1
#define _GNU_SOURCE
2
#include <stdio.h>
3
#include <stdlib.h>
4
#include <string.h>
5
#include <stdint.h>
6
#include <ctype.h>
7
#include <sys/stat.h>
8
#include <sys/types.h>
9
#include <fcntl.h>
10
#include <unistd.h>
11

    
12
#include <xtypes/xhash.h>
13
#include <xtypes/xobj.h>
14
#include <xseg/xseg.h>
15
#include <xseg/protocol.h>
16
int help(void)
17
{
18
        printf("xseg <spec> [[[<src_port>]:[<dst_port>]] [<command> <arg>*] ]*\n"
19
                "spec:\n"
20
                "    <type:name:nr_ports:nr_requests:request_size:extra_size:page_shift>\n"
21
                "global commands:\n"
22
                "    reportall\n"
23
                "    create\n"
24
                "    destroy\n"
25
                "    bind <portno>\n"
26
                "    signal <portno>\n"
27
                "    bridge <portno1> <portno2> <logfile> {full|summary|stats}\n"
28
                "port commands:\n"
29
                "    report\n"
30
                "    alloc_requests (to source) <nr>\n"
31
                "    free_requests (from source) <nr>\n"
32
                "    put_requests (all from dest)\n"
33
                "    put_replies (all from dest)\n"
34
                "    wait        <nr_replies>\n"
35
                "    complete    <nr_requests>\n"
36
                "    fail        <nr_requests>\n"
37
                "    rndwrite    <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
38
                "    rndread     <nr_loops> <seed> <targetlen> <datalen> <objectsize>\n"
39
                "    submit_reqs <nr_loops> <concurrent_reqs>\n"
40
                "    info        <target>\n"
41
                "    read        <target> <offset> <size>\n"
42
                "    write       <target> <offset> < data\n"
43
                "    truncate    <target> <size>\n"
44
                "    delete      <target>\n"
45
                "    acquire     <target>\n"
46
                "    release     <target>\n"
47
                "    copy        <src>  <dst>\n"
48
                "    clone       <src>  <dst>\n"
49
        );
50
        return 1;
51
}
52

    
53

    
54
enum req_action {
55
        REPORT = 1,
56
        FAIL = 2,
57
        COMPLETE = 3
58
};
59

    
60
enum queue {
61
        FREE_QUEUE = 0,
62
        REQUEST_QUEUE = 1,
63
        REPLY_QUEUE = 2
64
};
65

    
66
char *namebuf;
67
char *chunk;
68
struct xseg_config cfg;
69
struct xseg *xseg;
70
uint32_t srcport, dstport;
71
uint64_t reqs;
72
#define mkname mkname_heavy
73
/* heavy distributes duplicates much more widely than light
74
 * ./xseg-tool random 100000 | cut -d' ' -f2- | sort | uniq -d -c |wc -l
75
 */
76

    
77
xport sport = NoPort;
78
static void init_local_signal() 
79
{
80
        if (xseg && sport != srcport){
81
                xseg_init_local_signal(xseg, srcport);
82
                sport = srcport;
83
        }
84
}
85

    
86
void mkname_heavy(char *name, uint32_t namelen, uint32_t seed)
87
{
88
        int i;
89
        char c;
90
        for (i = 0; i < namelen; i += 1) {
91
                c = seed + (seed >> 8) + (seed >> 16) + (seed >> 24);
92
                c = '0' + ((c + (c >> 4)) & 0xf);
93
                if (c > '9')
94
                        c += 'a'-'0'-10;
95
                name[i] = c;
96
                seed *= ((seed % 137911) | 1) * 137911;
97
        }
98
}
99

    
100
void mkname_light(char *name, uint32_t namelen, uint32_t seed)
101
{
102
        int i;
103
        char c;
104
        for (i = 0; i < namelen; i += 1) {
105
                c = seed;
106
                name[i] = 'A' + (c & 0xf);
107
                seed += 1;
108
        }
109
}
110

    
111
uint64_t pick(uint64_t size)
112
{
113
        return (uint64_t)((double)(RAND_MAX) / random());
114
}
115

    
116
void mkchunk(        char *chunk, uint32_t datalen,
117
                char *target, uint32_t targetlen, uint64_t offset)
118
{
119
        long i, r, bufsize = targetlen + 16;
120
        char buf[bufsize];
121
        r = datalen % bufsize;
122
        snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);
123

    
124
        for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
125
                memcpy(chunk + i, buf, bufsize);
126

    
127
        memcpy(chunk + datalen - r, buf, r);
128
}
129

    
130
int chkchunk(        char *chunk, uint32_t datalen,
131
                char *target, uint32_t targetlen, uint64_t offset)
132
{
133
        long i, r;
134
        int bufsize = targetlen + 16;
135
        char buf[bufsize];
136
        r = datalen % targetlen;
137
        snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, target);
138

    
139
        for (i = 0; i <= (long)datalen - bufsize; i += bufsize)
140
                if (memcmp(chunk + i, buf, bufsize)) {
141
                        /*printf("mismatch: '%*s'* vs '%*s'\n",
142
                                bufsize, buf, datalen, chunk);
143
                        */
144
                        return 0;
145
                }
146

    
147
        if (memcmp(chunk + datalen - r, buf, r))
148
                return 0;
149

    
150
        return 1;
151
}
152

    
153

    
154
#define ALLOC_MIN 4096
155
#define ALLOC_MAX 1048576
156

    
157
void inputbuf(FILE *fp, char **retbuf, uint64_t *retsize)
158
{
159
        static uint64_t alloc_size;
160
        static char *buf;
161
        uint64_t size = 0;
162
        char *p;
163
        size_t r;
164

    
165
        if (alloc_size < ALLOC_MIN)
166
                alloc_size = ALLOC_MIN;
167

    
168
        if (alloc_size > ALLOC_MAX)
169
                alloc_size = ALLOC_MAX;
170

    
171
        p = realloc(buf, alloc_size);
172
        if (!p) {
173
                if (buf)
174
                        free(buf);
175
                buf = NULL;
176
                goto out;
177
        }
178

    
179
        buf = p;
180

    
181
        while (!feof(fp)) {
182
                r = fread(buf + size, 1, alloc_size - size, fp);
183
                if (!r)
184
                        break;
185
                size += r;
186
                if (size >= alloc_size) {
187
                        p = realloc(buf, alloc_size * 2);
188
                        if (!p) {
189
                                if (buf)
190
                                        free(buf);
191
                                buf = NULL;
192
                                size = 0;
193
                                goto out;
194
                        }
195
                        buf = p;
196
                        alloc_size *= 2;
197
                }
198
        }
199

    
200
out:
201
        *retbuf = buf;
202
        *retsize = size;
203
}
204

    
205
void report_request(struct xseg_request *req)
206
{
207
        char target[64], data[64];
208
        char *req_target, *req_data;
209
        unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
210
        req_target = xseg_get_target(xseg, req);
211
        req_data = xseg_get_data(xseg, req);
212

    
213
        strncpy(target, req_target, end);
214
        target[end] = 0;
215
        strncpy(data, req_data, 63);
216
        data[63] = 0;
217
        fprintf(stderr,
218
                "Request %lx: target[%u](xptr: %llu): %s, data[%llu](xptr: %llu): %s \n\t"
219
                "offset: %llu, size: %llu, serviced; %llu, op: %u, state: %u, flags: %u \n\t"
220
                "src: %u, src_transit: %u, dst: %u, dst_transit: %u\n",
221
                (unsigned long) req, req->targetlen, (unsigned long long)req->target,
222
                target,
223
                (unsigned long long) req->datalen, (unsigned long long) req->data,
224
                data,
225
                (unsigned long long) req->offset, (unsigned long long) req->size,
226
                (unsigned long long) req->serviced, req->op, req->state, req->flags,
227
                (unsigned int) req->src_portno, (unsigned int) req->src_transit_portno,
228
                (unsigned int) req->dst_portno, (unsigned int) req->dst_transit_portno);
229

    
230

    
231
}
232

    
233
int cmd_info(char *target)
234
{
235
        uint32_t targetlen = strlen(target);
236
        size_t size = sizeof(uint64_t);
237
        int r;
238
        xport p;
239
        struct xseg_request *req;
240
        char *req_target;
241

    
242
        req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
243
        if (!req) {
244
                fprintf(stderr, "No request!\n");
245
                return -1;
246
        }
247

    
248
        r = xseg_prep_request(xseg, req, targetlen, size);
249
        if (r < 0) {
250
                fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
251
                        (unsigned long) targetlen, (unsigned long) size);
252
                xseg_put_request(xseg, req, srcport);
253
                return -1;
254
        }
255

    
256
        req_target = xseg_get_target(xseg, req);
257
        strncpy(req_target, target, targetlen);
258
        req->offset = 0;
259
        req->size = size;
260
        req->op = X_INFO;
261

    
262
        p = xseg_submit(xseg, req, srcport, X_ALLOC);
263
        if (p == NoPort)
264
                return -1;
265

    
266
        xseg_signal(xseg, p);
267

    
268
        return 0;
269
}
270

    
271
int cmd_read(char *target, uint64_t offset, uint64_t size)
272
{
273
        uint32_t targetlen = strlen(target);
274
        int r;
275
        xport p;
276
        char *req_target;
277
        struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
278
        if (!req) {
279
                fprintf(stderr, "No request\n");
280
                return -1;
281
        }
282

    
283
        r = xseg_prep_request(xseg, req, targetlen, size);
284
        if (r < 0) {
285
                fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
286
                        (unsigned long)targetlen, (unsigned long long)size);
287
                xseg_put_request(xseg, req, srcport);
288
                return -1;
289
        }
290

    
291
        req_target = xseg_get_target(xseg, req);
292
        strncpy(req_target, target, targetlen);
293
        req->offset = offset;
294
        req->size = size;
295
        req->op = X_READ;
296
        report_request(req);
297
        p = xseg_submit(xseg, req, srcport, X_ALLOC);
298
        if (p == NoPort)
299
                return -1;
300

    
301
        xseg_signal(xseg, p);
302
        return 0;
303
}
304

    
305
int cmd_write(char *target, uint64_t offset)
306
{
307
        char *buf = NULL;
308
        int r;
309
        xport p;
310
        uint64_t size = 0;
311
        char *req_target, *req_data;
312
        uint32_t targetlen = strlen(target);
313
        struct xseg_request *req;
314

    
315
        inputbuf(stdin, &buf, &size);
316
        if (!size) {
317
                fprintf(stderr, "No input\n");
318
                return -1;
319
        }
320

    
321
        req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
322
        if (!req) {
323
                fprintf(stderr, "No request\n");
324
                return -1;
325
        }
326

    
327
        r = xseg_prep_request(xseg, req, targetlen, size);
328
        if (r < 0) {
329
                fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
330
                        (unsigned long)targetlen, (unsigned long long)size);
331
                xseg_put_request(xseg, req, srcport);
332
                return -1;
333
        }
334

    
335
        req_target = xseg_get_target(xseg, req);
336
        strncpy(req_target, target, targetlen);
337
        
338
        req_data = xseg_get_data(xseg, req);
339
        memcpy(req_data, buf, size);
340
        req->offset = offset;
341
        req->size = size;
342
        req->op = X_WRITE;
343

    
344
        p = xseg_submit(xseg, req, srcport, X_ALLOC);
345
        if (p == NoPort) {
346
                fprintf(stderr, "Cannot submit\n");
347
                return -1;
348
        }
349
        xseg_signal(xseg, p);
350

    
351
        return 0;
352
}
353

    
354
int cmd_truncate(char *target, uint64_t offset)
355
{
356
        return 0;
357
}
358

    
359
int cmd_delete(char *target)
360
{
361
        uint32_t targetlen = strlen(target);
362
        int r;
363
        struct xseg_request *req;
364
        init_local_signal();
365
        xseg_bind_port(xseg, srcport, NULL);
366

    
367
        req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
368
        if (!req) {
369
                fprintf(stderr, "No request!\n");
370
                return -1;
371
        }
372

    
373
        r = xseg_prep_request(xseg, req, targetlen, 0);
374
        if (r < 0) {
375
                fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
376
                        (unsigned long) targetlen, (unsigned long) req->bufferlen - targetlen);
377
                xseg_put_request(xseg, req, srcport);
378
                return -1;
379
        }
380

    
381
        char *reqtarget = xseg_get_target(xseg, req);
382
        strncpy(reqtarget, target, targetlen);
383
        req->op = X_DELETE;
384

    
385
        xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
386
        if (p == NoPort){
387
                fprintf(stderr, "Couldn't submit request\n");
388
                xseg_put_request(xseg, req, srcport);
389
                return -1;
390
        }
391

    
392
        xseg_signal(xseg, p);
393

    
394
        return 0;
395
}
396

    
397
int cmd_acquire(char *target)
398
{
399
        uint32_t targetlen = strlen(target);
400
        int r;
401
        xport p;
402
        char *req_target;
403
        struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
404
        if (!req) {
405
                fprintf(stderr, "No request\n");
406
                return -1;
407
        }
408

    
409
        r = xseg_prep_request(xseg, req, targetlen, 0);
410
        if (r < 0) {
411
                fprintf(stderr, "Cannot prepare request! (%lu, 0)\n",
412
                        (unsigned long)targetlen);
413
                xseg_put_request(xseg, req, srcport);
414
                return -1;
415
        }
416

    
417
        req_target = xseg_get_target(xseg, req);
418
        strncpy(req_target, target, targetlen);
419
        req->offset = 0;
420
        req->size = 0;
421
        req->op = X_OPEN;
422
        p = xseg_submit(xseg, req, srcport, X_ALLOC);
423
        if (p == NoPort)
424
                return -1;
425

    
426
        xseg_signal(xseg, p);
427
        return 0;
428
}
429

    
430
int cmd_release(char *target)
431
{
432
        uint32_t targetlen = strlen(target);
433
        int r;
434
        xport p;
435
        char *req_target;
436
        struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
437
        if (!req) {
438
                fprintf(stderr, "No request\n");
439
                return -1;
440
        }
441

    
442
        r = xseg_prep_request(xseg, req, targetlen, 0);
443
        if (r < 0) {
444
                fprintf(stderr, "Cannot prepare request! (%lu, 0)\n",
445
                        (unsigned long)targetlen);
446
                xseg_put_request(xseg, req, srcport);
447
                return -1;
448
        }
449

    
450
        req_target = xseg_get_target(xseg, req);
451
        strncpy(req_target, target, targetlen);
452
        req->offset = 0;
453
        req->size = 0;
454
        req->op = X_CLOSE;
455
        p = xseg_submit(xseg, req, srcport, X_ALLOC);
456
        if (p == NoPort)
457
                return -1;
458

    
459
        xseg_signal(xseg, p);
460
        return 0;
461
        return 0;
462
}
463

    
464
int cmd_copy(char *src, char *dst)
465
{
466
        uint32_t targetlen = strlen(dst);
467
        uint32_t parentlen = strlen(src);
468
        struct xseg_request *req;
469
        struct xseg_request_copy *xcopy;
470
        req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
471
        if (!req) {
472
                fprintf(stderr, "No request\n");
473
                return -1;
474
        }
475

    
476
        int r = xseg_prep_request(xseg, req, targetlen,
477
                        sizeof(struct xseg_request_copy));
478
        if (r < 0) {
479
                fprintf(stderr, "Cannot prepare request!\n");
480
                xseg_put_request(xseg, req, srcport);
481
                return -1;
482
        }
483

    
484
        char *target = xseg_get_target(xseg, req);
485
        char *data = xseg_get_data(xseg, req);
486

    
487
        strncpy(target, dst, targetlen);
488
        xcopy = (struct xseg_request_copy *) data;
489
        strncpy(xcopy->target, src, parentlen);
490
        xcopy->targetlen = parentlen;
491
        req->offset = 0;
492
        req->size = sizeof(struct xseg_request_copy);
493
        req->op = X_COPY;
494

    
495
        xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
496
        if (p == NoPort){
497
                fprintf(stderr, "Cannot submit request\n");
498
                return -1;
499
        }
500
        xseg_signal(xseg, p);
501

    
502
        return 0;
503
        return 0;
504
}
505

    
506
int cmd_clone(char *src, char *dst)
507
{
508

    
509
        uint32_t targetlen = strlen(dst);
510
        uint32_t parentlen = strlen(src);
511
        struct xseg_request *req;
512
        struct xseg_request_clone *xclone;
513
        xseg_bind_port(xseg, srcport, NULL);
514
        req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
515
        if (!req) {
516
                fprintf(stderr, "No request\n");
517
                return -1;
518
        }
519

    
520
        int r = xseg_prep_request(xseg, req, targetlen, sizeof(struct xseg_request_clone));
521
        if (r < 0) {
522
                fprintf(stderr, "Cannot prepare request!\n");
523
                xseg_put_request(xseg, req, srcport);
524
                return -1;
525
        }
526

    
527
        char *target = xseg_get_target(xseg, req);
528
        char *data = xseg_get_data(xseg, req);
529

    
530
        strncpy(target, dst, targetlen);
531
        xclone = (struct xseg_request_clone *) data;
532
        strncpy(xclone->target, src, parentlen);
533
        xclone->targetlen = parentlen;
534
        xclone->size = -1;
535
        req->offset = 0;
536
        req->size = sizeof(struct xseg_request_clone);
537
        req->op = X_CLONE;
538

    
539
        xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
540
        if (p == NoPort){
541
                fprintf(stderr, "Cannot submit request\n");
542
                return -1;
543
        }
544
        xseg_signal(xseg, p);
545

    
546
        return 0;
547
}
548

    
549
void log_req(int logfd, uint32_t portno2, uint32_t portno1, int op, int method,
550
                struct xseg_request *req)
551
{
552
        FILE *logfp;
553
        char target[64], data[64];
554
        char *req_target, *req_data;
555
        /* null terminate name in case of req->target is less than 63 characters,
556
         * and next character after name (aka first byte of next buffer) is not
557
         * null
558
         */
559
        unsigned int end = (req->targetlen > 63) ? 63 : req->targetlen;
560
        
561
        req_target = xseg_get_target(xseg, req);
562
        req_data = xseg_get_data(xseg, req);
563

    
564
        logfp = fdopen(logfd, "a");
565
        if (!logfp)
566
                return;
567

    
568
        switch(method) {
569
        case 0:
570
                strncpy(target, req_target, end);
571
                target[end] = 0;
572
                strncpy(data, req_data, 63);
573
                data[63] = 0;
574

    
575
                fprintf(logfp,
576
                        "src port: %u, dst port: %u,  op:%u offset: %llu size: %lu, reqstate: %u\n"
577
                        "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
578
                        (unsigned int)portno1,
579
                        (unsigned int)portno2,
580
                        (unsigned int)req->op,
581
                        (unsigned long long)req->offset,
582
                        (unsigned long)req->size,
583
                        (unsigned int)req->state,
584
                        (unsigned int)req->targetlen, target,
585
                        (unsigned long long)req->datalen, data);
586
                break;
587
        case 1:
588
                fprintf(logfp,
589
                        "src port: %u, dst port: %u, op: %u\n",
590
                        (unsigned int)portno1,
591
                        (unsigned int)portno2,
592
                        (unsigned int)req->op);
593
                break;
594
        case 2:
595
                fprintf(logfp, "src port: %u, dst port: %u, reqs: %llu\n",
596
                        (unsigned int)portno1,
597
                        (unsigned int)portno2,
598
                        (unsigned long long)++reqs);
599
        }
600

    
601
        fclose(logfp);
602
        return;
603
}
604

    
605
#define LOG_ACCEPT  0
606
#define LOG_RECEIVE 1
607

    
608
int cmd_bridge(uint32_t portno1, uint32_t portno2, char *logfile, char *how)
609
{
610
        struct xseg_request *req;
611
        int logfd, method;
612
        if (!strcmp(logfile, "-"))
613
                logfd = 1;
614
        else {
615
                logfd = open(logfile, O_WRONLY|O_APPEND|O_CREAT, 0600);
616
                if (logfd < 0) {
617
                        perror(logfile);
618
                        return -1;
619
                }
620
        }
621

    
622
        if (!strcmp(how, "full"))
623
                method = 0;
624
        else if (!strcmp(how, "summary"))
625
                method = 1;
626
        else
627
                method = 2;
628

    
629
        for (;;) {
630
                int reloop = 0, active;
631
                xseg_prepare_wait(xseg, portno1);
632
                xseg_prepare_wait(xseg, portno2);
633
                req = NULL;
634

    
635
                for (;;) {
636
                        active = 0;
637

    
638
                        //FIXME
639
                        req = xseg_accept(xseg, portno1, 0);
640
                        if (req) {
641
                                xseg_submit(xseg, req, portno2, X_ALLOC);
642
                                log_req(logfd, portno1, portno2, LOG_ACCEPT, method, req);
643
                                active += 1;
644
                        }
645

    
646
                        req = xseg_accept(xseg, portno2, 0);
647
                        if (req) {
648
                                xseg_submit(xseg, req, portno1, X_ALLOC);
649
                                log_req(logfd, portno2, portno1, LOG_ACCEPT, method, req);
650
                                active += 1;
651
                        }
652

    
653
                        req = xseg_receive(xseg, portno1, 0);
654
                        if (req) {
655
                                xseg_respond(xseg, req, portno2, X_ALLOC);
656
                                log_req(logfd, portno1, portno2, LOG_RECEIVE, method, req);
657
                                active += 1;
658
                        }
659

    
660
                        req = xseg_receive(xseg, portno2, 0);
661
                        if (req) {
662
                                xseg_respond(xseg, req, portno1, X_ALLOC);
663
                                log_req(logfd, portno2, portno1, LOG_RECEIVE, method, req);
664
                                active += 1;
665
                        }
666

    
667
                        if (active == 0) {
668
                                if (reloop)
669
                                        break;
670
                                /* wait on multiple queues? */
671
                                xseg_wait_signal(xseg, 100000);
672
                                break;
673
                        } else {
674
                                xseg_cancel_wait(xseg, portno1);        
675
                                xseg_cancel_wait(xseg, portno2);        
676
                                reloop = 1;
677
                        }
678
                }
679
        }
680

    
681
        close(logfd);
682

    
683
        return 0;
684
}
685

    
686
int cmd_rndwrite(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
687
{
688
        if (loops < 0)
689
                return help();
690

    
691
        if (targetlen >= chunksize) {
692
                fprintf(stderr, "targetlen >= chunksize\n");
693
                return -1;
694
        }
695

    
696
        char *p = realloc(namebuf, targetlen+1);
697
        if (!p) {
698
                fprintf(stderr, "Cannot allocate memory\n");
699
                return -1;
700
        }
701
        namebuf = p;
702

    
703
        p = realloc(chunk, chunksize);
704
        if (!p) {
705
                fprintf(stderr, "Cannot allocate memory\n");
706
                return -1;
707
        }
708
        chunk = p;
709
        memset(chunk, 0, chunksize);
710

    
711
        srandom(seed);
712

    
713
        struct xseg_request *submitted = NULL, *received;
714
        long nr_submitted = 0, nr_received = 0, nr_failed = 0;
715
        int reported = 0, r;
716
        uint64_t offset;
717
        xport port;
718
        char *req_data, *req_target;
719
        seed = random();
720
        init_local_signal();
721

    
722
        for (;;) {
723
                xseg_prepare_wait(xseg, srcport);
724
                if (nr_submitted < loops &&
725
                    (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
726
                        xseg_cancel_wait(xseg, srcport);
727
                        r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
728
                        if (r < 0) {
729
                                fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
730
                                        targetlen, chunksize);
731
                                xseg_put_request(xseg, submitted, srcport);
732
                                return -1;
733
                        }
734
                        
735
                        req_target = xseg_get_target(xseg, submitted);
736
                        req_data = xseg_get_data(xseg, submitted);
737

    
738
                        reported = 0;
739
                        mkname(namebuf, targetlen, seed);
740
                        namebuf[targetlen] = 0;
741
                        //printf("%ld: %s\n", nr_submitted, namebuf);
742
                        strncpy(req_target, namebuf, targetlen);
743
                        offset = 0;// pick(size);
744
                        mkchunk(req_data, chunksize, namebuf, targetlen, offset);
745

    
746
                        submitted->offset = offset;
747
                        submitted->size = chunksize;
748
                        submitted->op = X_WRITE;
749
                        submitted->flags |= XF_NOSYNC;
750

    
751
                        port =  xseg_submit(xseg, submitted, srcport, X_ALLOC);
752
                        if (port == NoPort) {
753
                                xseg_put_request(xseg, submitted, srcport);
754
                        } else {
755
                                seed = random();
756
                                nr_submitted += 1;
757
                                xseg_signal(xseg, port);
758
                        }
759
                }
760

    
761
                received = xseg_receive(xseg, srcport, 0);
762
                if (received) {
763
                        xseg_cancel_wait(xseg, srcport);
764
                        nr_received += 1;
765
                        if (!(received->state & XS_SERVED)) {
766
                                nr_failed += 1;
767
                                report_request(received);
768
                        }
769
                        if (xseg_put_request(xseg, received, srcport))
770
                                fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
771
                }
772

    
773
                if (!submitted && !received)
774
                        xseg_wait_signal(xseg, 1000000);
775

    
776
                        if (nr_submitted % 1000 == 0 && !reported) {
777
                                reported = 1;
778
                                fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
779
                                        nr_submitted, nr_received, nr_failed);
780
                        }
781

    
782
                        if (nr_received >= loops)
783
                                break;
784
        }
785

    
786
        fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
787
                nr_submitted, nr_received, nr_failed);
788
        return 0;
789
}
790

    
791
int cmd_rnddelete(long loops, int32_t seed, uint32_t targetlen)
792
{
793
        if (loops < 0)
794
                return help();
795
        char *p = realloc(namebuf, targetlen+1);
796
        if (!p) {
797
                fprintf(stderr, "Cannot allocate memory\n");
798
                return -1;
799
        }
800
        namebuf = p;
801

    
802
        srandom(seed);
803

    
804
        struct xseg_request *submitted = NULL, *received;
805
        long nr_submitted = 0, nr_received = 0, nr_failed = 0;
806
        int reported = 0, r;
807
        xport port;
808
        char *req_target;
809
        seed = random();
810
        init_local_signal();
811

    
812
        for (;;) {
813
                xseg_prepare_wait(xseg, srcport);
814
                if (nr_submitted < loops &&
815
                    (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
816
                        xseg_cancel_wait(xseg, srcport);
817
                        r = xseg_prep_request(xseg, submitted, targetlen, 0);
818
                        if (r < 0) {
819
                                fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
820
                                        targetlen, 0);
821
                                xseg_put_request(xseg, submitted, srcport);
822
                                return -1;
823
                        }
824
                        
825
                        req_target = xseg_get_target(xseg, submitted);
826

    
827
                        reported = 0;
828
                        mkname(namebuf, targetlen, seed);
829
                        namebuf[targetlen] = 0;
830
                        //printf("%ld: %s\n", nr_submitted, namebuf);
831
                        strncpy(req_target, namebuf, targetlen);
832
                        submitted->offset = 0;
833
                        submitted->size = 0;
834
                        submitted->op = X_DELETE;
835
                        submitted->flags = 0;
836

    
837
                        port =  xseg_submit(xseg, submitted, srcport, X_ALLOC);
838
                        if (port == NoPort) {
839
                                xseg_put_request(xseg, submitted, srcport);
840
                        } else {
841
                                seed = random();
842
                                nr_submitted += 1;
843
                                xseg_signal(xseg, port);
844
                        }
845
                }
846

    
847
                received = xseg_receive(xseg, srcport, 0);
848
                if (received) {
849
                        xseg_cancel_wait(xseg, srcport);
850
                        nr_received += 1;
851
                        if (!(received->state & XS_SERVED)) {
852
                                nr_failed += 1;
853
                                report_request(received);
854
                        }
855
                        if (xseg_put_request(xseg, received, srcport))
856
                                fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
857
                }
858

    
859
                if (!submitted && !received)
860
                        xseg_wait_signal(xseg, 1000000);
861

    
862
                        if (nr_submitted % 1000 == 0 && !reported) {
863
                                reported = 1;
864
                                fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
865
                                        nr_submitted, nr_received, nr_failed);
866
                        }
867

    
868
                        if (nr_received >= loops)
869
                                break;
870
        }
871

    
872
        fprintf(stderr, "submitted %ld, received %ld, failed %ld\n",
873
                nr_submitted, nr_received, nr_failed);
874
        return 0;
875
}
876
/* note:
877
 * prepare/wait rhythm,
878
 * files are converted to independent chunk access patterns,
879
*/
880

    
881
int cmd_rndread(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize, uint64_t size)
882
{
883
        if (loops < 0)
884
                return help();
885

    
886
        if (targetlen >= chunksize) {
887
                fprintf(stderr, "targetlen >= chunksize\n");
888
                return -1;
889
        }
890

    
891
        char *p = realloc(namebuf, targetlen+1);
892
        if (!p) {
893
                fprintf(stderr, "Cannot allocate memory\n");
894
                return -1;
895
        }
896
        namebuf = p;
897

    
898
        p = realloc(chunk, chunksize);
899
        if (!p) {
900
                fprintf(stderr, "Cannot allocate memory\n");
901
                return -1;
902
        }
903
        chunk = p;
904
        memset(chunk, 0, chunksize);
905

    
906
        srandom(seed);
907

    
908
        struct xseg_request *submitted = NULL, *received;
909
        long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0;
910
        int reported = 0, r;
911
        uint64_t offset;
912
        xport port;
913
        char *req_data, *req_target;
914
        init_local_signal();
915

    
916
        seed = random();
917
        for (;;) {
918
                submitted = NULL;
919
                xseg_prepare_wait(xseg, srcport);
920
                if (nr_submitted < loops &&
921
                    (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
922
                        xseg_cancel_wait(xseg, srcport);
923
                        r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
924
                        if (r < 0) {
925
                                fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
926
                                        targetlen, chunksize);
927
                                xseg_put_request(xseg, submitted, srcport);
928
                                return -1;
929
                        }
930

    
931
                        req_target = xseg_get_target(xseg, submitted);
932
                        reported = 0;
933
                        mkname(namebuf, targetlen, seed);
934
                        namebuf[targetlen] = 0;
935
                        //printf("%ld: %s\n", nr_submitted, namebuf);
936
                        offset = 0;//pick(size);
937

    
938
                        strncpy(req_target, namebuf, targetlen);
939
                        submitted->offset = offset;
940
                        submitted->size = chunksize;
941
                        submitted->op = X_READ;
942
                        port = xseg_submit(xseg, submitted, srcport, X_ALLOC);
943
                        if (port == NoPort) {
944
                                xseg_put_request(xseg, submitted, srcport);
945
                        } else {
946
                                seed = random();
947
                                nr_submitted += 1;
948
                                xseg_signal(xseg, port);
949
                        }
950
                }
951

    
952
                received = xseg_receive(xseg, srcport, 0);
953
                if (received) {
954
                        xseg_cancel_wait(xseg, srcport);
955
                        nr_received += 1;
956
                        req_target = xseg_get_target(xseg, received);
957
                        req_data = xseg_get_data(xseg, received);
958
                        if (!(received->state & XS_SERVED)) {
959
                                nr_failed += 1;
960
                                report_request(received);
961
                        } else if (!chkchunk(req_data, received->datalen,
962
                                        req_target, received->targetlen, received->offset)) {
963
        //                        report_request(received);
964
                                nr_mismatch += 1;
965
                        }
966

    
967
                        if (xseg_put_request(xseg, received, srcport))
968
                                fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
969
                }
970

    
971
                if (!submitted && !received)
972
                        xseg_wait_signal(xseg, 1000000);
973

    
974
                if (nr_submitted % 1000 == 0 && !reported) {
975
                        reported = 1;
976
                        fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
977
                        nr_submitted, nr_received, nr_failed, nr_mismatch);
978
                }
979

    
980
                if (nr_received >= loops)
981
                        break;
982
        }
983

    
984
        fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
985
                nr_submitted, nr_received, nr_failed, nr_mismatch);
986
        return 0;
987
}
988

    
989
int cmd_submit_reqs(long loops, long concurrent_reqs, int op)
990
{
991
        if (loops < 0)
992
                return help();
993

    
994
        struct xseg_request *submitted = NULL, *received;
995
        long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0, nr_flying = 0;
996
        int reported = 0, r;
997
        uint64_t offset;
998
        uint32_t targetlen = 10, chunksize = 4096;
999
        struct timeval tv1, tv2;
1000
        xport p;
1001
        char *req_data, *req_target;
1002

    
1003
        xseg_bind_port(xseg, srcport, NULL);
1004

    
1005
        gettimeofday(&tv1, NULL);
1006
        for (;;) {
1007
                submitted = NULL;
1008
                xseg_prepare_wait(xseg, srcport);
1009
                if (nr_submitted < loops &&  nr_flying < concurrent_reqs &&
1010
                    (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
1011
                        xseg_cancel_wait(xseg, srcport);
1012
                        r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
1013
                        if (r < 0) {
1014
                                fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
1015
                                        targetlen, chunksize);
1016
                                xseg_put_request(xseg, submitted, srcport);
1017
                                return -1;
1018
                        }
1019
                        
1020
                        //FIXME
1021
                        ++nr_flying;
1022
                        nr_submitted += 1;
1023
                        reported = 0;
1024
                        offset = 0;//pick(size);
1025

    
1026
                        submitted->offset = offset;
1027
                        submitted->size = chunksize;
1028
                        req_target = xseg_get_target(xseg, submitted);
1029
                        req_data = xseg_get_data(xseg, submitted);
1030

    
1031
                        if (op == 0)
1032
                                submitted->op = X_INFO;
1033
                        else if (op == 1)
1034
                                submitted->op = X_READ;
1035
                        else if (op == 2) {
1036
                                submitted->op = X_WRITE;
1037
                                mkchunk(req_data, submitted->datalen, req_target, submitted->targetlen, submitted->offset);
1038
                        }
1039

    
1040
                        p = xseg_submit(xseg, submitted, srcport, X_ALLOC);
1041
                        if ( p != NoPort){
1042
                                if (xseg_signal(xseg, p) < 0)
1043
                                        perror("Cannot signal peer");
1044
                        }
1045
                }
1046
                received = xseg_receive(xseg, srcport, 0);
1047
                if (received) {
1048
                        xseg_cancel_wait(xseg, srcport);
1049
                        --nr_flying;
1050
                        if (nr_received == 0)
1051
                                fprintf(stderr, "latency (time for the first req to complete): %llu usecs\n",
1052
                                        (unsigned long long)received->elapsed);
1053
                        nr_received += 1;
1054
                        if (!(received->state & XS_SERVED)) {
1055
                                nr_failed += 1;
1056
                                //report_request(received);
1057
                        }
1058

    
1059
                        if (xseg_put_request(xseg, received, srcport))
1060
                                fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
1061
                }
1062

    
1063
                if (!submitted && !received)
1064
                        xseg_wait_signal(xseg, 10000000L);
1065

    
1066
                if (nr_received >= loops)
1067
                        break;
1068
        }
1069
        gettimeofday(&tv2, NULL);
1070

    
1071
        fprintf(stderr, "submitted %ld, received %ld, failed %ld, mismatched %ld\n",
1072
                nr_submitted, nr_received, nr_failed, nr_mismatch);
1073
        long t = (tv2.tv_sec - tv1.tv_sec)*1000000 + (tv2.tv_usec - tv1.tv_usec);
1074
        fprintf(stderr, "elpased time: %lf secs, throughput: %lf reqs/sec\n", (double) t / 1000000.0, (double) nr_submitted / (t / 1000000.0));
1075

    
1076
        return 0;
1077
}
1078

    
1079
static void lock_status(struct xlock *lock, char *buf, int len)
1080
{
1081
        int r;
1082
        if (lock->owner == Noone)
1083
                r = snprintf(buf, len, "Locked: No");
1084
        else
1085
                r = snprintf(buf, len, "Locked: Yes (Owner: %lu)", lock->owner);
1086
        if (r >= len)
1087
                buf[len-1] = 0;
1088
}
1089

    
1090
int cmd_report(uint32_t portno)
1091
{
1092
        char fls[64], rls[64], pls[64]; // buffer to store lock status
1093
        struct xseg_port *port = xseg_get_port(xseg, portno);
1094
        if (!port) {
1095
                printf("port %u is not assigned\n", portno);
1096
                return 0;
1097
        }
1098
        struct xq *fq, *rq, *pq;
1099
        fq = xseg_get_queue(xseg, port, free_queue);
1100
        rq = xseg_get_queue(xseg, port, request_queue);
1101
        pq = xseg_get_queue(xseg, port, reply_queue);
1102
        lock_status(&port->fq_lock, fls, 64);
1103
        lock_status(&port->rq_lock, rls, 64);
1104
        lock_status(&port->pq_lock, pls, 64);
1105
        fprintf(stderr, "port %u:\n"
1106
                "   requests: %llu/%llu  src gw: %u  dst gw: %u\n"
1107
                "       free_queue [%p] count : %4llu | %s\n"
1108
                "    request_queue [%p] count : %4llu | %s\n"
1109
                "      reply_queue [%p] count : %4llu | %s\n",
1110
                portno, (unsigned long long)port->alloc_reqs, 
1111
                (unsigned long long)port->max_alloc_reqs,
1112
                xseg->src_gw[portno],
1113
                xseg->dst_gw[portno],
1114
                (void *)fq, (unsigned long long)xq_count(fq), fls,
1115
                (void *)rq, (unsigned long long)xq_count(rq), rls,
1116
                (void *)pq, (unsigned long long)xq_count(pq), pls);
1117
        return 0;
1118
}
1119

    
1120
int cmd_join(void)
1121
{
1122
        if (xseg)
1123
                return 0;
1124

    
1125
        xseg = xseg_join(cfg.type, cfg.name, "posix", NULL);
1126
        if (!xseg) {
1127
                fprintf(stderr, "cannot join segment!\n");
1128
                return -1;
1129
        }
1130
        return 0;
1131
}
1132
static void print_hanlder(char *name, struct xobject_h *obj_h)
1133
{
1134
        char ls[64];
1135
        lock_status(&obj_h->lock, ls, 64);
1136
        fprintf(stderr, "%20s: free: %4llu, allocated: %4llu, allocated space: %7llu (object size: %llu), Lock %s\n",
1137
                        name,
1138
                        (unsigned long long) obj_h->nr_free,
1139
                        (unsigned long long) obj_h->nr_allocated,
1140
                        (unsigned long long) obj_h->allocated_space,
1141
                        (unsigned long long) obj_h->obj_size, ls);
1142
}
1143

    
1144
//FIXME ugly
1145
static void print_heap(struct xseg *xseg)
1146
{
1147
        char *UNIT[4];
1148
        UNIT[0] = "B";
1149
        UNIT[1] = "KiB";
1150
        UNIT[2] = "MiB";
1151
        UNIT[3] = "GiB";
1152
        uint64_t MULT[4];
1153
        MULT[0] = 1;
1154
        MULT[1] = 1024;
1155
        MULT[2] = 1024*1024;
1156
        MULT[3] = 1024*1024*1024;
1157

    
1158
        int u;
1159
        uint64_t t;
1160
        fprintf(stderr, "Heap usage: ");
1161
        u = 0;
1162
        t = xseg->heap->cur;
1163
        while (t > 0) {
1164
                t /= 1024;
1165
                u++;
1166
        }
1167
        if (!t)
1168
                u--;
1169
        t = xseg->heap->cur / MULT[u];
1170
        if (t < 10){
1171
                float tf = ((float)(xseg->heap->cur))/((float)MULT[u]);
1172
                fprintf(stderr, "%2.1f %s/", tf, UNIT[u]);
1173
        }
1174
        else {
1175
                unsigned int tu = xseg->heap->cur / MULT[u];
1176
                fprintf(stderr, "%3u %s/", tu, UNIT[u]);
1177
        }
1178

    
1179
        u = 0;
1180
        t = xseg->config.heap_size;
1181
        while (t > 0) {
1182
                t /= 1024;
1183
                u++;
1184
        }
1185
        if (!t)
1186
                u--;
1187
        t = xseg->config.heap_size/MULT[u];
1188
        if (t < 10){
1189
                float tf = ((float)(xseg->config.heap_size))/(float)MULT[u];
1190
                fprintf(stderr, "%2.1f %s ", tf, UNIT[u]);
1191
        }
1192
        else {
1193
                unsigned int tu = xseg->config.heap_size / MULT[u];
1194
                fprintf(stderr, "%3u %s ", tu, UNIT[u]);
1195
        }
1196
        char ls[64];
1197
        lock_status(&xseg->heap->lock, ls, 64);
1198
        fprintf(stderr, "(%llu / %llu), %s\n",
1199
                        (unsigned long long)xseg->heap->cur,
1200
                        (unsigned long long)xseg->config.heap_size,
1201
                        ls);
1202
}
1203

    
1204
int cmd_reportall(void)
1205
{
1206
        uint32_t t;
1207

    
1208
        if (cmd_join())
1209
                return -1;
1210

    
1211
        fprintf(stderr, "Segment lock: %s\n",
1212
                (xseg->shared->flags & XSEG_F_LOCK) ? "Locked" : "Unlocked");
1213
        print_heap(xseg);
1214
        /* fprintf(stderr, "Heap usage: %llu / %llu\n", */
1215
        /*                 (unsigned long long)xseg->heap->cur, */
1216
        /*                 (unsigned long long)xseg->config.heap_size); */
1217
        fprintf(stderr, "Handlers: \n");
1218
        print_hanlder("Requests handler", xseg->request_h);
1219
        print_hanlder("Ports handler", xseg->port_h);
1220
        print_hanlder("Objects handler", xseg->object_handlers);
1221
        fprintf(stderr, "\n");
1222

    
1223
        for (t = 0; t < xseg->config.nr_ports; t++)
1224
                cmd_report(t);
1225

    
1226
        return 0;
1227
}
1228

    
1229

    
1230
int finish_req(struct xseg_request *req, enum req_action action)
1231
{
1232
        if (action == COMPLETE){
1233
                req->state &= ~XS_FAILED;
1234
                req->state |= XS_SERVED;
1235
        } else {
1236
                req->state |= XS_FAILED;
1237
                req->state &= ~XS_SERVED;
1238
        }
1239
        req->serviced = 0;
1240
        xport p = xseg_respond(xseg, req, srcport, X_ALLOC);
1241
        if (p == NoPort)
1242
                xseg_put_request(xseg, req, srcport);
1243
        else
1244
                xseg_signal(xseg, p);
1245
        return 0;
1246
}
1247

    
1248
//FIXME this should be in xseg lib?
1249
static int isDangling(struct xseg_request *req)
1250
{
1251
        xport i;
1252
        struct xseg_port *port;
1253
        for (i = 0; i < xseg->config.nr_ports; i++) {
1254
                if (xseg->ports[i]){
1255
                        port = xseg_get_port(xseg, i);
1256
                        if (!port){
1257
                                fprintf(stderr, "Inconsisten port <-> portno mapping %u", i);
1258
                                continue;
1259
                        }
1260
                        struct xq *fq, *rq, *pq;
1261
                        fq = xseg_get_queue(xseg, port, free_queue);
1262
                        rq = xseg_get_queue(xseg, port, request_queue);
1263
                        pq = xseg_get_queue(xseg, port, reply_queue);
1264
                        xlock_acquire(&port->fq_lock, srcport);
1265
                        if (__xq_check(fq, XPTR_MAKE(req, xseg->segment))){
1266
                                        xlock_release(&port->fq_lock);
1267
                                        return 0;
1268
                        }
1269
                        xlock_release(&port->fq_lock);
1270
                        xlock_acquire(&port->rq_lock, srcport);
1271
                        if (__xq_check(rq, XPTR_MAKE(req, xseg->segment))){
1272
                                        xlock_release(&port->rq_lock);
1273
                                        return 0;
1274
                        }
1275
                        xlock_release(&port->rq_lock);
1276
                        xlock_acquire(&port->pq_lock, srcport);
1277
                        if (__xq_check(pq, XPTR_MAKE(req, xseg->segment))){
1278
                                        xlock_release(&port->pq_lock);
1279
                                        return 0;
1280
                        }
1281
                        xlock_release(&port->pq_lock);
1282
                }
1283
        }
1284
        return 1;
1285
}
1286

    
1287
int prompt_user(char *msg)
1288
{
1289
        int c = 0, r = -1;
1290
        printf("%s [y/n]: ", msg);
1291
        while (1) {
1292
                c = fgetc(stdin);
1293
                if (c == 'y' || c == 'Y')
1294
                        r = 1;
1295
                else if (c == 'n' || c == 'N')
1296
                        r = 0;
1297
                else if (c == '\n'){
1298
                        if (r == -1)
1299
                                printf("%s [y/n]: ", msg);
1300
                        else
1301
                                break;
1302
                }
1303
        }
1304
        return r;
1305
}
1306

    
1307
//FIXME this should be in xseg lib?
1308
int cmd_verify(int fix)
1309
{
1310
        if (cmd_join())
1311
                return -1;
1312
        //segment lock
1313
        if (xseg->shared->flags & XSEG_F_LOCK){
1314
                fprintf(stderr, "Segment lock: Locked\n");
1315
                if (fix && prompt_user("Unlock it ?"))
1316
                        xseg->shared->flags &= ~XSEG_F_LOCK;
1317
        }
1318
        //heap lock
1319
        if (xseg->heap->lock.owner != Noone){
1320
                fprintf(stderr, "Heap lock: Locked (Owner: %llu)\n",
1321
                        (unsigned long long)xseg->heap->lock.owner);
1322
                if (fix && prompt_user("Unlock it ?"))
1323
                        xlock_release(&xseg->heap->lock);
1324
        }
1325
        //obj_h locks
1326
        if (xseg->request_h->lock.owner != Noone){
1327
                fprintf(stderr, "Requests handler lock: Locked (Owner: %llu)\n",
1328
                        (unsigned long long)xseg->request_h->lock.owner);
1329
                if (fix && prompt_user("Unlock it ?"))
1330
                        xlock_release(&xseg->request_h->lock);
1331
        }
1332
        if (xseg->port_h->lock.owner != Noone){
1333
                fprintf(stderr, "Ports handler lock: Locked (Owner: %llu)\n",
1334
                        (unsigned long long)xseg->port_h->lock.owner);
1335
                if (fix && prompt_user("Unlock it ?"))
1336
                        xlock_release(&xseg->port_h->lock);
1337
        }
1338
        if (xseg->object_handlers->lock.owner != Noone){
1339
                fprintf(stderr, "Objects handler lock: Locked (Owner: %llu)\n",
1340
                        (unsigned long long)xseg->object_handlers->lock.owner);
1341
                if (fix && prompt_user("Unlock it ?"))
1342
                        xlock_release(&xseg->object_handlers->lock);
1343
        }
1344
        //take segment lock?
1345
        xport i;
1346
        struct xseg_port *port;
1347
        for (i = 0; i < xseg->config.nr_ports; i++) {
1348
                if (xseg->ports[i]){
1349
                        port = xseg_get_port(xseg, i);
1350
                        if (!port){
1351
                                fprintf(stderr, "Inconsisten port <-> portno mapping %u", i);
1352
                                continue;
1353
                        }
1354
                        if (port->fq_lock.owner != Noone) {
1355
                                fprintf(stderr, "Free queue lock of port %u locked (Owner %llu)\n",
1356
                                                i, (unsigned long long)port->fq_lock.owner);
1357
                                if (fix && prompt_user("Unlock it ?"))
1358
                                        xlock_release(&port->fq_lock);
1359
                        }
1360
                        if (port->rq_lock.owner != Noone) {
1361
                                fprintf(stderr, "Request queue lock of port %u locked (Owner %llu)\n",
1362
                                                i, (unsigned long long)port->rq_lock.owner);
1363
                                if (fix && prompt_user("Unlock it ?"))
1364
                                        xlock_release(&port->rq_lock);
1365
                        }
1366
                        if (port->pq_lock.owner != Noone) {
1367
                                fprintf(stderr, "Reply queue lock of port %u locked (Owner %llu)\n",
1368
                                                i, (unsigned long long)port->pq_lock.owner);
1369
                                if (fix && prompt_user("Unlock it ?"))
1370
                                        xlock_release(&port->pq_lock);
1371
                        }
1372
                }
1373
        }
1374

    
1375
        struct xobject_h *obj_h = xseg->request_h;
1376
        struct xobject_iter it;
1377

    
1378
        struct xseg_request *req;
1379
        xlock_acquire(&obj_h->lock, srcport);
1380
        xobj_iter_init(obj_h, &it);
1381
        while (xobj_iterate(obj_h, &it, (void **)&req)){
1382
                //FIXME this will not work cause obj->magic - req->serial is not
1383
                //touched when a request is get
1384
                /* if (obj->magic != MAGIC_REQ && t->src_portno == portno){ */
1385
                if (isDangling(req) && !__xobj_isFree(obj_h, req)){
1386
                        report_request(req);
1387
                        if (fix && prompt_user("Fail it ?")){
1388
                                printf("Finishing ...\n");
1389
                                finish_req(req, FAIL);
1390
                        }
1391
                }
1392
        }
1393
        xlock_release(&obj_h->lock);
1394
        return 0;
1395
}
1396

    
1397
int cmd_inspectq(xport portno, enum queue qt)
1398
{
1399
        if (cmd_join())
1400
                return -1;
1401

    
1402
        struct xq *q;
1403
        struct xlock *l;
1404
        struct xseg_port *port = xseg_get_port(xseg, portno);
1405
        if (!port)
1406
                return -1;
1407
        if (qt == FREE_QUEUE){
1408
                q = xseg_get_queue(xseg, port, free_queue);
1409
                l = &port->fq_lock;
1410
        }
1411
        else if (qt == REQUEST_QUEUE){
1412
                q = xseg_get_queue(xseg, port, request_queue);
1413
                l = &port->rq_lock;
1414
        }
1415
        else if (qt == REPLY_QUEUE) {
1416
                q = xseg_get_queue(xseg, port, reply_queue);
1417
                l = &port->rq_lock;
1418
        }
1419
        else
1420
                return -1;
1421
        xlock_acquire(l, srcport);
1422
        xqindex i,c = xq_count(q);
1423
        if (c) {
1424
                struct xseg_request *req;
1425
                xptr xqi;
1426
                for (i = 0; i < c; i++) {
1427
                        xqi = __xq_pop_head(q);
1428
                        req = XPTR_TAKE(xqi, xseg->segment);
1429
                        report_request(req);
1430
                        __xq_append_tail(q, xqi);
1431
                }
1432
        }
1433
        else {
1434
                fprintf(stderr, "Queue is empty\n\n");
1435
        }
1436
        xlock_release(l);
1437
        return 0;
1438
}
1439

    
1440

    
1441
int cmd_request(struct xseg_request *req, enum req_action action)
1442
{
1443
        if (cmd_join())
1444
                return -1;
1445

    
1446
        struct xobject_h *obj_h = xseg->request_h;
1447
        if (!xobj_check(obj_h, req))
1448
                return -1;
1449

    
1450
        if (action == REPORT)
1451
                report_request(req);
1452
        else if (action == FAIL){
1453
                report_request(req);
1454
                if (prompt_user("fail it ?")){
1455
                        printf("Finishing ...\n");
1456
                        finish_req(req, FAIL);
1457
                }
1458
        }
1459
        else if (action == COMPLETE){
1460
                report_request(req);
1461
                if (prompt_user("Complete it ?")){
1462
                        printf("Finishing ...\n");
1463
                        finish_req(req, COMPLETE);
1464
                }
1465
        }
1466
        return 0;
1467
}
1468

    
1469
int cmd_create(void)
1470
{
1471
        int r = xseg_create(&cfg);
1472
        if (r) {
1473
                fprintf(stderr, "cannot create segment!\n");
1474
                return -1;
1475
        }
1476

    
1477
        fprintf(stderr, "Segment initialized.\n");
1478
        return 0;
1479
}
1480

    
1481
int cmd_destroy(void)
1482
{
1483
        if (!xseg && cmd_join())
1484
                return -1;
1485
        xseg_leave(xseg);
1486
        xseg_destroy(xseg);
1487
        xseg = NULL;
1488
        fprintf(stderr, "Segment destroyed.\n");
1489
        return 0;
1490
}
1491

    
1492
int cmd_alloc_requests(unsigned long nr)
1493
{
1494
        return xseg_alloc_requests(xseg, srcport, nr);
1495
}
1496

    
1497
int cmd_free_requests(unsigned long nr)
1498
{
1499
        return xseg_free_requests(xseg, srcport, nr);
1500
}
1501

    
1502
int cmd_put_requests(void)
1503
{
1504
        struct xseg_request *req;
1505

    
1506
        for (;;) {
1507
                req = xseg_accept(xseg, dstport, 0);
1508
                if (!req)
1509
                        break;
1510
                if (xseg_put_request(xseg, req, srcport))
1511
                        fprintf(stderr, "Cannot put request at port %u\n", req->src_portno);
1512
        }
1513

    
1514
        return 0;
1515
}
1516

    
1517
int cmd_finish(unsigned long nr, int fail)
1518
{
1519
        struct xseg_request *req;
1520
        char *buf = malloc(sizeof(char) * 8128);
1521
        char *req_target, *req_data;
1522
        xseg_bind_port(xseg, srcport, NULL);
1523
        xport p;
1524

    
1525
        for (; nr--;) {
1526
                xseg_prepare_wait(xseg, srcport);
1527
                req = xseg_accept(xseg, srcport, 0);
1528
                if (req) {
1529
                        req_target = xseg_get_target(xseg, req);
1530
                        req_data = xseg_get_data(xseg, req);
1531
                        xseg_cancel_wait(xseg, srcport);
1532
                        if (fail == 1)
1533
                                req->state &= ~XS_SERVED;
1534
                        else {
1535
                                if (req->op == X_READ)
1536
                                        mkchunk(req_data, req->datalen, req_target, req->targetlen, req->offset);
1537
                                else if (req->op == X_WRITE) 
1538
                                        memcpy(buf, req_data, (sizeof(*buf) > req->datalen) ? req->datalen : sizeof(*buf));
1539
                                else if (req->op == X_INFO)
1540
                                        *((uint64_t *) req->data) = 4294967296;
1541
                                
1542
                                req->state |= XS_SERVED;
1543
                                req->serviced = req->size;
1544
                        }
1545

    
1546
                        p = xseg_respond(xseg, req, srcport, X_ALLOC);
1547
                        xseg_signal(xseg, p);
1548
                        continue;
1549
                }
1550
                ++nr;
1551
                xseg_wait_signal(xseg, 10000000L);
1552
        }
1553

    
1554
        free(buf);
1555

    
1556
        return 0;
1557
}
1558

    
1559
void handle_reply(struct xseg_request *req)
1560
{
1561
        char *req_data = xseg_get_data(xseg, req);
1562
        char *req_target = xseg_get_target(xseg, req);
1563
        if (!(req->state & XS_SERVED)) {
1564
                report_request(req);
1565
                goto put;
1566
        }
1567

    
1568
        switch (req->op) {
1569
        case X_READ:
1570
                fwrite(req_data, 1, req->datalen, stdout);
1571
                break;
1572

    
1573
        case X_WRITE:
1574
                fprintf(stdout, "wrote: ");
1575
                fwrite(req_data, 1, req->datalen, stdout);
1576
                break;
1577
        case X_SYNC:
1578
        case X_DELETE:
1579
                fprintf(stderr, "deleted %s\n", req_target);
1580
                break;
1581
        case X_TRUNCATE:
1582
        case X_COMMIT:
1583
        case X_CLONE:
1584
                fprintf(stderr, "cloned %s\n", ((struct xseg_request_clone *)req_data)->target);
1585
                break;
1586
        case X_INFO:
1587
                fprintf(stderr, "size: %llu\n", (unsigned long long)*((uint64_t *)req_data));
1588
                break;
1589
        case X_COPY:
1590
                fprintf(stderr, "copied %s\n", ((struct xseg_request_copy *)req_data)->target);
1591
                break;
1592
        case X_CLOSE:
1593
                fprintf(stderr, "Closed %s\n", req_target);
1594
        case X_OPEN:
1595
                fprintf(stderr, "Opened %s\n", req_target);
1596

    
1597
        default:
1598
                break;
1599
        }
1600

    
1601
put:
1602
        if (xseg_put_request(xseg, req, srcport))
1603
                fprintf(stderr, "Cannot put reply at port %u\n", req->src_portno);
1604
}
1605

    
1606
int cmd_wait(uint32_t nr)
1607
{
1608
        struct xseg_request *req;
1609
        long ret;
1610
        init_local_signal(); 
1611

    
1612
        for (;;) {
1613
                req = xseg_receive(xseg, srcport, 0);
1614
                if (req) {
1615
                        handle_reply(req);
1616
                        nr--;
1617
                        if (nr == 0)
1618
                                break;
1619
                        continue;
1620
                }
1621

    
1622
                ret = xseg_prepare_wait(xseg, srcport);
1623
                if (ret)
1624
                        return -1;
1625

    
1626
                ret = xseg_wait_signal(xseg, 1000000);
1627
                ret = xseg_cancel_wait(xseg, srcport);
1628
                if (ret)
1629
                        return -1;
1630
        }
1631

    
1632
        return 0;
1633
}
1634

    
1635
int cmd_put_replies(void)
1636
{
1637
        struct xseg_request *req;
1638

    
1639
        for (;;) {
1640
                req = xseg_receive(xseg, dstport, 0);
1641
                if (!req)
1642
                        break;
1643
                fprintf(stderr, "request: %08llx%08llx\n"
1644
                        "     op: %u\n"
1645
                        "  state: %u\n",
1646
                        0LL, (unsigned long long)req->serial,
1647
                        req->op,
1648
                        req->state);
1649
                report_request(req);
1650

    
1651
                //fwrite(req->buffer, 1, req->bufferlen, stdout);
1652

    
1653
                if (xseg_put_request(xseg, req, srcport))
1654
                        fprintf(stderr, "Cannot put reply\n");
1655
        }
1656

    
1657
        return 0;
1658
}
1659

    
1660
int cmd_bind(long portno)
1661
{
1662
        struct xseg_port *port = xseg_bind_port(xseg, portno, NULL);
1663
        if (!port) {
1664
                fprintf(stderr, "failed to bind port %ld\n", portno);
1665
                return 1;
1666
        }
1667

    
1668
        fprintf(stderr, "bound port %u\n", xseg_portno(xseg, port));
1669
        return 0;
1670
}
1671

    
1672
int cmd_signal(uint32_t portno)
1673
{
1674
        return xseg_signal(xseg, portno);
1675
}
1676

    
1677
int parse_ports(char *str)
1678
{
1679
        int ret = 0;
1680
        char *s = str;
1681

    
1682
        for (;;) {
1683
                if (*s == 0)
1684
                        return 0;
1685

    
1686
                if (*s == ':') {
1687
                        *s = 0;
1688
                        if ((s > str) && isdigit(str[0])) {
1689
                                srcport = atol(str);
1690
                                ret ++;
1691
                        }
1692
                        break;
1693
                }
1694
                s ++;
1695
        }
1696

    
1697
        s += 1;
1698
        str = s;
1699

    
1700
        for (;;) {
1701
                if (*s == 0) {
1702
                        if ((s > str) && isdigit(str[0])) {
1703
                                dstport = atol(str);
1704
                                ret ++;
1705
                        }
1706
                        break;
1707
                }
1708
                s ++;
1709
        }
1710

    
1711
        return ret;
1712
}
1713

    
1714
int main(int argc, char **argv)
1715
{
1716
        int i, ret = 0;
1717
        char *spec;
1718

    
1719
        if (argc < 3)
1720
                return help();
1721

    
1722
        srcport = -1;
1723
        dstport = -1;
1724
        spec = argv[1];
1725

    
1726
        if (xseg_parse_spec(spec, &cfg)) {
1727
                fprintf(stderr, "Cannot parse spec\n");
1728
                return -1;
1729
        }
1730

    
1731
        if (xseg_initialize()) {
1732
                fprintf(stderr, "cannot initialize!\n");
1733
                return -1;
1734
        }
1735

    
1736
        for (i = 2; i < argc; i++) {
1737

    
1738
                if (!strcmp(argv[i], "create")) {
1739
                        ret = cmd_create();
1740
                        continue;
1741
                }
1742

    
1743
                if (!strcmp(argv[i], "join")) {
1744
                        ret = cmd_join();
1745
                        if (!ret)
1746
                                fprintf(stderr, "Segment joined.\n");
1747
                        continue;
1748
                }
1749

    
1750
                if (!strcmp(argv[i], "destroy")) {
1751
                        ret = cmd_destroy();
1752
                        continue;
1753
                }
1754

    
1755
                if (cmd_join())
1756
                        return -1;
1757

    
1758
                if (!strcmp(argv[i], "reportall")) {
1759
                        ret = cmd_reportall();
1760
                        continue;
1761
                }
1762

    
1763
                if (!strcmp(argv[i], "bind") && (i + 1 < argc)) {
1764
                        ret = cmd_bind(atol(argv[i+1]));
1765
                        i += 1;
1766
                        continue;
1767
                }
1768

    
1769
                if (!strcmp(argv[i], "signal") && (i + 1 < argc)) {
1770
                        ret = cmd_signal(atol(argv[i+1]));
1771
                        i += 1;
1772
                        continue;
1773
                }
1774

    
1775
                if (!strcmp(argv[i], "bridge") && (i + 4 < argc)) {
1776
                        ret = cmd_bridge(atol(argv[i+1]),
1777
                                         atol(argv[i+2]),
1778
                                         argv[i+3],
1779
                                         argv[i+4]);
1780
                        i += 4;
1781
                        continue;
1782
                }
1783

    
1784
                if (srcport == -1) {
1785
                        if (!parse_ports(argv[i]))
1786
                                fprintf(stderr, "source port undefined: %s\n", argv[i]);
1787
                        continue;
1788
                }
1789

    
1790
                if (dstport == -1) {
1791
                        if (!parse_ports(argv[i]))
1792
                                fprintf(stderr, "destination port undefined: %s\n", argv[i]);
1793
                        continue;
1794
                }
1795

    
1796
                if (!strcmp(argv[i], "verify")) {
1797
                        ret = cmd_verify(0);
1798
                        continue;
1799
                }
1800

    
1801
                if (!strcmp(argv[i], "verify-fix")) {
1802
                        ret = cmd_verify(1);
1803
                        continue;
1804
                }
1805

    
1806
                if (!strcmp(argv[i], "failreq") && (i + 1 < argc)) {
1807
                        struct xseg_request *req;
1808
                        sscanf(argv[i+1], "%lx", &req);
1809
                        ret = cmd_request(req, FAIL);
1810
                        i += 1;
1811
                        continue;
1812
                }
1813

    
1814
                if (!strcmp(argv[i], "inspect-freeq") && (i + 1 < argc)) {
1815
                        ret = cmd_inspectq(atol(argv[i+1]), FREE_QUEUE);
1816
                        i += 1;
1817
                        continue;
1818
                }
1819

    
1820
                if (!strcmp(argv[i], "inspect-requestq") && (i + 1 < argc)) {
1821
                        ret = cmd_inspectq(atol(argv[i+1]), REQUEST_QUEUE);
1822
                        i += 1;
1823
                        continue;
1824
                }
1825

    
1826
                if (!strcmp(argv[i], "inspect-replyq") && (i + 1 < argc)) {
1827
                        ret = cmd_inspectq(atol(argv[i+1]), REPLY_QUEUE);
1828
                        i += 1;
1829
                        continue;
1830
                }
1831

    
1832
                if (!strcmp(argv[i], "report")) {
1833
                        ret = cmd_report(dstport);
1834
                        continue;
1835
                }
1836

    
1837
                if (!strcmp(argv[i], "alloc_requests") && (i + 1 < argc)) {
1838
                        ret = cmd_alloc_requests(atol(argv[i+1]));
1839
                        i += 1;
1840
                        continue;
1841
                }
1842

    
1843
                if (!strcmp(argv[i], "free_requests") && (i + 1 < argc)) {
1844
                        ret = cmd_free_requests(atol(argv[i+1]));
1845
                        i += 1;
1846
                        continue;
1847
                }
1848

    
1849
                if (!strcmp(argv[i], "put_requests")) {
1850
                        ret = cmd_put_requests();
1851
                        continue;
1852
                }
1853

    
1854
                if (!strcmp(argv[i], "put_replies")) {
1855
                        ret = cmd_put_replies();
1856
                        continue;
1857
                }
1858

    
1859
                if (!strcmp(argv[i], "complete") && (i + 1 < argc)) {
1860
                        ret = cmd_finish(atol(argv[i+1]), 0);
1861
                        i += 1;
1862
                        continue;
1863
                }
1864

    
1865
                if (!strcmp(argv[i], "fail") && (i + 1 < argc)) {
1866
                        ret = cmd_finish(atol(argv[i+1]), 1);
1867
                        i += 1;
1868
                        continue;
1869
                }
1870

    
1871
                if (!strcmp(argv[i], "wait") && (i + 1 < argc)) {
1872
                        ret = cmd_wait(atol(argv[i+1]));
1873
                        i += 1;
1874
                        continue;
1875
                }
1876

    
1877
                if (!strcmp(argv[i], "rndwrite") && (i + 5 < argc)) {
1878
                        long nr_loops = atol(argv[i+1]);
1879
                        unsigned int seed = atoi(argv[i+2]);
1880
                        unsigned int targetlen = atoi(argv[i+3]);
1881
                        unsigned int chunksize = atoi(argv[i+4]);
1882
                        unsigned long objectsize = atol(argv[i+5]);
1883
                        ret = cmd_rndwrite(nr_loops, seed, targetlen, chunksize, objectsize);
1884
                        i += 5;
1885
                        continue;
1886
                }
1887
                
1888
                if (!strcmp(argv[i], "rnddelete") && (i + 3 < argc)) {
1889
                        long nr_loops = atol(argv[i+1]);
1890
                        unsigned int seed = atoi(argv[i+2]);
1891
                        unsigned int targetlen = atoi(argv[i+3]);
1892
                        ret = cmd_rnddelete(nr_loops, seed, targetlen);
1893
                        i += 3;
1894
                        continue;
1895
                }
1896

    
1897
                if (!strcmp(argv[i], "rndread") && (i + 5 < argc)) {
1898
                        long nr_loops = atol(argv[i+1]);
1899
                        unsigned int seed = atoi(argv[i+2]);
1900
                        unsigned int targetlen = atoi(argv[i+3]);
1901
                        unsigned int chunksize = atoi(argv[i+4]);
1902
                        unsigned long objectsize = atol(argv[i+5]);
1903
                        ret = cmd_rndread(nr_loops, seed, targetlen, chunksize, objectsize);
1904
                        i += 5;
1905
                        continue;
1906
                }
1907

    
1908
                if (!strcmp(argv[i], "submit_reqs") && (i + 3 < argc)) {
1909
                        long nr_loops = atol(argv[i+1]);
1910
                        long concurrent_reqs = atol(argv[i+2]);
1911
                        int op = atoi(argv[i+3]);
1912
                        ret = cmd_submit_reqs(nr_loops, concurrent_reqs, op);
1913
                        i += 3;
1914
                        continue;
1915
                }
1916

    
1917
                if (!strcmp(argv[i], "read") && (i + 3 < argc)) {
1918
                        char *target = argv[i+1];
1919
                        uint64_t offset = atol(argv[i+2]);
1920
                        uint64_t size   = atol(argv[i+3]);
1921
                        ret = cmd_read(target, offset, size);
1922
                        i += 3;
1923
                        continue;
1924
                }
1925

    
1926
                if (!strcmp(argv[i], "write") && (i + 2 < argc)) {
1927
                        char *target = argv[i+1];
1928
                        uint64_t offset = atol(argv[i+2]);
1929
                        ret = cmd_write(target, offset);
1930
                        i += 2;
1931
                        continue;
1932
                }
1933

    
1934
                if (!strcmp(argv[i], "truncate") && (i + 2 < argc)) {
1935
                        char *target = argv[i+1];
1936
                        uint64_t offset = atol(argv[i+2]);
1937
                        ret = cmd_truncate(target, offset);
1938
                        i += 2;
1939
                        continue;
1940
                }
1941

    
1942
                if (!strcmp(argv[i], "delete") && (i + 1 < argc)) {
1943
                        char *target = argv[i+1];
1944
                        ret = cmd_delete(target);
1945
                        i += 1;
1946
                        continue;
1947
                }
1948

    
1949
                if (!strcmp(argv[i], "acquire") && (i + 1 < argc)) {
1950
                        char *target = argv[i+1];
1951
                        ret = cmd_acquire(target);
1952
                        i += 1;
1953
                        continue;
1954
                }
1955

    
1956
                if (!strcmp(argv[i], "release") && (i + 1 < argc)) {
1957
                        char *target = argv[i+1];
1958
                        ret = cmd_release(target);
1959
                        i += 1;
1960
                        continue;
1961
                }
1962

    
1963
                if (!strcmp(argv[i], "copy") && (i + 2) < argc) {
1964
                        char *src = argv[i+1];
1965
                        char *dst = argv[i+2];
1966
                        ret = cmd_copy(src, dst);
1967
                        i += 2;
1968
                        continue;
1969
                }
1970

    
1971
                if (!strcmp(argv[i], "clone") && (i + 2 < argc)) {
1972
                        char *src = argv[i+1];
1973
                        char *dst = argv[i+2];
1974
                        ret = cmd_clone(src, dst);
1975
                        i += 2;
1976
                        continue;
1977
                }
1978

    
1979
                if (!strcmp(argv[i], "info") && (i + 1 < argc)) {
1980
                        char *target = argv[i+1];
1981
                        ret = cmd_info(target);
1982
                        i += 1;
1983
                        continue;
1984
                }
1985

    
1986

    
1987
                if (!parse_ports(argv[i]))
1988
                        fprintf(stderr, "invalid argument: %s\n", argv[i]);
1989
        }
1990

    
1991
        /* xseg_leave(); */
1992
        return ret;
1993
}