13 #include <xseg/xseg.h>
15 #include <xseg/protocol.h>
16 #include <sys/sendfile.h>
/* Maximum lengths, excluding the terminating NUL, of the store directory path
 * and of a single target file name. The buffers that use them
 * (store.path, fdcache_node.target, handle_delete's buf) are declared with +1
 * for the NUL. */
19 #define MAX_PATH_SIZE 255
20 #define MAX_FILENAME_SIZE 255
/* usage: print the command-line synopsis of filed.
 * The first positional argument is the directory that backs the store.
 * NOTE(review): main() also accepts "-v". The part of the usage text visible
 * in this excerpt does not mention it; confirm it is listed in the full text. */
22 static int usage(void)
24 printf("Usage: ./filed <path_to_directory> [options]\n"
25 "Options: [-p portno]\n"
26 " [-g type:name:nr_ports:nr_requests:request_size:extra_size:page_shift]\n"
27 " [-n nr_parallel_ops]\n"
/* Struct members whose enclosing declaration is not visible in this excerpt.
 * `req` appears to be the xseg request that an io slot is serving
 * (cf. io->req throughout).
 * NOTE(review): the rest of the file uses io->fdcacheidx and tests it with
 * `>= 0`, so that member must be signed. Confirm that `cacheidx` here is a
 * different field and not an unsigned stand-in for it. */
33 unsigned long cacheidx;
40 struct xseg_request *req;
/* READY: bit in fdcache_node.flags.
 * - Set once the open() for the entry has finished, whether it succeeded or
 *   failed.
 * - Cleared by dir_open() while one thread is (re)opening the file, so other
 *   threads wait on the entry's cond.
 *
 * fdcache_node fields (one entry per cached file descriptor):
 *   ref    - number of in-flight ios using the entry. Incremented in
 *            dir_open() and decremented in complete()/fail() with __sync
 *            builtins. Only entries with ref == 0 may be evicted.
 *   time   - LRU stamp, copied from store->handled_reqs on every lookup.
 *   flags  - READY bit, see above.
 *   target - NUL-terminated file name the entry caches.
 * NOTE(review): `volatile` does not order accesses between threads. ref relies
 * on the __sync builtins; time and flags should only be touched under
 * store->cache_lock. */
47 #define READY (1 << 1)
51 volatile unsigned int ref;
52 volatile unsigned long time;
53 volatile unsigned int flags;
55 char target[MAX_FILENAME_SIZE + 1];
/* struct store fields (per-daemon state):
 *   xport        - port bound in filed() with xseg_bind_port().
 *   sigevent     - configured in filed() for SIGEV_SIGNAL delivery of SIGIO.
 *   handled_reqs - request counter used as the LRU clock for fdcache entries.
 *   fdcache      - array of store->maxfds (2 * nr_ops) cached descriptors.
 *   cache_lock   - mutex guarding fdcache lookups and updates in dir_open().
 *   path         - backing directory path, '/'-terminated by filed(). */
60 struct xseg_port *xport;
67 struct sigevent sigevent;
70 uint64_t handled_reqs;
72 struct fdcache_node *fdcache;
74 pthread_mutex_t cache_lock;
75 char path[MAX_PATH_SIZE + 1];
/* verbose: debug-output switch, presumably set by main()'s "-v" option.
 * The assignment is not visible here; TODO confirm.
 * sigaction_count: presumably counts SIGIO deliveries in sigaction_handler().
 * Its use is not visible in this excerpt; TODO confirm. */
78 static unsigned verbose;
80 static unsigned long sigaction_count;
/* SIGIO handler, installed in filed() via sigaction() with SA_SIGINFO.
 * NOTE(review): the body is not part of this excerpt. Whatever it does must
 * stay async-signal-safe (e.g. only bumping sigaction_count). */
82 static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
/* log_io: print a debug summary of the request held by @io.
 * The summary covers the fd-cache index, op, offset, size, retval and state,
 * plus up to 63 bytes of the request's target and data buffers.
 * @msg: label printed at the start of the record (e.g. "complete").
 * @io:  io slot whose request is logged.
 * Target and data are copied into 64-byte locals because the xseg buffers are
 * not NUL-terminated.
 * NOTE(review): strncpy() does not terminate on truncation. The statements
 * that write target[end] = 0 and data[63] = 0 are not visible in this excerpt;
 * confirm they exist before the strings are printed with %s. */
87 static void log_io(char *msg, struct io *io)
89 char target[64], data[64];
90 /* null terminate name in case of req->target is less than 63 characters,
91 * and next character after name (aka first byte of next buffer) is not
94 struct store* store = io->store;
95 struct xseg *xseg = store->xseg;
96 struct xseg_request *req = io->req;
97 char *req_target = xseg_get_target(xseg, req);
98 char *req_data = xseg_get_data(xseg, req);
100 unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
102 strncpy(target, req_target, end);
104 strncpy(data, req_data, 63);
108 "%s: fd:%u, op:%u offset: %llu size: %lu retval: %lu, reqstate: %u\n"
109 "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
111 (unsigned int)io->fdcacheidx, //this is cacheidx not fd
112 (unsigned int)req->op,
113 (unsigned long long)req->offset,
114 (unsigned long)req->size,
115 (unsigned long)io->retval,
116 (unsigned int)req->state,
117 (unsigned int)req->targetlen, target,
118 (unsigned long long)req->datalen, data);
/* alloc_io: pop a free io-slot index from store->free_ops and return the
 * corresponding element of store->ios.
 * NOTE(review): handling of an empty free queue is not visible in this
 * excerpt. Callers such as wake_up_next_iothread() depend on it returning
 * something they can test (presumably NULL). */
121 static struct io *alloc_io(struct store *store)
123 xqindex idx = xq_pop_head(&store->free_ops, 1);
126 return store->ios + idx;
/* free_io: return @io to the pool by pushing its index (its position within
 * store->ios) back onto the head of store->free_ops. */
129 static inline void free_io(struct store *store, struct io *io)
131 xqindex idx = io - store->ios;
133 xq_append_head(&store->free_ops, idx, 1);
/* complete: finish @io's request successfully.
 * Steps:
 *   1. Set XS_SERVED in the request state.
 *   2. Log the request.
 *   3. Hand the request back with xseg_respond(), retrying while it
 *      returns NoPort.
 *   4. Signal the returned port.
 *   5. Atomically drop this io's reference on its fd-cache entry.
 * NOTE(review): unlike fail(), this does not check io->fdcacheidx >= 0 before
 * indexing store->fdcache. Confirm every successful path has a valid cache
 * index. */
137 static void complete(struct store *store, struct io *io)
139 struct xseg_request *req = io->req;
141 req->state |= XS_SERVED;
143 log_io("complete", io);
144 while ((p = xseg_respond(store->xseg, req, store->portno, X_ALLOC)) == NoPort)
146 xseg_signal(store->xseg, p);
147 __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
/* fail: finish @io's request with an error.
 * Sets XS_FAILED, responds through xseg_respond() (retrying while it returns
 * NoPort) and signals the returned port.
 * The fd-cache reference is dropped only when io->fdcacheidx >= 0, which lets
 * requests that never obtained a cache slot fail safely. Presumably the
 * elided code sets a negative index for them. */
150 static void fail(struct store *store, struct io *io)
152 struct xseg_request *req = io->req;
154 req->state |= XS_FAILED;
157 while ((p = xseg_respond(store->xseg, req, store->portno, X_ALLOC)) == NoPort)
159 xseg_signal(store->xseg, p);
160 if (io->fdcacheidx >= 0) {
161 __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
/* pending: mark @io's request as XS_PENDING.
 * Note this assigns with `=`, so any other state bits are cleared. */
165 static void pending(struct store *store, struct io *io)
167 io->req->state = XS_PENDING;
/* handle_unknown: reply to a request whose op is not recognised.
 * Writes "unknown request op" into the request's data buffer, truncated to
 * req->datalen. Presumably the elided lines then fail() the request. */
170 static void handle_unknown(struct store *store, struct io *io)
172 struct xseg *xseg = store->xseg;
173 struct xseg_request *req = io->req;
174 char *data = xseg_get_data(xseg, req);
175 snprintf(data, req->datalen, "unknown request op");
/* prepare_io: per-io setup called by handle_read_write() before the
 * pread/pwrite loops.
 * NOTE(review): the body is not visible in this excerpt. */
179 static inline void prepare_io(struct store *store, struct io *io)
/* dir_open: return a file descriptor for @target, relative to store->dirfd,
 * using the LRU fd cache in store->fdcache.
 *
 * @store:     daemon state. Its fdcache and cache_lock are used.
 * @io:        requesting io. On the eviction path its fdcacheidx receives the
 *             index of the cache entry used.
 * @target:    file name. It is not NUL-terminated; its length is @targetlen.
 * @targetlen: name length. Names longer than MAX_FILENAME_SIZE are rejected.
 * @mode:      callers pass 0 (info/delete) or 1 (copy destination, directory
 *             probe). Read/write derive it from req->op. Presumably a non-zero
 *             mode allows the O_CREAT retry on ENOENT; the check gating that
 *             retry is not visible here, so TODO confirm.
 * Returns the fd. Callers treat failure as an error, presumably via a
 * negative value; TODO confirm.
 *
 * Flow visible here:
 *   1. Under cache_lock, scan all entries. Remember the idle (ref == 0),
 *      READY entry with the smallest time as the eviction victim. Look for
 *      an exact name match: strncmp on targetlen bytes plus a NUL check.
 *   2. On a match that is not READY, another thread is opening the file.
 *      Wait on the entry's cond (which releases cache_lock), then restart
 *      the lookup.
 *   3. On a READY match, reuse its fd if the open succeeded, else fail.
 *   4. With no match and no evictable entry, unlock and fail.
 *   5. Otherwise claim the victim:
 *      - store the new name and clear READY;
 *      - unlock;
 *      - close the old fd and openat() the file;
 *      - relock, publish the fd (negative on error) and broadcast the cond
 *        to waiters.
 *   6. Record the cache index in io, bump store->handled_reqs as the LRU
 *      clock, and atomically take a reference on the entry.
 *
 * NOTE(review): fdcache is calloc'ed in filed(), so a never-used entry holds
 * fd 0. Unless an elided guard skips it, the first eviction of such an entry
 * close()s stdin.
 * NOTE(review): `lru` is printed with %ld; confirm its declared type matches. */
184 static int dir_open( struct store *store, struct io *io,
185 char *target, uint32_t targetlen, int mode )
188 struct fdcache_node *ce = NULL;
192 if (targetlen> MAX_FILENAME_SIZE)
197 pthread_mutex_lock(&store->cache_lock);
201 for (i = 0; i < store->maxfds; i++) {
202 if (store->fdcache[i].ref == 0 && min > store->fdcache[i].time
203 && (store->fdcache[i].flags & READY)) {
204 min = store->fdcache[i].time;
208 if (!strncmp(store->fdcache[i].target, target, targetlen)) {
209 if (store->fdcache[i].target[targetlen] == 0) {
210 ce = &store->fdcache[i];
211 /* if any other io thread is currently opening
212 * the file, block until it succeeds or fails
214 if (!(ce->flags & READY)) {
215 pthread_cond_wait(&ce->cond, &store->cache_lock);
216 /* when ready, restart lookup */
219 /* if successfully opened */
221 fd = store->fdcache[i].fd;
225 /* else open failed for the other io thread, so
226 * it should fail for everyone waiting on this
238 /* all cache entries are currently being used */
239 pthread_mutex_unlock(&store->cache_lock);
242 if (store->fdcache[lru].ref){
244 printf("lru(%ld) ref not 0 (%u)\n", lru, store->fdcache[lru].ref);
247 /* make room for new file */
248 ce = &store->fdcache[lru];
249 /* set name here and state to not ready, for any other requests on the
250 * same target that may follow
252 strncpy(ce->target, target, targetlen);
253 ce->target[targetlen] = 0;
255 pthread_mutex_unlock(&store->cache_lock);
258 if (close(ce->fd) < 0){
262 fd = openat(store->dirfd, ce->target, O_RDWR);
264 if (errno == ENOENT){
265 fd = openat(store->dirfd, ce->target,
266 O_RDWR | O_CREAT, 0600);
271 /* insert in cache a negative fd to indicate opening error to
272 * any other ios waiting for the file to open
275 /* insert in cache */
277 pthread_mutex_lock(&store->cache_lock);
281 pthread_cond_broadcast(&ce->cond);
283 io->fdcacheidx = lru;
291 store->handled_reqs++;
292 ce->time = store->handled_reqs;
293 __sync_fetch_and_add(&ce->ref, 1);
294 pthread_mutex_unlock(&store->cache_lock);
299 pthread_mutex_unlock(&store->cache_lock);
/* handle_read_write: serve a read or write request.
 *
 * The target fd comes from dir_open(); its mode argument is chosen by whether
 * req->op is X_WRITE.
 *
 * Data transfer: pread()/pwrite() loop at offset req->offset + req->serviced
 * until req->serviced reaches req->datalen.
 *   - A read that hits end of file zero-fills the rest of the data buffer and
 *     reports the request as fully serviced.
 *   - On some (elided) exits, req->datalen is truncated to req->serviced.
 *
 * Flush/FUA: requests carrying XF_FLUSH or XF_FUA take a separate path. Per
 * the in-code comment, FLUSH/FUA are not supported yet.
 *
 * Errors:
 *   - An op that is neither read nor write writes "wtf, corrupt op" into the
 *     data buffer.
 *   - On an I/O error, a request with serviced > 0 takes its own (elided)
 *     path. Otherwise strerror_r() writes the error text into the data
 *     buffer.
 *
 * The "0.%p vs %p!" print is a sanity check that req still equals io->req.
 *
 * NOTE(review): the write loop repeats the read loop's "end of file" branch.
 * If pwrite() returns 0, the caller's remaining data is zeroed and the
 * request is reported as fully serviced although those bytes were never
 * written. Confirm this is intended.
 * NOTE(review): under _GNU_SOURCE, strerror_r() is the GNU variant. It returns
 * a char * and may leave `data` untouched. Confirm which variant is compiled. */
303 static void handle_read_write(struct store *store, struct io *io)
306 struct xseg *xseg = store->xseg;
307 struct xseg_request *req = io->req;
308 char *target = xseg_get_target(xseg, req);
309 char *data = xseg_get_data(xseg, req);
311 if (req->op == X_WRITE)
315 fd = dir_open(store, io, target, req->targetlen, mode);
323 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
325 if (req->flags & (XF_FLUSH | XF_FUA)) {
326 /* No FLUSH/FUA support yet (O_SYNC ?).
327 * note that with FLUSH/size == 0
328 * there will probably be a (uint64_t)-1 offset */
338 prepare_io(store, io);
342 while (req->serviced < req->datalen) {
343 r = pread(fd, data + req->serviced,
344 req->datalen - req->serviced,
345 req->offset + req->serviced);
347 req->datalen = req->serviced;
351 /* reached end of file. zero out the rest data buffer */
352 memset(data + req->serviced, 0, req->datalen - req->serviced);
353 req->serviced = req->datalen;
361 while (req->serviced < req->datalen) {
362 r = pwrite(fd, data + req->serviced,
363 req->datalen - req->serviced,
364 req->offset + req->serviced);
366 req->datalen = req->serviced;
369 /* reached end of file. zero out the rest data buffer */
370 memset(data + req->serviced, 0, req->datalen - req->serviced);
371 req->serviced = req->datalen;
380 /* if fsync fails, then no bytes serviced correctly */
385 snprintf(data, req->datalen,
386 "wtf, corrupt op %u?\n", req->op);
391 if (req->serviced > 0 ) {
395 strerror_r(errno, data, req->datalen);
/* handle_info: report the size of the request's target.
 * Opens the target through dir_open() with mode 0 and fstat()s it. It then
 * stores `size` (presumably stat.st_size; TODO confirm) as an off_t at the
 * start of the data buffer and sets req->datalen to sizeof(size).
 * NOTE(review): the store through an (off_t *) cast assumes the data buffer is
 * suitably aligned and at least sizeof(off_t) bytes long. No datalen check is
 * visible here. A length check followed by memcpy(data, &size, sizeof size)
 * would avoid both the alignment and the aliasing concern. */
401 static void handle_info(struct store *store, struct io *io)
403 struct xseg *xseg = store->xseg;
404 struct xseg_request *req = io->req;
405 char *target = xseg_get_target(xseg, req);
406 char *data = xseg_get_data(xseg, req);
411 fd = dir_open(store, io, target, req->targetlen, 0);
416 r = fstat(fd, &stat);
423 *((off_t *) data) = size;
424 req->datalen = sizeof(size);
/* handle_copy: copy a source file into the request's target.
 *
 * Source: named in the xseg_request_copy payload held in the data buffer.
 *   - Copied into the NUL-terminated local buf.
 *   - Opened directly with openat() on store->dirfd, bypassing the fd cache.
 *   - Created empty (mode 0600) if it does not exist.
 *
 * Destination: the request's target, opened through dir_open() with mode 1.
 *
 * The transfer is a single sendfile() of st.st_size bytes. `st` is presumably
 * filled by an elided fstat(src).
 *
 * NOTE(review): xcopy->targetlen indexes buf[XSEG_MAX_TARGETLEN+1]. No bound
 * check against XSEG_MAX_TARGETLEN is visible before the strncpy().
 * NOTE(review): a single sendfile() call may transfer fewer bytes than
 * requested; Linux caps one call at 0x7ffff000 bytes. Treating
 * n != st.st_size as failure rejects large or partial copies; loop until done.
 * The offset argument 0 acts as NULL.
 * NOTE(review): sendfile() writes at dst's current file offset and advances
 * it. dst is a shared cached fd, so a later copy to the same target would
 * start past offset 0.
 * NOTE(review): creating a missing source turns "copy a nonexistent file" into
 * an empty destination rather than an error. Confirm this is intended. */
429 static void handle_copy(struct store *store, struct io *io)
431 struct xseg_request *req = io->req;
432 struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(store->xseg, req);
435 char buf[XSEG_MAX_TARGETLEN+1];
436 char *target = xseg_get_target(store->xseg, req);
438 dst = dir_open(store, io, target, req->targetlen, 1);
440 fprintf(stderr, "fail in dst\n");
445 strncpy(buf, xcopy->target, xcopy->targetlen);
446 buf[xcopy->targetlen] = 0;
447 src = openat(store->dirfd, buf, O_RDWR);
449 if (errno == ENOENT){
450 src = openat(store->dirfd, buf,
451 O_RDWR | O_CREAT, 0600);
453 fprintf(stderr, "fail in src\n");
458 fprintf(stderr, "fail in src\n");
465 n = sendfile(dst, src, 0, st.st_size);
466 if (n != st.st_size) {
467 fprintf(stderr, "fail in copy\n");
473 fprintf(stderr, "fail in cp\n");
/* handle_delete: remove the request's target file.
 * Steps:
 *   1. Look the target up through dir_open() with mode 0.
 *   2. Set the cache entry's fd to -1 so it is not reused.
 *   3. unlinkat() the NUL-terminated name relative to store->dirfd.
 * buf fits the name because dir_open() rejects names longer than
 * MAX_FILENAME_SIZE.
 * NOTE(review): the fd is overwritten without holding store->cache_lock. No
 * close() of the old fd is visible here, and unlinkat()'s result is ignored.
 * NOTE(review): the entry keeps its target name. Confirm that dir_open()'s
 * "open failed" branch does not then fail every later request for a
 * re-created file with the same name. */
484 static void handle_delete(struct store *store, struct io *io)
486 struct xseg_request *req = io->req;
488 char *target = xseg_get_target(store->xseg, req);
490 fd = dir_open(store, io, target, req->targetlen, 0);
492 fprintf(stderr, "fail in dir_open\n");
497 /* 'invalidate' cache entry */
498 if (io->fdcacheidx >= 0) {
499 store->fdcache[io->fdcacheidx].fd = -1;
503 char buf[MAX_FILENAME_SIZE + 1];
504 strncpy(buf, target, req->targetlen);
505 buf[req->targetlen] = 0;
506 unlinkat(store->dirfd, buf, 0);
/* dispatch: route @io's request to its handler according to req->op.
 * The case labels are elided here; presumably they cover read/write, info,
 * delete and copy. Unrecognised ops go to handle_unknown().
 * The io/req/op debug print is guarded by a condition that is not visible in
 * this excerpt. */
513 static void dispatch(struct store *store, struct io *io)
516 printf("io: 0x%p, req: 0x%p, op %u\n",
517 (void *)io, (void *)io->req, io->req->op);
518 switch (io->req->op) {
521 handle_read_write(store, io); break;
523 handle_info(store, io); break;
525 handle_delete(store, io); break;
527 handle_copy(store, io); break;
530 handle_unknown(store, io);
/* handle_accepted: mark a freshly accepted request as XS_ACCEPTED, overwriting
 * its state. Presumably the elided lines then dispatch() it. */
534 static void handle_accepted(struct store *store, struct io *io)
536 struct xseg_request *req = io->req;
538 req->state = XS_ACCEPTED;
/* wake_up_next_iothread: take a free io slot and signal the thread parked on
 * its condition variable. This lets another thread accept the next request
 * while the caller is busy. Returns the slot.
 * NOTE(review): a NULL check on alloc_io()'s result is not visible before
 * io->lock is taken.
 * NOTE(review): the signal carries no predicate. If the target thread is not
 * yet blocked in pthread_cond_wait() (io_loop()), the wakeup is lost. The
 * waiter also has no visible loop against spurious wakeups. */
543 static struct io* wake_up_next_iothread(struct store *store)
545 struct io *io = alloc_io(store);
548 pthread_mutex_lock(&io->lock);
549 pthread_cond_signal(&io->cond);
550 pthread_mutex_unlock(&io->lock);
/* io_loop: body of each io thread.
 * filed() starts one thread per struct io and passes that io as @arg.
 * Each iteration:
 *   1. Try xseg_accept() on the store's port.
 *   2. Presumably only when a request was accepted (the check is elided):
 *      wake another io thread so accepting continues, then handle the
 *      request in this thread.
 *   3. Park on io->cond until wake_up_next_iothread() signals this slot. */
555 void *io_loop(void *arg)
557 struct io *io = (struct io *) arg;
558 struct store *store = io->store;
559 struct xseg *xseg = store->xseg;
560 uint32_t portno = store->portno;
561 struct xseg_request *accepted;
565 accepted = xseg_accept(xseg, portno, 0);
568 wake_up_next_iothread(store);
569 handle_accepted(store, io);
572 pthread_mutex_lock(&io->lock);
574 pthread_cond_wait(&io->cond, &io->lock);
575 pthread_mutex_unlock(&io->lock);
/* join: attach to the xseg segment described by @spec (the "-g" format).
 * If the first xseg_join() fails, create the segment from the parsed config
 * and join again. Returns the result of the final xseg_join(), which may still
 * be NULL.
 * NOTE(review): the results of xseg_parse_spec() and xseg_create() are
 * deliberately discarded. A malformed spec therefore proceeds with whatever
 * `config` contains. */
582 static struct xseg *join(char *spec)
584 struct xseg_config config;
587 (void)xseg_parse_spec(spec, &config);
588 xseg = xseg_join(config.type, config.name, "posix", NULL);
592 fprintf(stderr, "Failed to join xseg, creating it...\n");
593 (void)xseg_create(&config);
594 return xseg_join(config.type, config.name, "posix", NULL);
/* filed_loop: main-thread loop.
 * Wakes an idle io thread so it can accept requests, then prepares to wait and
 * blocks on the port's signal with a timeout of 1000000. The unit is defined
 * by xseg_wait_signal(), presumably microseconds; TODO confirm. */
597 static int filed_loop(struct store *store)
599 struct xseg *xseg = store->xseg;
600 uint32_t portno = store->portno;
604 io = wake_up_next_iothread(store);
605 xseg_prepare_wait(xseg, portno);
606 xseg_wait_signal(xseg, 1000000UL);
/* filed: set up the daemon and run the main loop.
 *
 * Setup order as it appears here:
 *   1. Allocate the store and install sigaction_handler for SIGIO.
 *   2. Size the fd cache to 2 * nr_ops.
 *   3. Allocate the free-op queue, thread handles and io slots, and init
 *      their cond/mutex.
 *   4. Normalise @path to end in '/' and open it as a directory:
 *      - open(O_RDWR) must fail with EISDIR;
 *      - then reopen it O_RDONLY.
 *   5. Probe the directory with a temporary file.
 *   6. Initialise xseg, join the segment, bind @portno and init local
 *      signals.
 *   7. Initialise the fd-cache entries.
 *   8. Start the io threads, initialise cache_lock, then enter filed_loop().
 *
 * NOTE(review): `dir_open(store, ".__tmp", 6, 1)` passes 4 arguments, but
 * dir_open() takes 5 (store, io, target, targetlen, mode). As written it does
 * not compile.
 * NOTE(review): that probe also runs before cache_lock is initialised (line
 * "733") and before any fdcache entry gets READY (line "726"). dir_open()
 * would lock an uninitialised mutex and find no evictable entry. A direct
 * openat(store->dirfd, ..., O_RDWR | O_CREAT, 0600) probe avoids both issues.
 * NOTE(review): the fd-cache init loop runs to nr_ops, but fdcache has
 * maxfds = 2 * nr_ops entries. The upper half never gets its cond initialised
 * or READY set, so those entries are never evicted.
 * NOTE(review): cache_lock is initialised after the io threads are created.
 * store comes from malloc(), so threads may lock an uninitialised mutex.
 * NOTE(review): with an empty @path, store->path[store->path_len - 1] indexes
 * before the buffer.
 * NOTE(review): if open(O_RDWR) succeeds on a non-directory, that fd is not
 * closed in the visible lines. */
611 static int filed( char *path, unsigned long size, uint32_t nr_ops,
612 char *spec, long portno )
618 store = malloc(sizeof(struct store));
631 store->sigevent.sigev_notify = SIGEV_SIGNAL;
632 store->sigevent.sigev_signo = SIGIO;
633 sa.sa_sigaction = sigaction_handler;
634 sa.sa_flags = SA_SIGINFO;
635 if (sigemptyset(&sa.sa_mask))
636 perror("sigemptyset");
638 if (sigaction(SIGIO, &sa, NULL)) {
643 store->nr_ops = nr_ops;
644 store->maxfds = 2 * nr_ops;
646 store->fdcache = calloc(store->maxfds, sizeof(struct fdcache_node));
650 store->free_bufs = calloc(store->nr_ops, sizeof(xqindex));
651 if(!store->free_bufs)
654 store->iothread = calloc(store->nr_ops, sizeof(pthread_t));
658 store->ios = calloc(nr_ops, sizeof(struct io));
665 for (i = 0; i < nr_ops; i++) {
666 store->ios[i].store = store;
667 pthread_cond_init(&store->ios[i].cond, NULL);
668 pthread_mutex_init(&store->ios[i].lock, NULL);
671 xq_init_seq(&store->free_ops, store->nr_ops, store->nr_ops, store->free_bufs);
673 store->handled_reqs = 0;
674 strncpy(store->path, path, MAX_PATH_SIZE);
675 store->path[MAX_PATH_SIZE] = 0;
677 store->path_len = strlen(store->path);
678 if (store->path[store->path_len -1] != '/'){
679 store->path[store->path_len] = '/';
680 store->path[++store->path_len]= 0;
682 store->dirfd = open(store->path, O_RDWR);
683 if (!(store->dirfd < 0 && errno == EISDIR)){
684 fprintf(stderr, "%s is not a directory\n", store->path);
688 store->dirfd = open(store->path, O_RDONLY);
689 if (store->dirfd < 0){
690 perror("Directory open");
695 int fd = dir_open(store, ".__tmp", 6, 1);
697 perror("Directory check");
701 if (xseg_initialize()) {
702 printf("cannot initialize library\n");
705 store->xseg = join(spec);
709 store->xport = xseg_bind_port(store->xseg, portno, NULL);
711 printf("cannot bind to port %ld\n", portno);
715 store->portno = xseg_portno(store->xseg, store->xport);
716 printf("filed on port %u/%u\n",
717 store->portno, store->xseg->config.nr_ports);
719 if (xseg_init_local_signal(store->xseg, store->portno) < 0){
720 printf("cannot int local signals\n");
724 for (i = 0; i < nr_ops; i++) {
725 pthread_cond_init(&store->fdcache[i].cond, NULL);
726 store->fdcache[i].flags = READY;
728 for (i = 0; i < nr_ops; i++) {
729 //TODO error check + cond variable to stop io from starting
730 //unless all threads are created successfully
731 pthread_create(store->iothread + i, NULL, io_loop, (void *) (store->ios + i));
733 pthread_mutex_init(&store->cache_lock, NULL);
734 return filed_loop(store);
/* main: parse the command line and run filed().
 * argv[1] is the backing directory. Options are scanned from argv[2]:
 *   -g <spec>   segment spec (defaults to "")
 *   -p <portno>
 *   -n <nr_ops>
 *   -v
 * NOTE(review): strtoul() results are used without checking endptr or errno.
 * nr_ops == 0 would size the fd cache and io pools at zero; confirm it is
 * rejected in the elided lines. portno is passed to filed() as a long. */
737 int main(int argc, char **argv)
739 char *path, *spec = "";
753 for (i = 2; i < argc; i++) {
754 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
760 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
761 portno = strtoul(argv[i+1], NULL, 10);
766 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
767 nr_ops = strtoul(argv[i+1], NULL, 10);
771 if (!strcmp(argv[i], "-v")) {
780 return filed(path, size, nr_ops, spec, portno);