Statistics
| Branch: | Tag: | Revision:

root / xseg / peers / filed.c @ 0072a4af

History | View | Annotate | Download (14.1 kB)

1
#define _GNU_SOURCE
2
#include <stdio.h>
3
#include <stdlib.h>
4
#include <sys/types.h>
5
#include <sys/stat.h>
6
#include <unistd.h>
7
#include <string.h>
8
#include <fcntl.h>
9
#include <errno.h>
10
#include <aio.h>
11
#include <signal.h>
12
#include <limits.h>
13
#include <xseg/xseg.h>
14
#include <pthread.h>
15

    
16
#define MAX_PATH_SIZE 255
17
#define MAX_FILENAME_SIZE 255
18

    
19
static int usage(void)
20
{
21
        printf("Usage: ./filed <path_to_directory> [options]\n"
22
                "Options: [-p portno]\n"
23
                "         [-g type:name:nr_ports:nr_requests:request_size:extra_size:page_shift]\n"
24
                "         [-n nr_parallel_ops]\n");
25
        return 1;
26
}
27

    
28
struct fsync_io {
29
        unsigned long cacheidx;
30
        int fd;
31
        uint64_t time;
32
};
33

    
34
struct io {
35
        struct store *store;
36
        struct xseg_request *req;
37
        ssize_t retval;
38
        long fdcacheidx;
39
        pthread_cond_t cond;
40
        pthread_mutex_t lock;
41
};
42

    
43
#define READY (1 << 1)
44

    
45
struct fdcache_node {
46
        volatile int fd;
47
        volatile unsigned int ref;
48
        volatile unsigned long time;
49
        volatile unsigned int flags;
50
        pthread_cond_t cond;
51
        char name[MAX_FILENAME_SIZE + 1];
52
};
53

    
54
struct store {
55
        struct xseg *xseg;
56
        struct xseg_port *xport;
57
        uint32_t portno;
58
        uint64_t size;
59
        struct io *ios;
60
        struct xq free_ops;
61
        char *free_bufs;
62
        long nr_ops;
63
        struct sigevent sigevent;
64
        int dirfd;
65
        uint32_t path_len;
66
        uint64_t handled_reqs;
67
        unsigned long maxfds;
68
        struct fdcache_node *fdcache;
69
        pthread_t *iothread;
70
        pthread_mutex_t cache_lock;
71
        char path[MAX_PATH_SIZE + 1];
72
};
73

    
74
static unsigned long sigaction_count;
75

    
76
static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
77
{
78
        sigaction_count++;
79
}
80

    
81
static void log_io(char *msg, struct io *io)
82
{
83
        char name[64], data[64];
84
        /* null terminate name in case of req->name is less than 63 characters,
85
         * and next character after name (aka first byte of next buffer) is not
86
         * null
87
         */
88
        unsigned int end = (io->req->namesize > 63) ? 63 : io->req->namesize;
89
        strncpy(name, io->req->name, end);
90
        name[end] = 0;
91
        strncpy(data, io->req->data, 63);
92
        data[63] = 0;
93
#if 0
94
vkoukis debug off
95
        fprintf(stderr,
96
                "%s: fd:%u, op:%u offset: %llu size: %lu retval: %lu, reqstate: %u\n"
97
                "name[%u]: '%s', data[%llu]:\n%s------------------\n\n",
98
                msg,
99
                (unsigned int)io->fdcacheidx, //this is cacheidx not fd
100
                (unsigned int)io->req->op,
101
                (unsigned long long)io->req->offset,
102
                (unsigned long)io->req->size,
103
                (unsigned long)io->retval,
104
                (unsigned int)io->req->state,
105
                (unsigned int)io->req->namesize, name,
106
                (unsigned long long)io->req->datasize, data);
107
#endif
108
}
109

    
110
static struct io *alloc_io(struct store *store)
111
{
112
        xqindex idx = xq_pop_head(&store->free_ops);
113
        if (idx == None)
114
                return NULL;
115
        return store->ios + idx;
116
}
117

    
118
static inline void free_io(struct store *store, struct io *io)
119
{
120
        xqindex idx = io - store->ios;
121
        io->req = NULL;
122
        xq_append_head(&store->free_ops, idx);
123
}
124

    
125

    
126
static void complete(struct store *store, struct io *io)
127
{
128
        struct xseg_request *req = io->req;
129
        req->state |= XS_SERVED;
130
        log_io("complete", io);
131
        xseg_respond(store->xseg, req->portno, req);
132
        xseg_signal(store->xseg, req->portno);
133
        __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
134
}
135

    
136
static void fail(struct store *store, struct io *io)
137
{
138
        struct xseg_request *req = io->req;
139
        req->state |= XS_FAILED;
140
        log_io("fail", io);
141
        xseg_respond(store->xseg, req->portno, req);
142
        xseg_signal(store->xseg, req->portno);
143
        if (io->fdcacheidx >= 0) {
144
                __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
145
        }
146
}
147

    
148
static void pending(struct store *store, struct io *io)
149
{
150
        io->req->state = XS_PENDING;
151
}
152

    
153
static void handle_unknown(struct store *store, struct io *io)
154
{
155
        struct xseg_request *req = io->req;
156
        snprintf(req->data, req->datasize, "unknown request op");
157
        fail(store, io);
158
}
159

    
160
static inline void prepare_io(struct store *store, struct io *io)
161
{
162
}
163

    
164

    
165
static int dir_open(        struct store *store, struct io *io,
166
                        char *name, uint32_t namesize, int mode        )
