add copy, acquire, release functionality in xseg tooladd copy, acquire, release
[archipelago] / xseg / peers / user / filed.c
index 47402c7..b9666d4 100644 (file)
@@ -12,6 +12,9 @@
 #include <limits.h>
 #include <xseg/xseg.h>
 #include <pthread.h>
+#include <xseg/protocol.h>
+#include <sys/sendfile.h>
+
 
 #define MAX_PATH_SIZE 255
 #define MAX_FILENAME_SIZE 255
@@ -88,10 +91,17 @@ static void log_io(char *msg, struct io *io)
         * and next character after name (aka first byte of next buffer) is not
         * null
         */
-       unsigned int end = (io->req->targetlen> 63) ? 63 : io->req->targetlen;
-       strncpy(target, io->req->target, end);
+       struct store* store = io->store;
+       struct xseg *xseg = store->xseg;
+       struct xseg_request *req = io->req;
+       char *req_target = xseg_get_target(xseg, req);
+       char *req_data = xseg_get_data(xseg, req);
+
+       unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
+
+       strncpy(target, req_target, end);
        target[end] = 0;
-       strncpy(data, io->req->data, 63);
+       strncpy(data, req_data, 63);
        data[63] = 0;
 
        fprintf(stderr,
@@ -99,13 +109,13 @@ static void log_io(char *msg, struct io *io)
                "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
                msg,
                (unsigned int)io->fdcacheidx, //this is cacheidx not fd
-               (unsigned int)io->req->op,
-               (unsigned long long)io->req->offset,
-               (unsigned long)io->req->size,
+               (unsigned int)req->op,
+               (unsigned long long)req->offset,
+               (unsigned long)req->size,
                (unsigned long)io->retval,
-               (unsigned int)io->req->state,
-               (unsigned int)io->req->targetlen, target,
-               (unsigned long long)io->req->datalen, data);
+               (unsigned int)req->state,
+               (unsigned int)req->targetlen, target,
+               (unsigned long long)req->datalen, data);
 }
 
 static struct io *alloc_io(struct store *store)
