add volume creation capability to mapper
[archipelago] / xseg / peers / user / filed.c
index 02c046a..b9666d4 100644 (file)
@@ -12,6 +12,9 @@
 #include <limits.h>
 #include <xseg/xseg.h>
 #include <pthread.h>
+#include <xseg/protocol.h>
+#include <sys/sendfile.h>
+
 
 #define MAX_PATH_SIZE 255
 #define MAX_FILENAME_SIZE 255
@@ -49,7 +52,7 @@ struct fdcache_node {
        volatile unsigned long time;
        volatile unsigned int flags;
        pthread_cond_t cond;
-       char name[MAX_FILENAME_SIZE + 1];
+       char target[MAX_FILENAME_SIZE + 1];
 };
 
 struct store {
@@ -83,35 +86,42 @@ static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
 
 static void log_io(char *msg, struct io *io)
 {
-       char name[64], data[64];
-       /* null terminate name in case of req->name is less than 63 characters,
+       char target[64], data[64];
+       /* null terminate name in case of req->target is less than 63 characters,
         * and next character after name (aka first byte of next buffer) is not
         * null
         */
-       unsigned int end = (io->req->namesize > 63) ? 63 : io->req->namesize;
-       strncpy(name, io->req->name, end);
-       name[end] = 0;
-       strncpy(data, io->req->data, 63);
+       struct store* store = io->store;
+       struct xseg *xseg = store->xseg;
+       struct xseg_request *req = io->req;
+       char *req_target = xseg_get_target(xseg, req);
+       char *req_data = xseg_get_data(xseg, req);
+
+       unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
+
+       strncpy(target, req_target, end);
+       target[end] = 0;
+       strncpy(data, req_data, 63);
        data[63] = 0;
 
        fprintf(stderr,
                "%s: fd:%u, op:%u offset: %llu size: %lu retval: %lu, reqstate: %u\n"
-               "name[%u]: '%s', data[%llu]:\n%s------------------\n\n",
+               "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
                msg,
                (unsigned int)io->fdcacheidx, //this is cacheidx not fd
-               (unsigned int)io->req->op,
-               (unsigned long long)io->req->offset,
-               (unsigned long)io->req->size,
+               (unsigned int)req->op,
+               (unsigned long long)req->offset,
+               (unsigned long)req->size,
                (unsigned long)io->retval,
-               (unsigned int)io->req->state,
-               (unsigned int)io->req->namesize, name,
-               (unsigned long long)io->req->datasize, data);
+               (unsigned int)req->state,
+               (unsigned int)req->targetlen, target,
+               (unsigned long long)req->datalen, data);
 }
 
 static struct io *alloc_io(struct store *store)
 {
-       xqindex idx = xq_pop_head(&store->free_ops);
-       if (idx == None)
+       xqindex idx = xq_pop_head(&store->free_ops, 1);
+       if (idx == Noneidx)
                return NULL;
        return store->ios + idx;
 }