167
{
168
        int fd = -1, r;
169
        struct fdcache_node *ce = NULL;
170
        long i, lru;
171
        uint64_t min;
172
        io->fdcacheidx = -1;
173
        if (namesize > MAX_FILENAME_SIZE)
174
                goto out_err;
175

    
176
start:
177
        /* check cache */
178
        pthread_mutex_lock(&store->cache_lock);
179
start_locked:
180
        lru = -1;
181
        min = UINT64_MAX;
182
        for (i = 0; i < store->maxfds; i++) {
183
                if (store->fdcache[i].ref == 0 && min > store->fdcache[i].time 
184
                                && (store->fdcache[i].flags & READY)) {
185
                        min = store->fdcache[i].time;
186
                        lru = i;
187

    
188
                }
189
                if (!strncmp(store->fdcache[i].name, name, namesize)) {
190
                        if (store->fdcache[i].name[namesize] == 0) {
191
                                ce = &store->fdcache[i];
192
                                /* if any other io thread is currently opening
193
                                 * the file, block until it succeeds or fails
194
                                 */
195
                                if (!(ce->flags & READY)) {
196
                                        pthread_cond_wait(&ce->cond, &store->cache_lock);
197
                                        /* when ready, restart lookup */
198
                                        goto start_locked;
199
                                }
200
                                /* if successfully opened */
201
                                if (ce->fd > 0) {
202
                                        fd = store->fdcache[i].fd;
203
                                        io->fdcacheidx = i;
204
                                        goto out;
205
                                }
206
                                /* else open failed for the other io thread, so
207
                                 * it should fail for everyone waiting on this
208
                                 * file.
209
                                 */
210
                                else {
211
                                        fd = -1;
212
                                        io->fdcacheidx = -1;
213
                                        goto out_err_unlock;
214
                                }
215
                        }
216
                }
217
        }
218
        if (lru < 0){
219
                /* all cache entries are currently being used */
220
                pthread_mutex_unlock(&store->cache_lock);
221
                goto start;
222
        }
223
        if (store->fdcache[lru].ref){
224
                fd = -1;
225
                printf("lru(%ld) ref not 0 (%u)\n", lru, store->fdcache[lru].ref);
226
                goto out_err_unlock;
227
        }
228
        /* make room for new file */
229
        ce = &store->fdcache[lru];
230
        /* set name here and state to not ready, for any other requests on the
231
         * same target that may follow
232
         */
233
        strncpy(ce->name, name, namesize);
234
        ce->name[namesize] = 0;
235
        ce->flags &= ~READY;
236
        pthread_mutex_unlock(&store->cache_lock);
237

    
238
        if (ce->fd >0){
239
                if (close(ce->fd) < 0){
240
                        perror("close");
241
                }
242
        }
243
        fd = openat(store->dirfd, ce->name, O_RDWR);        
244
        if (fd < 0) {
245
                if (errno == ENOENT){
246
                        fd = openat(store->dirfd, ce->name, 
247
                                        O_RDWR | O_CREAT, 0600);
248
                        if (fd >= 0)
249
                                goto new_entry;
250
                }
251
                perror(store->path);
252
                /* insert in cache a negative fd to indicate opening error to
253
                 * any other ios waiting for the file to open
254
                 */
255
        }        
256
        /* insert in cache */
257
new_entry:
258
        pthread_mutex_lock(&store->cache_lock);
259
        ce->fd = fd;
260
        ce->ref = 0;
261
        ce->flags = READY;
262
        pthread_cond_broadcast(&ce->cond);
263
        if (fd > 0) {
264
                io->fdcacheidx = lru;
265
        }
266
        else {
267
                io->fdcacheidx = -1;
268
                goto out_err_unlock;
269
        }
270

    
271
out:
272
        store->handled_reqs++;
273
        ce->time = store->handled_reqs;
274
        __sync_fetch_and_add(&ce->ref, 1);
275
        pthread_mutex_unlock(&store->cache_lock);
276
out_err:
277
        return fd;
278

    
279
out_err_unlock:
280
        pthread_mutex_unlock(&store->cache_lock);
281
        goto out_err;
282
}
283

    
284
static void handle_read_write(struct store *store, struct io *io)
285
{
286
        int r, fd, mode;
287
        struct xseg_request *req = io->req;
288

    
289
        if (req->op == X_WRITE)
290
                mode = 1;
291
        else
292
                mode = 0;
293
        fd = dir_open(store, io, req->name, req->namesize, mode);
294
        if (fd < 0){
295
                perror("dir_open");
296
                fail(store, io);
297
                return;
298
        }
299

    
300
        if (req != io->req)
301
                printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
302
        if (!req->size) {
303
                if (req->flags & (XF_FLUSH | XF_FUA)) {
304
                        /* No FLUSH/FUA support yet (O_SYNC ?).
305
                         * note that with FLUSH/size == 0 
306
                         * there will probably be a (uint64_t)-1 offset */
307
                        complete(store, io);
308
                        return;
309
                } else {
310
                        complete(store, io);
311
                        return;
312
                }
313
        }
314

    
315

    
316
        prepare_io(store, io);
317

    
318
        switch (req->op) {
319
        case X_READ:
320
                while (req->serviced < req->datasize) {
321
                        r = pread(fd, req->data + req->serviced, 
322
                                        req->datasize - req->serviced,
323
                                               req->offset + req->serviced);
324
                        if (r < 0) {
325
                                req->datasize = req->serviced;
326
                                perror("pread");
327
                        }
328
                        else if (r == 0) {
329
                                /* reached end of file. zero out the rest data buffer */
330
                                memset(req->data + req->serviced, 0, req->datasize - req->serviced);
331
                                req->serviced = req->datasize;
332
                        }
333
                        else {
334
                                req->serviced += r;
335
                        }
336
                }
337
                break;
338
        case X_WRITE:
339
                while (req->serviced < req->datasize) {
340
                        r = pwrite(fd, req->data + req->serviced, 
341
                                        req->datasize - req->serviced,
342
                                               req->offset + req->serviced);
343
                        if (r < 0) {
344
                                req->datasize = req->serviced;
345
                        }
346
                        else if (r == 0) {
347
                                /* reached end of file. zero out the rest data buffer */
348
                                memset(req->data + req->serviced, 0, req->datasize - req->serviced);
349
                                req->serviced = req->datasize;
350
                        }
351
                        else {
352
                                req->serviced += r;
353
                        }
354
                }
355
                r = fsync(fd);
356
                if (r< 0) {
357
                        perror("fsync");
358
                        /* if fsync fails, then no bytes serviced correctly */
359
                        req->serviced = 0;
360
                }
361
                break;
362
        default:
363
                snprintf(req->data, req->datasize,
364
                         "wtf, corrupt op %u?\n", req->op);
365
                fail(store, io);
366
                return;
367
        }
368

    
369
        if (req->serviced > 0 ) {
370
                complete(store, io);
371
        }
372
        else {
373
                strerror_r(errno, req->data, req->datasize);
374
                fail(store, io);
375
        }
376
        return;
377
}
378

    
379
static void handle_info(struct store *store, struct io *io)
380
{
381
        struct xseg_request *req = io->req;
382
        struct stat stat;
383
        int fd, r;
384
        off_t size;
385

    
386
        fd = dir_open(store, io, req->name, req->namesize, 0);
387
        if (fd < 0) {
388
                fail(store, io);
389
                return;
390
        }
391
        r = fstat(fd, &stat);
392
        if (r < 0) {
393
                perror("fstat");
394
                fail(store, io);
395
                return;
396
        }
397
        size = stat.st_size;
398
        *((off_t *) req->data) = size;
399
        req->datasize = sizeof(size);
400

    
401
        complete(store, io);
402
}
403

    
404
static void dispatch(struct store *store, struct io *io)
405
{
406
        printf("io: 0x%p, req: 0x%p, op %u\n",
407
                (void *)io, (void *)io->req, io->req->op);
408
        switch (io->req->op) {
409
        case X_READ:
410
        case X_WRITE:
411
                handle_read_write(store, io); break;
412
        case X_INFO:
413
                handle_info(store, io); break;
414
        case X_SYNC:
415
        default:
416
                handle_unknown(store, io);
417
        }
418
}
419

    
420
static void handle_accepted(struct store *store, struct io *io)
421
{
422
        struct xseg_request *req = io->req;
423
        req->serviced = 0;
424
        req->state = XS_ACCEPTED;
425
        io->retval = 0;
426
        dispatch(store, io);
427
}
428

    
429
static struct io* wake_up_next_iothread(struct store *store)
430
{
431
        struct io *io = alloc_io(store);
432
        if (io){        
433
                pthread_cond_signal(&io->cond);
434
        }
435
        return io;
436
}
437

    
438
void *io_loop(void *arg)
439
{
440
        struct io *io = (struct io *) arg;
441
        struct store *store = io->store;
442
        struct xseg *xseg = store->xseg;
443
        uint32_t portno = store->portno;
444
        struct xseg_request *accepted;
445

    
446
        for (;;) {
447
                accepted = NULL;
448
                accepted = xseg_accept(xseg, portno);
449
                if (accepted) {
450
                        io->req = accepted;
451
                        wake_up_next_iothread(store);
452
                        handle_accepted(store, io);
453
                }
454
                else {
455
                        free_io(store, io);
456
                        pthread_mutex_lock(&io->lock);
457
                        pthread_cond_wait(&io->cond, &io->lock);
458
                        pthread_mutex_unlock(&io->lock);
459
                }
460
        }
461

    
462
        return NULL;
463
}
464

    
465
static struct xseg *join(char *spec)
466
{
467
        struct xseg_config config;
468
        struct xseg *xseg;
469

    
470
        (void)xseg_parse_spec(spec, &config);
471
        xseg = xseg_join(config.type, config.name);
472
        if (xseg)
473
                return xseg;
474

    
475
        fprintf(stderr, "Failed to join xseg, creating it...\n");
476
        (void)xseg_create(&config);
477
        return xseg_join(config.type, config.name);
478
}
479

    
480
static int filed_loop(struct store *store)
481
{
482
        struct xseg *xseg = store->xseg;
483
        uint32_t portno = store->portno;
484
        struct io *io;
485

    
486
        for (;;) {
487
                io = wake_up_next_iothread(store);
488
                xseg_prepare_wait(xseg, portno);
489
                xseg_wait_signal(xseg, portno, 10000);
490
        }
491
        return 0;
492
}
493

    
494
static int filed(        char *path, unsigned long size, uint32_t nr_ops,
495
                        char *spec, long portno        )