@@ -127,22 +137,26 @@ static inline void free_io(struct store *store, struct io *io)
 static void complete(struct store *store, struct io *io)
 {
        struct xseg_request *req = io->req;
+       xport p;
        req->state |= XS_SERVED;
        if (verbose)
                log_io("complete", io);
-       xseg_respond(store->xseg, req->portno, req);
-       xseg_signal(store->xseg, req->portno);
+       while ((p = xseg_respond(store->xseg, req, store->portno, X_ALLOC)) == NoPort)
+               ;
+       xseg_signal(store->xseg, p);
        __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
 }
 
 static void fail(struct store *store, struct io *io)
 {
        struct xseg_request *req = io->req;
+       xport p;
        req->state |= XS_FAILED;
        if (verbose)
                log_io("fail", io);
-       xseg_respond(store->xseg, req->portno, req);
-       xseg_signal(store->xseg, req->portno);
+       while ((p = xseg_respond(store->xseg, req, store->portno, X_ALLOC)) == NoPort)
+               ;
+       xseg_signal(store->xseg, p);
        if (io->fdcacheidx >= 0) {
                __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
        }
@@ -155,8 +169,10 @@ static void pending(struct store *store, struct io *io)
 
 static void handle_unknown(struct store *store, struct io *io)
 {
+       struct xseg *xseg = store->xseg;
        struct xseg_request *req = io->req;
-       snprintf(req->data, req->datalen, "unknown request op");
+       char *data = xseg_get_data(xseg, req);
+       snprintf(data, req->datalen, "unknown request op");
        fail(store, io);
 }
 
@@ -287,13 +303,16 @@ out_err_unlock:
 static void handle_read_write(struct store *store, struct io *io)
 {
        int r, fd, mode;
+       struct xseg *xseg = store->xseg;
        struct xseg_request *req = io->req;
+       char *target = xseg_get_target(xseg, req);
+       char *data = xseg_get_data(xseg, req);
 
        if (req->op == X_WRITE)
                mode = 1;
        else
                mode = 0;
-       fd = dir_open(store, io, req->target, req->targetlen, mode);
+       fd = dir_open(store, io, target, req->targetlen, mode);
        if (fd < 0){
                perror("dir_open");
                fail(store, io);
@@ -321,7 +340,7 @@ static void handle_read_write(struct store *store, struct io *io)
        switch (req->op) {
        case X_READ:
                while (req->serviced < req->datalen) {
-                       r = pread(fd, req->data + req->serviced, 
+                       r = pread(fd, data + req->serviced, 
                                        req->datalen - req->serviced,
                                        req->offset + req->serviced);
                        if (r < 0) {
@@ -330,7 +349,7 @@ static void handle_read_write(struct store *store, struct io *io)
                        }
                        else if (r == 0) {
                                /* reached end of file. zero out the rest data buffer */
-                               memset(req->data + req->serviced, 0, req->datalen - req->serviced);
+                               memset(data + req->serviced, 0, req->datalen - req->serviced);
                                req->serviced = req->datalen;
                        }
                        else {
@@ -340,7 +359,7 @@ static void handle_read_write(struct store *store, struct io *io)
                break;
        case X_WRITE:
                while (req->serviced < req->datalen) {
-                       r = pwrite(fd, req->data + req->serviced, 
+                       r = pwrite(fd, data + req->serviced, 
                                        req->datalen - req->serviced,
                                        req->offset + req->serviced);
                        if (r < 0) {
@@ -348,7 +367,7 @@ static void handle_read_write(struct store *store, struct io *io)
                        }
                        else if (r == 0) {
                                /* reached end of file. zero out the rest data buffer */
-                               memset(req->data + req->serviced, 0, req->datalen - req->serviced);
+                               memset(data + req->serviced, 0, req->datalen - req->serviced);
                                req->serviced = req->datalen;
                        }
                        else {
@@ -363,7 +382,7 @@ static void handle_read_write(struct store *store, struct io *io)
                }
                break;
        default:
-               snprintf(req->data, req->datalen,
+               snprintf(data, req->datalen,
                         "wtf, corrupt op %u?\n", req->op);
                fail(store, io);
                return;
@@ -373,7 +392,7 @@ static void handle_read_write(struct store *store, struct io *io)
                complete(store, io);
        }
        else {
-               strerror_r(errno, req->data, req->datalen);
+               strerror_r(errno, data, req->datalen);
                fail(store, io);
        }
        return;
@@ -381,12 +400,15 @@ static void handle_read_write(struct store *store, struct io *io)
 
 static void handle_info(struct store *store, struct io *io)
 {
+       struct xseg *xseg = store->xseg;
        struct xseg_request *req = io->req;
+       char *target = xseg_get_target(xseg, req);
+       char *data = xseg_get_data(xseg, req);
        struct stat stat;
        int fd, r;
        off_t size;
 
-       fd = dir_open(store, io, req->target, req->targetlen, 0);
+       fd = dir_open(store, io, target, req->targetlen, 0);
        if (fd < 0) {
                fail(store, io);
                return;
@@ -398,12 +420,96 @@ static void handle_info(struct store *store, struct io *io)
                return;
        }
        size = stat.st_size;
-       *((off_t *) req->data) = size;
+       *((off_t *) data) = size;
        req->datalen = sizeof(size);
 
        complete(store, io);
 }
 
+static void handle_copy(struct store *store, struct io *io)
+{
+        struct xseg_request *req = io->req;
+        struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(store->xseg, req);
+        struct stat st;
+        int n, src, dst;
+       char buf[XSEG_MAX_TARGETLEN+1];
+       char *target = xseg_get_target(store->xseg, req);
+
+        dst = dir_open(store, io, target, req->targetlen, 1);
+        if (dst < 0) {
+                fprintf(stderr, "fail in dst\n");
+                fail(store, io);
+                return;
+        }
+
+       strncpy(buf, xcopy->target, xcopy->targetlen);
+       buf[xcopy->targetlen] = 0;
+       src = openat(store->dirfd, buf, O_RDWR);        
+        if (src < 0) {
+               if (errno == ENOENT){
+                       src = openat(store->dirfd, buf, 
+                                       O_RDWR | O_CREAT, 0600);
+                       if (src < 0 ) {
+                               fprintf(stderr, "fail in src\n");
+                               fail(store, io);
+                               return;
+                       }       
+               } else {
+                       fprintf(stderr, "fail in src\n");
+                       fail(store, io);
+                       return;
+               }
+        }
+
+        fstat(src, &st);
+        n = sendfile(dst, src, 0, st.st_size);
+        if (n != st.st_size) {
+                fprintf(stderr, "fail in copy\n");
+                fail(store, io);
+                goto out;
+        }
+
+        if (n < 0) {
+                fprintf(stderr, "fail in cp\n");
+                fail(store, io);
+                goto out;
+        }
+
+        complete(store, io);
+
+out:
+        close(src);
+}
+
+static void handle_delete(struct store *store, struct io *io)
+{
+       struct xseg_request *req = io->req;
+       int fd;
+       char *target = xseg_get_target(store->xseg, req);
+       
+       fd = dir_open(store, io, target, req->targetlen, 0);
+       if (fd < 0) {
+               fprintf(stderr, "fail in dir_open\n");
+               fail(store, io);
+               return;
+       }
+
+       /* 'invalidate' cache entry */
+       if (io->fdcacheidx >= 0) {
+               store->fdcache[io->fdcacheidx].fd = -1;
+       }
+
+       close(fd);
+       char buf[MAX_FILENAME_SIZE + 1];
+       strncpy(buf, target, req->targetlen);
+       buf[req->targetlen] = 0;
+       unlinkat(store->dirfd, buf, 0);
+
+       complete(store, io);
+
+       return;
+}
+
 static void dispatch(struct store *store, struct io *io)
 {
        if (verbose)
@@ -415,6 +521,10 @@ static void dispatch(struct store *store, struct io *io)
                handle_read_write(store, io); break;
        case X_INFO:
                handle_info(store, io); break;
+       case X_DELETE:
+               handle_delete(store, io); break;
+       case X_COPY:
+               handle_copy(store, io); break;
        case X_SYNC:
        default:
                handle_unknown(store, io);
@@ -452,7 +562,7 @@ void *io_loop(void *arg)
 
        for (;;) {
                accepted = NULL;
-               accepted = xseg_accept(xseg, portno);
+               accepted = xseg_accept(xseg, portno, 0);
                if (accepted) {
                        io->req = accepted;
                        wake_up_next_iothread(store);
@@ -596,7 +706,7 @@ malloc_fail:
        if (!store->xseg)
                return -1;
 
-       store->xport = xseg_bind_port(store->xseg, portno);
+       store->xport = xseg_bind_port(store->xseg, portno, NULL);
        if (!store->xport) {
                printf("cannot bind to port %ld\n", portno);
                return -1;
@@ -606,6 +716,11 @@ malloc_fail:
        printf("filed on port %u/%u\n",
                store->portno, store->xseg->config.nr_ports);
 
+       if (xseg_init_local_signal(store->xseg, store->portno) < 0){
+               printf("cannot int local signals\n");
+               return -1;
+       }
+
        for (i = 0; i < nr_ops; i++) {
                pthread_cond_init(&store->fdcache[i].cond, NULL);
                store->fdcache[i].flags = READY;