@@ -120,29 +130,33 @@ static inline void free_io(struct store *store, struct io *io)
 {
        xqindex idx = io - store->ios;
        io->req = NULL;
-       xq_append_head(&store->free_ops, idx);
+       xq_append_head(&store->free_ops, idx, 1);
 }
 
 
 static void complete(struct store *store, struct io *io)
 {
        struct xseg_request *req = io->req;
+       xport p;
        req->state |= XS_SERVED;
        if (verbose)
                log_io("complete", io);
-       xseg_respond(store->xseg, req->portno, req);
-       xseg_signal(store->xseg, req->portno);
+       while ((p = xseg_respond(store->xseg, req, store->portno, X_ALLOC)) == NoPort)
+               ;
+       xseg_signal(store->xseg, p);
        __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
 }
 
 static void fail(struct store *store, struct io *io)
 {
        struct xseg_request *req = io->req;
+       xport p;
        req->state |= XS_FAILED;
        if (verbose)
                log_io("fail", io);
-       xseg_respond(store->xseg, req->portno, req);
-       xseg_signal(store->xseg, req->portno);
+       while ((p = xseg_respond(store->xseg, req, store->portno, X_ALLOC)) == NoPort)
+               ;
+       xseg_signal(store->xseg, p);
        if (io->fdcacheidx >= 0) {
                __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
        }
@@ -155,8 +169,10 @@ static void pending(struct store *store, struct io *io)
 
 static void handle_unknown(struct store *store, struct io *io)
 {
+       struct xseg *xseg = store->xseg;
        struct xseg_request *req = io->req;
-       snprintf(req->data, req->datasize, "unknown request op");
+       char *data = xseg_get_data(xseg, req);
+       snprintf(data, req->datalen, "unknown request op");
        fail(store, io);
 }
 
@@ -166,14 +182,14 @@ static inline void prepare_io(struct store *store, struct io *io)
 
 
 static int dir_open(   struct store *store, struct io *io,
-                       char *name, uint32_t namesize, int mode )
+                       char *target, uint32_t targetlen, int mode      )
 {
-       int fd = -1, r;
+       int fd = -1;
        struct fdcache_node *ce = NULL;
        long i, lru;
        uint64_t min;
        io->fdcacheidx = -1;
-       if (namesize > MAX_FILENAME_SIZE)
+       if (targetlen> MAX_FILENAME_SIZE)
                goto out_err;
 
 start:
@@ -189,8 +205,8 @@ start_locked:
                        lru = i;
 
                }