496
{
497
        struct stat stat;
498
        struct sigaction sa;
499
        struct store *store;
500
        int r, mode, i;
501
        void *status;
502

    
503
        store = malloc(sizeof(struct store));
504
        if (!store) {
505
                perror("malloc");
506
                return -1;
507
        }
508

    
509

    
510
        /*
511
        r = daemon(1, 1);
512
        if (r < 0)
513
                return r;
514
                */
515

    
516
        store->sigevent.sigev_notify = SIGEV_SIGNAL;
517
        store->sigevent.sigev_signo = SIGIO;
518
        sa.sa_sigaction = sigaction_handler;
519
        sa.sa_flags = SA_SIGINFO;
520
        if (sigemptyset(&sa.sa_mask))
521
                perror("sigemptyset");
522

    
523
        if (sigaction(SIGIO, &sa, NULL)) {
524
                perror("sigaction");
525
                return -1;
526
        }
527

    
528
        store->nr_ops = nr_ops;
529
        store->maxfds = 2 * nr_ops;
530

    
531
        store->fdcache = calloc(store->maxfds, sizeof(struct fdcache_node));
532
        if(!store->fdcache)
533
                goto malloc_fail;
534

    
535
        store->free_bufs = calloc(store->nr_ops, sizeof(xqindex));
536
        if(!store->free_bufs)
537
                goto malloc_fail;
538

    
539
        store->iothread = calloc(store->nr_ops, sizeof(pthread_t));
540
        if(!store->iothread)
541
                goto malloc_fail;
542

    
543
        store->ios = calloc(nr_ops, sizeof(struct io));
544
        if (!store->ios) {
545
malloc_fail:
546
                perror("malloc");
547
                return -1;
548
        }
549

    
550
        for (i = 0; i < nr_ops; i++) {
551
                store->ios[i].store = store;
552
                pthread_cond_init(&store->ios[i].cond, NULL);
553
                pthread_mutex_init(&store->ios[i].lock, NULL);
554
        }
555

    
556
        xq_init_seq(&store->free_ops, store->nr_ops, store->nr_ops, store->free_bufs);
557

    
558
        store->handled_reqs = 0;
559
        strncpy(store->path, path, MAX_PATH_SIZE);
560
        store->path[MAX_PATH_SIZE] = 0;
561

    
562
        store->path_len = strlen(store->path);
563
        if (store->path[store->path_len -1] != '/'){
564
                store->path[store->path_len] = '/';
565
                store->path[++store->path_len]= 0;
566
        }
567
        store->dirfd = open(store->path, O_RDWR);
568
        if (!(store->dirfd < 0 && errno == EISDIR)){
569
                fprintf(stderr, "%s is not a directory\n", store->path);
570
                return -1;
571
        }
572

    
573
        store->dirfd = open(store->path, O_RDONLY);
574
        if (store->dirfd < 0){
575
                perror("Directory open");
576
                return -1;
577
        }
578
/*
579
        mode = 1;
580
        int fd = dir_open(store, ".__tmp", 6, 1);
581
        if (fd < 0){
582
                perror("Directory check");
583
                return -1;
584
        }
585
*/
586
        if (xseg_initialize("posix")) {
587
                printf("cannot initialize library\n");
588
                return -1;
589
        }
590
        store->xseg = join(spec);
591
        if (!store->xseg)
592
                return -1;
593

    
594
        store->xport = xseg_bind_port(store->xseg, portno);
595
        if (!store->xport) {
596
                printf("cannot bind to port %ld\n", portno);
597
                return -1;
598
        }
599

    
600
        store->portno = xseg_portno(store->xseg, store->xport);
601
        printf("filed on port %u/%u\n",
602
                store->portno, store->xseg->config.nr_ports);
603

    
604
        for (i = 0; i < nr_ops; i++) {
605
                pthread_cond_init(&store->fdcache[i].cond, NULL);
606
                store->fdcache[i].flags = READY;
607
        }
608
        for (i = 0; i < nr_ops; i++) {
609
                //TODO error check + cond variable to stop io from starting
610
                //unless all threads are created successfully
611
                pthread_create(store->iothread + i, NULL, io_loop, (void *) (store->ios + i));
612
        }
613
        pthread_mutex_init(&store->cache_lock, NULL);
614
        return filed_loop(store);
615
}
616

    
617
int main(int argc, char **argv)
618
{
619
        char *path, *spec = "";
620
        unsigned long size;
621
        int i;
622
        long portno;
623
        uint32_t nr_ops;
624

    
625
        if (argc < 2)
626
                return usage();
627

    
628
        path = argv[1];
629
        size = 0;
630
        portno = -1;
631
        nr_ops = 0;
632

    
633
        for (i = 2; i < argc; i++) {
634
                if (!strcmp(argv[i], "-g") && i + 1 < argc) {
635
                        spec = argv[i+1];
636
                        i += 1;
637
                        continue;
638
                }
639

    
640
                if (!strcmp(argv[i], "-p") && i + 1 < argc) {
641
                        portno = strtoul(argv[i+1], NULL, 10);
642
                        i += 1;
643
                        continue;
644
                }
645

    
646
                if (!strcmp(argv[i], "-n") && i + 1 < argc) {
647
                        nr_ops = strtoul(argv[i+1], NULL, 10);
648
                        i += 1;
649
                        continue;
650
                }
651
        }
652

    
653
        if (nr_ops <= 0)
654
                nr_ops = 16;
655

    
656
        return filed(path, size, nr_ops, spec, portno);
657
}
658