13 #include <xseg/xseg.h>
// Upper bounds (not counting the terminating NUL) on the store directory
// path and on a target filename; the buffers that hold them are declared
// with one extra byte for the NUL.
16 #define MAX_PATH_SIZE 255
17 #define MAX_FILENAME_SIZE 255
// Print the command-line synopsis to stdout.
// NOTE(review): only part of the usage text is visible here; main() also
// accepts a -v option, so confirm the full text documents it.
19 static int usage(void)
21 printf("Usage: ./filed <path_to_directory> [options]\n"
22 "Options: [-p portno]\n"
23 " [-g type:name:nr_ports:nr_requests:request_size:extra_size:page_shift]\n"
24 " [-n nr_parallel_ops]\n"
// NOTE(review): the functions below access io->fdcacheidx, not a member
// named cacheidx; confirm which struct this field belongs to.
30 unsigned long cacheidx;
// The xseg request this io slot is currently serving.
37 struct xseg_request *req;
// fdcache_node.flags bit: set once the entry's fd may be used. dir_open()
// waits on the entry's condition variable while it is clear, and only
// entries with this bit set are candidates for LRU eviction.
44 #define READY (1 << 1)
// Number of in-flight ios using this cache entry, adjusted with __sync
// builtins; only entries with ref == 0 are chosen for eviction.
// NOTE(review): volatile does not provide thread synchronization; the
// code relies on cache_lock and the __sync builtins for that.
48 volatile unsigned int ref;
// LRU timestamp: a copy of store->handled_reqs taken whenever dir_open()
// hands out this entry; the smallest value is evicted first.
49 volatile unsigned long time;
50 volatile unsigned int flags;
// NUL-terminated filename, relative to the store directory, cached here.
52 char target[MAX_FILENAME_SIZE + 1];
57 struct xseg_port *xport;
// Configured in filed() to notify via SIGIO (SIGEV_SIGNAL).
64 struct sigevent sigevent;
// Incremented in dir_open() and used as the LRU clock for fdcache.
67 uint64_t handled_reqs;
// Array of store->maxfds cached file descriptors.
69 struct fdcache_node *fdcache;
// Serializes fdcache lookups, evictions and inserts in dir_open().
71 pthread_mutex_t cache_lock;
// Store directory path; filed() makes sure it ends with '/'.
72 char path[MAX_PATH_SIZE + 1];
// Verbosity setting; main() recognizes a -v option.
// NOTE(review): confirm how -v updates it and which output it gates.
75 static unsigned verbose;
// Presumably counts SIGIO deliveries in sigaction_handler(), whose body is
// not visible here. TODO confirm.
77 static unsigned long sigaction_count;
// Installed by filed() as the SA_SIGINFO handler for SIGIO.
// NOTE(review): the body is not visible here; whatever it does must be
// async-signal-safe.
79 static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
// Debug helper: print one record describing an io slot and its request
// (cache index, op, offset, size, retval, state), followed by up to 63
// bytes of the target name and of the request data buffer.
84 static void log_io(char *msg, struct io *io)
86 char target[64], data[64];
87 /* null terminate name in case of req->target is less than 63 characters,
88 * and next character after name (aka first byte of next buffer) is not
// Copy at most 63 bytes of the target so target[] keeps room for a NUL.
91 unsigned int end = (io->req->targetlen> 63) ? 63 : io->req->targetlen;
92 strncpy(target, io->req->target, end);
// NOTE(review): strncpy does not NUL-terminate when the source holds 63 or
// more non-NUL bytes; confirm data[63] is zeroed before it is printed with %s.
94 strncpy(data, io->req->data, 63);
98 "%s: fd:%u, op:%u offset: %llu size: %lu retval: %lu, reqstate: %u\n"
99 "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
101 (unsigned int)io->fdcacheidx, //this is cacheidx not fd
102 (unsigned int)io->req->op,
103 (unsigned long long)io->req->offset,
104 (unsigned long)io->req->size,
105 (unsigned long)io->retval,
106 (unsigned int)io->req->state,
107 (unsigned int)io->req->targetlen, target,
108 (unsigned long long)io->req->datalen, data);
// Take a free io slot: pop an index from store->free_ops and return the
// matching element of store->ios.
// NOTE(review): confirm that an empty-queue result from xq_pop_head is
// checked (on a line not visible here) before idx is used as an index.
111 static struct io *alloc_io(struct store *store)
113 xqindex idx = xq_pop_head(&store->free_ops);
116 return store->ios + idx;
// Return an io slot to the pool: recover its index from its offset within
// store->ios and push that index back at the head of store->free_ops.
119 static inline void free_io(struct store *store, struct io *io)
121 xqindex idx = io - store->ios;
123 xq_append_head(&store->free_ops, idx);
// Finish a request successfully: mark it XS_SERVED, log it, hand it back
// to the requesting port, signal that port, and drop the fdcache reference
// that dir_open() took for this io.
// NOTE(review): unlike fail(), the reference is dropped unconditionally,
// so this must only be reached after dir_open() succeeded.
127 static void complete(struct store *store, struct io *io)
129 struct xseg_request *req = io->req;
130 req->state |= XS_SERVED;
132 log_io("complete", io);
133 xseg_respond(store->xseg, req->portno, req);
134 xseg_signal(store->xseg, req->portno);
135 __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
// Finish a request with an error: mark it XS_FAILED, hand it back to the
// requesting port and signal it, then drop the fdcache reference if one
// was taken.
// NOTE(review): if fdcacheidx is an unsigned type, the >= 0 test below is
// always true; confirm its type and the value it holds when dir_open()
// fails before reserving a cache slot.
138 static void fail(struct store *store, struct io *io)
140 struct xseg_request *req = io->req;
141 req->state |= XS_FAILED;
144 xseg_respond(store->xseg, req->portno, req);
145 xseg_signal(store->xseg, req->portno);
146 if (io->fdcacheidx >= 0) {
147 __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
// Mark the request as pending. This assigns the state outright, so any
// other state bits previously set on the request are discarded.
151 static void pending(struct store *store, struct io *io)
153 io->req->state = XS_PENDING;
// Handle a request with an unrecognized op: write an explanatory message
// into the request's data buffer, bounded by req->datalen.
156 static void handle_unknown(struct store *store, struct io *io)
158 struct xseg_request *req = io->req;
159 snprintf(req->data, req->datalen, "unknown request op");
// Per-io preparation hook; handle_read_write() calls it before starting
// the transfer. NOTE(review): the body is not visible here.
163 static inline void prepare_io(struct store *store, struct io *io)
// Return an fd for `target` (targetlen bytes, opened relative to
// store->dirfd), using the store-wide fd cache.
//
// Under cache_lock it scans all store->maxfds entries. It remembers the
// least-recently-used idle entry (ref == 0 and READY) and looks for an
// exact name match. A match that another thread is still opening is waited
// for on the entry's condition variable.
// On a miss, the LRU entry is claimed: its name is set and it is marked
// not ready. The lock is then dropped while the old fd is closed and the
// new file is opened; a missing file is created with mode 0600. The result
// is published to waiters with a broadcast.
// On success the entry's LRU timestamp is refreshed, its ref count is
// incremented and io->fdcacheidx records the slot.
168 static int dir_open( struct store *store, struct io *io,
169 char *target, uint32_t targetlen, int mode )
172 struct fdcache_node *ce = NULL;
// Reject names that would not fit in fdcache_node.target.
176 if (targetlen> MAX_FILENAME_SIZE)
181 pthread_mutex_lock(&store->cache_lock);
185 for (i = 0; i < store->maxfds; i++) {
// Track the oldest idle, usable entry as the eviction candidate.
186 if (store->fdcache[i].ref == 0 && min > store->fdcache[i].time
187 && (store->fdcache[i].flags & READY)) {
188 min = store->fdcache[i].time;
// A hit needs both the prefix match and a NUL right after it, so that a
// cached name which merely starts with `target` is not mistaken for it.
192 if (!strncmp(store->fdcache[i].target, target, targetlen)) {
193 if (store->fdcache[i].target[targetlen] == 0) {
194 ce = &store->fdcache[i];
195 /* if any other io thread is currently opening
196 * the file, block until it succeeds or fails
198 if (!(ce->flags & READY)) {
199 pthread_cond_wait(&ce->cond, &store->cache_lock);
200 /* when ready, restart lookup */
203 /* if successfully opened */
205 fd = store->fdcache[i].fd;
209 /* else open failed for the other io thread, so
210 * it should fail for everyone waiting on this
222 /* all cache entries are currently being used */
223 pthread_mutex_unlock(&store->cache_lock);
// Sanity check: the chosen victim must still be idle.
226 if (store->fdcache[lru].ref){
// NOTE(review): confirm lru is declared as a long to match %ld.
228 printf("lru(%ld) ref not 0 (%u)\n", lru, store->fdcache[lru].ref);
231 /* make room for new file */
232 ce = &store->fdcache[lru];
233 /* set name here and state to not ready, for any other requests on the
234 * same target that may follow
236 strncpy(ce->target, target, targetlen);
237 ce->target[targetlen] = 0;
239 pthread_mutex_unlock(&store->cache_lock);
242 if (close(ce->fd) < 0){
// Open read-write; if the file does not exist, create it.
// NOTE(review): in the visible lines the create fallback does not depend
// on `mode`; confirm whether read/info requests are meant to create
// missing files.
246 fd = openat(store->dirfd, ce->target, O_RDWR);
248 if (errno == ENOENT){
249 fd = openat(store->dirfd, ce->target,
250 O_RDWR | O_CREAT, 0600);
255 /* insert in cache a negative fd to indicate opening error to
256 * any other ios waiting for the file to open
259 /* insert in cache */
261 pthread_mutex_lock(&store->cache_lock);
265 pthread_cond_broadcast(&ce->cond);
267 io->fdcacheidx = lru;
// Refresh the LRU clock and pin the entry for this io; both happen while
// cache_lock is held.
275 store->handled_reqs++;
276 ce->time = store->handled_reqs;
277 __sync_fetch_and_add(&ce->ref, 1);
278 pthread_mutex_unlock(&store->cache_lock);
283 pthread_mutex_unlock(&store->cache_lock);
// Serve a read or write request against the target file.
// It obtains the fd from dir_open(), with a mode that depends on whether
// req->op is X_WRITE. It then loops on pread/pwrite until req->datalen
// bytes are serviced, using req->serviced as the running offset.
// On an I/O error, req->datalen is shrunk to the bytes serviced so far.
// When a read hits end of file, the rest of the data buffer is zeroed and
// the request counts as fully serviced.
// FLUSH/FUA flags are not supported yet.
// The completion branches are only partly visible here. When the request
// ends with serviced > 0 it is presumably completed; otherwise an error
// string is written into req->data.
287 static void handle_read_write(struct store *store, struct io *io)
290 struct xseg_request *req = io->req;
292 if (req->op == X_WRITE)
296 fd = dir_open(store, io, req->target, req->targetlen, mode);
// Debug/sanity output comparing the local req pointer with io->req.
304 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
306 if (req->flags & (XF_FLUSH | XF_FUA)) {
307 /* No FLUSH/FUA support yet (O_SYNC ?).
308 * note that with FLUSH/size == 0
309 * there will probably be a (uint64_t)-1 offset */
319 prepare_io(store, io);
323 while (req->serviced < req->datalen) {
324 r = pread(fd, req->data + req->serviced,
325 req->datalen - req->serviced,
326 req->offset + req->serviced);
// I/O error: report only the bytes transferred so far.
328 req->datalen = req->serviced;
332 /* reached end of file. zero out the rest data buffer */
333 memset(req->data + req->serviced, 0, req->datalen - req->serviced);
334 req->serviced = req->datalen;
342 while (req->serviced < req->datalen) {
343 r = pwrite(fd, req->data + req->serviced,
344 req->datalen - req->serviced,
345 req->offset + req->serviced);
// I/O error: report only the bytes transferred so far.
347 req->datalen = req->serviced;
// NOTE(review): this branch mirrors the read path. For pwrite, a 0 return
// does not mean end of file, and zeroing the caller's data while reporting
// the request as fully serviced looks unintended. Confirm.
350 /* reached end of file. zero out the rest data buffer */
351 memset(req->data + req->serviced, 0, req->datalen - req->serviced);
352 req->serviced = req->datalen;
361 /* if fsync fails, then no bytes serviced correctly */
366 snprintf(req->data, req->datalen,
367 "wtf, corrupt op %u?\n", req->op);
372 if (req->serviced > 0 ) {
// NOTE(review): when compiled with _GNU_SOURCE, strerror_r is the GNU
// variant, which may return a pointer to a static string without filling
// req->data. Confirm which variant is in use.
376 strerror_r(errno, req->data, req->datalen);
// Serve an info request. It looks up the target's fd through dir_open()
// and fstat()s it. The file size (presumably taken from the stat result)
// is stored as an off_t at the start of req->data, and req->datalen is set
// to sizeof(size).
// NOTE(review): no check that req->datalen >= sizeof(off_t) is visible
// before the store. The pointer cast also assumes req->data is suitably
// aligned for off_t. memcpy(req->data, &size, sizeof size) would avoid
// both the alignment and the strict-aliasing concerns.
382 static void handle_info(struct store *store, struct io *io)
384 struct xseg_request *req = io->req;
389 fd = dir_open(store, io, req->target, req->targetlen, 0);
394 r = fstat(fd, &stat);
401 *((off_t *) req->data) = size;
402 req->datalen = sizeof(size);
// Route an accepted io to its handler based on io->req->op. Read/write
// ops go to handle_read_write(), info ops to handle_info(), and anything
// else to handle_unknown(). The case labels themselves are not visible
// here. The leading printf is debug output.
407 static void dispatch(struct store *store, struct io *io)
410 printf("io: 0x%p, req: 0x%p, op %u\n",
411 (void *)io, (void *)io->req, io->req->op);
412 switch (io->req->op) {
415 handle_read_write(store, io); break;
417 handle_info(store, io); break;
420 handle_unknown(store, io);
// Begin processing a freshly accepted request by marking it XS_ACCEPTED.
// NOTE(review): the rest of the body is not visible; presumably it goes on
// to dispatch the io.
424 static void handle_accepted(struct store *store, struct io *io)
426 struct xseg_request *req = io->req;
428 req->state = XS_ACCEPTED;
// Take a free io slot and wake the worker thread that owns it by
// signalling the slot's condition variable under the slot's lock.
// Returns the slot.
// NOTE(review): confirm that a failed alloc_io() is handled before
// io->lock is touched. Also, pthread_cond_signal has no effect if the
// worker is not yet waiting, so the wakeup can be lost unless the worker
// checks a predicate.
433 static struct io* wake_up_next_iothread(struct store *store)
435 struct io *io = alloc_io(store);
438 pthread_mutex_lock(&io->lock);
439 pthread_cond_signal(&io->cond);
440 pthread_mutex_unlock(&io->lock);
// Body of each I/O worker thread; `arg` is the thread's own struct io.
// It tries to accept a request on the store's port. If one was accepted,
// it wakes another worker (presumably so accepting continues while this
// one works) and processes the request. It then sleeps on its slot's
// condition variable until wake_up_next_iothread() signals it.
// NOTE(review): pthread_cond_wait is called with no visible predicate, so
// spurious wakeups are possible and a signal sent before the wait begins
// is missed.
445 void *io_loop(void *arg)
447 struct io *io = (struct io *) arg;
448 struct store *store = io->store;
449 struct xseg *xseg = store->xseg;
450 uint32_t portno = store->portno;
451 struct xseg_request *accepted;
455 accepted = xseg_accept(xseg, portno);
458 wake_up_next_iothread(store);
459 handle_accepted(store, io);
462 pthread_mutex_lock(&io->lock);
464 pthread_cond_wait(&io->cond, &io->lock);
465 pthread_mutex_unlock(&io->lock);
// Attach to the xseg segment described by `spec`, passing "posix" to
// xseg_join. If the first join fails, create the segment from the parsed
// config and try joining once more, returning that second result.
// The results of xseg_parse_spec and xseg_create are deliberately
// discarded; a failure only shows up as a failed second join.
472 static struct xseg *join(char *spec)
474 struct xseg_config config;
477 (void)xseg_parse_spec(spec, &config);
478 xseg = xseg_join(config.type, config.name, "posix", NULL);
482 fprintf(stderr, "Failed to join xseg, creating it...\n");
483 (void)xseg_create(&config);
484 return xseg_join(config.type, config.name, "posix", NULL);
// Main-thread loop: wake a worker io thread, then prepare to wait and block
// in xseg_wait_signal until the port is signalled or the timeout expires.
// The timeout is 1000000UL, in whatever units the xseg API defines.
487 static int filed_loop(struct store *store)
489 struct xseg *xseg = store->xseg;
490 uint32_t portno = store->portno;
494 io = wake_up_next_iothread(store);
495 xseg_prepare_wait(xseg, portno);
496 xseg_wait_signal(xseg, 1000000UL);
// Set up and run the file-backed store, in this order:
// - install the SIGIO handler;
// - allocate the io slots, free-slot queue, fd cache and thread handles;
// - normalize and validate the store directory;
// - join the xseg segment and bind `portno`;
// - start one worker per io slot, then run filed_loop().
501 static int filed( char *path, unsigned long size, uint32_t nr_ops,
502 char *spec, long portno )
510 store = malloc(sizeof(struct store));
523 store->sigevent.sigev_notify = SIGEV_SIGNAL;
524 store->sigevent.sigev_signo = SIGIO;
525 sa.sa_sigaction = sigaction_handler;
526 sa.sa_flags = SA_SIGINFO;
527 if (sigemptyset(&sa.sa_mask))
528 perror("sigemptyset");
530 if (sigaction(SIGIO, &sa, NULL)) {
535 store->nr_ops = nr_ops;
// The fd cache holds twice as many entries as there are io slots.
536 store->maxfds = 2 * nr_ops;
538 store->fdcache = calloc(store->maxfds, sizeof(struct fdcache_node));
542 store->free_bufs = calloc(store->nr_ops, sizeof(xqindex));
543 if(!store->free_bufs)
546 store->iothread = calloc(store->nr_ops, sizeof(pthread_t));
550 store->ios = calloc(nr_ops, sizeof(struct io));
557 for (i = 0; i < nr_ops; i++) {
558 store->ios[i].store = store;
559 pthread_cond_init(&store->ios[i].cond, NULL);
560 pthread_mutex_init(&store->ios[i].lock, NULL);
// The free-slot queue starts out holding every io index.
563 xq_init_seq(&store->free_ops, store->nr_ops, store->nr_ops, store->free_bufs);
565 store->handled_reqs = 0;
566 strncpy(store->path, path, MAX_PATH_SIZE);
567 store->path[MAX_PATH_SIZE] = 0;
569 store->path_len = strlen(store->path);
// Ensure the directory path ends with '/'.
// NOTE(review): if `path` is empty, path_len - 1 indexes before the array.
// If it is exactly MAX_PATH_SIZE bytes, the '/' goes to path[MAX_PATH_SIZE]
// and the NUL to path[MAX_PATH_SIZE + 1], one past the end of the array.
570 if (store->path[store->path_len -1] != '/'){
571 store->path[store->path_len] = '/';
572 store->path[++store->path_len]= 0;
// Probe: opening a directory O_RDWR is expected to fail with EISDIR, so
// any other outcome means the path is not a directory.
// NOTE(review): if this open unexpectedly succeeds, confirm the fd is
// closed on the error path.
574 store->dirfd = open(store->path, O_RDWR);
575 if (!(store->dirfd < 0 && errno == EISDIR)){
576 fprintf(stderr, "%s is not a directory\n", store->path);
580 store->dirfd = open(store->path, O_RDONLY);
581 if (store->dirfd < 0){
582 perror("Directory open");
// NOTE(review): this call has three problems.
// (1) dir_open() takes (store, io, target, targetlen, mode), but only four
// arguments are passed here, so this will not compile.
// (2) dir_open() locks store->cache_lock, which is initialized only further
// down, after the worker threads are started.
// (3) No fdcache entry has READY set yet, so its LRU search cannot find a
// slot to use.
// The fdcache and cache_lock initialization should move before this check.
587 int fd = dir_open(store, ".__tmp", 6, 1);
589 perror("Directory check");
593 if (xseg_initialize()) {
594 printf("cannot initialize library\n");
597 store->xseg = join(spec);
601 store->xport = xseg_bind_port(store->xseg, portno);
603 printf("cannot bind to port %ld\n", portno);
607 store->portno = xseg_portno(store->xseg, store->xport);
608 printf("filed on port %u/%u\n",
609 store->portno, store->xseg->config.nr_ports);
// NOTE(review): store->fdcache has store->maxfds (2 * nr_ops) entries, but
// this loop only initializes the condition variable and READY flag of the
// first nr_ops. The remaining entries keep uninitialized conds and are
// never selected for eviction. Iterate to store->maxfds instead.
611 for (i = 0; i < nr_ops; i++) {
612 pthread_cond_init(&store->fdcache[i].cond, NULL);
613 store->fdcache[i].flags = READY;
615 for (i = 0; i < nr_ops; i++) {
616 //TODO error check + cond variable to stop io from starting
617 //unless all threads are created successfully
618 pthread_create(store->iothread + i, NULL, io_loop, (void *) (store->ios + i));
// NOTE(review): the workers are already running at this point, and store
// came from malloc, so a worker reaching dir_open() first would lock an
// uninitialized mutex. Initialize cache_lock before creating the threads.
620 pthread_mutex_init(&store->cache_lock, NULL);
621 return filed_loop(store);
// Entry point. The store directory is presumably argv[1], since option
// parsing starts at argv[2]. The options are -g spec, -p portno, -n nr_ops
// and -v; each value-taking option requires a following argument.
// NOTE(review): strtoul is called with a NULL end pointer, so malformed
// numbers silently become 0 or a partial value. An -n of 0 would leave
// filed() with no io slots. Consider validating these inputs.
624 int main(int argc, char **argv)
626 char *path, *spec = "";
640 for (i = 2; i < argc; i++) {
641 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
647 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
648 portno = strtoul(argv[i+1], NULL, 10);
653 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
654 nr_ops = strtoul(argv[i+1], NULL, 10);
658 if (!strcmp(argv[i], "-v")) {
667 return filed(path, size, nr_ops, spec, portno);