-               if (!strncmp(store->fdcache[i].name, name, namesize)) {
-                       if (store->fdcache[i].name[namesize] == 0) {
+               if (!strncmp(store->fdcache[i].target, target, targetlen)) {
+                       if (store->fdcache[i].target[targetlen] == 0) {
                                ce = &store->fdcache[i];
                                /* if any other io thread is currently opening
                                 * the file, block until it succeeds or fails
@@ -233,8 +249,8 @@ start_locked:
        /* set name here and state to not ready, for any other requests on the
         * same target that may follow
         */
-       strncpy(ce->name, name, namesize);
-       ce->name[namesize] = 0;
+       strncpy(ce->target, target, targetlen);
+       ce->target[targetlen] = 0;
        ce->flags &= ~READY;
        pthread_mutex_unlock(&store->cache_lock);
 
@@ -243,10 +259,10 @@ start_locked:
                        perror("close");
                }
        }
-       fd = openat(store->dirfd, ce->name, O_RDWR);    
+       fd = openat(store->dirfd, ce->target, O_RDWR);  
        if (fd < 0) {
                if (errno == ENOENT){
-                       fd = openat(store->dirfd, ce->name, 
+                       fd = openat(store->dirfd, ce->target, 
                                        O_RDWR | O_CREAT, 0600);
                        if (fd >= 0)
                                goto new_entry;
@@ -287,13 +303,16 @@ out_err_unlock:
 static void handle_read_write(struct store *store, struct io *io)
 {
        int r, fd, mode;
+       struct xseg *xseg = store->xseg;
        struct xseg_request *req = io->req;
+       char *target = xseg_get_target(xseg, req);
+       char *data = xseg_get_data(xseg, req);
 
        if (req->op == X_WRITE)
                mode = 1;
        else
                mode = 0;
-       fd = dir_open(store, io, req->name, req->namesize, mode);
+       fd = dir_open(store, io, target, req->targetlen, mode);
        if (fd < 0){
                perror("dir_open");
                fail(store, io);
@@ -320,18 +339,18 @@ static void handle_read_write(struct store *store, struct io *io)
 
        switch (req->op) {
        case X_READ:
-               while (req->serviced < req->datasize) {
-                       r = pread(fd, req->data + req->serviced, 
-                                       req->datasize - req->serviced,
+               while (req->serviced < req->datalen) {
+                       r = pread(fd, data + req->serviced, 
+                                       req->datalen - req->serviced,
                                        req->offset + req->serviced);
                        if (r < 0) {
-                               req->datasize = req->serviced;
+                               req->datalen = req->serviced;
                                perror("pread");
                        }
                        else if (r == 0) {
                                /* reached end of file. zero out the rest data buffer */
-                               memset(req->data + req->serviced, 0, req->datasize - req->serviced);
-                               req->serviced = req->datasize;
+                               memset(data + req->serviced, 0, req->datalen - req->serviced);
+                               req->serviced = req->datalen;
                        }
                        else {
                                req->serviced += r;
@@ -339,17 +358,17 @@ static void handle_read_write(struct store *store, struct io *io)
                }
                break;
        case X_WRITE:
-               while (req->serviced < req->datasize) {
-                       r = pwrite(fd, req->data + req->serviced, 
-                                       req->datasize - req->serviced,
+               while (req->serviced < req->datalen) {
+                       r = pwrite(fd, data + req->serviced, 
+                                       req->datalen - req->serviced,
                                        req->offset + req->serviced);
                        if (r < 0) {
-                               req->datasize = req->serviced;
+                               req->datalen = req->serviced;
                        }
                        else if (r == 0) {
                                /* reached end of file. zero out the rest data buffer */
-                               memset(req->data + req->serviced, 0, req->datasize - req->serviced);
-                               req->serviced = req->datasize;
+                               memset(data + req->serviced, 0, req->datalen - req->serviced);
+                               req->serviced = req->datalen;
                        }
                        else {
                                req->serviced += r;
@@ -363,7 +382,7 @@ static void handle_read_write(struct store *store, struct io *io)
                }
                break;
        default:
-               snprintf(req->data, req->datasize,
+               snprintf(data, req->datalen,
                         "wtf, corrupt op %u?\n", req->op);
                fail(store, io);
                return;
@@ -373,7 +392,7 @@ static void handle_read_write(struct store *store, struct io *io)
                complete(store, io);
        }
        else {
-               strerror_r(errno, req->data, req->datasize);
+               strerror_r(errno, data, req->datalen);
                fail(store, io);
        }
        return;
@@ -381,12 +400,15 @@ static void handle_read_write(struct store *store, struct io *io)
 
 static void handle_info(struct store *store, struct io *io)
 {
+       struct xseg *xseg = store->xseg;
        struct xseg_request *req = io->req;
-        struct stat stat;
+       char *target = xseg_get_target(xseg, req);
+       char *data = xseg_get_data(xseg, req);
+       struct stat stat;
        int fd, r;
        off_t size;
 
-       fd = dir_open(store, io, req->name, req->namesize, 0);
+       fd = dir_open(store, io, target, req->targetlen, 0);
        if (fd < 0) {
                fail(store, io);
                return;
@@ -398,10 +420,94 @@ static void handle_info(struct store *store, struct io *io)
                return;
        }
        size = stat.st_size;
-       *((off_t *) req->data) = size;
-       req->datasize = sizeof(size);
+       *((off_t *) data) = size;
+       req->datalen = sizeof(size);
+
+       complete(store, io);
+}
+
+static void handle_copy(struct store *store, struct io *io)
+{
+        struct xseg_request *req = io->req;
+        struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(store->xseg, req);
+        struct stat st;
+        int n, src, dst;
+       char buf[XSEG_MAX_TARGETLEN+1];
+       char *target = xseg_get_target(store->xseg, req);
+
+        dst = dir_open(store, io, target, req->targetlen, 1);
+        if (dst < 0) {
+                fprintf(stderr, "fail in dst\n");
+                fail(store, io);
+                return;
+        }
+
+       strncpy(buf, xcopy->target, xcopy->targetlen);
+       buf[xcopy->targetlen] = 0;
+       src = openat(store->dirfd, buf, O_RDWR);        
+        if (src < 0) {
+               if (errno == ENOENT){
+                       src = openat(store->dirfd, buf, 
+                                       O_RDWR | O_CREAT, 0600);
+                       if (src < 0 ) {
+                               fprintf(stderr, "fail in src\n");
+                               fail(store, io);
+                               return;
+                       }       
+               } else {
+                       fprintf(stderr, "fail in src\n");
+                       fail(store, io);
+                       return;
+               }
+        }
+
+        fstat(src, &st);
+        n = sendfile(dst, src, 0, st.st_size);
+        if (n != st.st_size) {
+                fprintf(stderr, "fail in copy\n");
+                fail(store, io);
+                goto out;
+        }
+
+        if (n < 0) {
+                fprintf(stderr, "fail in cp\n");
+                fail(store, io);
+                goto out;
+        }
+
+        complete(store, io);
+
+out:
+        close(src);
+}
+
+static void handle_delete(struct store *store, struct io *io)
+{
+       struct xseg_request *req = io->req;
+       int fd;
+       char *target = xseg_get_target(store->xseg, req);
+       
+       fd = dir_open(store, io, target, req->targetlen, 0);
+       if (fd < 0) {
+               fprintf(stderr, "fail in dir_open\n");
+               fail(store, io);
+               return;
+       }
+
+       /* 'invalidate' cache entry */
+       if (io->fdcacheidx >= 0) {
+               store->fdcache[io->fdcacheidx].fd = -1;
+       }
+
+       close(fd);
+       char buf[MAX_FILENAME_SIZE + 1];
+       strncpy(buf, target, req->targetlen);
+       buf[req->targetlen] = 0;
+       unlinkat(store->dirfd, buf, 0);
 
        complete(store, io);
+
+       return;
 }
 
 static void dispatch(struct store *store, struct io *io)
@@ -415,6 +521,10 @@ static void dispatch(struct store *store, struct io *io)
                handle_read_write(store, io); break;
        case X_INFO:
                handle_info(store, io); break;
+       case X_DELETE:
+               handle_delete(store, io); break;
+       case X_COPY:
+               handle_copy(store, io); break;
        case X_SYNC:
        default:
                handle_unknown(store, io);
@@ -452,7 +562,7 @@ void *io_loop(void *arg)
 
        for (;;) {
                accepted = NULL;
-               accepted = xseg_accept(xseg, portno);
+               accepted = xseg_accept(xseg, portno, 0);
                if (accepted) {
                        io->req = accepted;
                        wake_up_next_iothread(store);
@@ -493,7 +603,7 @@ static int filed_loop(struct store *store)
        for (;;) {
                io = wake_up_next_iothread(store);
                xseg_prepare_wait(xseg, portno);
-               xseg_wait_signal(xseg, 10000);
+               xseg_wait_signal(xseg, 1000000UL);
        }
        return 0;
 }
@@ -501,11 +611,9 @@ static int filed_loop(struct store *store)
 static int filed(      char *path, unsigned long size, uint32_t nr_ops,
                        char *spec, long portno )
 {
-       struct stat stat;
        struct sigaction sa;
        struct store *store;
-       int r, mode, i;
-       void *status;
+       int i;
 
        store = malloc(sizeof(struct store));
        if (!store) {
@@ -598,7 +706,7 @@ malloc_fail:
        if (!store->xseg)
                return -1;
 
-       store->xport = xseg_bind_port(store->xseg, portno);
+       store->xport = xseg_bind_port(store->xseg, portno, NULL);
        if (!store->xport) {
                printf("cannot bind to port %ld\n", portno);
                return -1;
@@ -608,6 +716,11 @@ malloc_fail:
        printf("filed on port %u/%u\n",
                store->portno, store->xseg->config.nr_ports);
 
+       if (xseg_init_local_signal(store->xseg, store->portno) < 0){
+               printf("cannot int local signals\n");
+               return -1;
+       }
+
        for (i = 0; i < nr_ops; i++) {
                pthread_cond_init(&store->fdcache[i].cond, NULL);
                store->fdcache[i].flags = READY;