add volume creation capability to mapper
[archipelago] / xseg / peers / user / filed.c
index 901b508..b9666d4 100644 (file)
@@ -12,6 +12,9 @@
 #include <limits.h>
 #include <xseg/xseg.h>
 #include <pthread.h>
+#include <xseg/protocol.h>
+#include <sys/sendfile.h>
+
 
 #define MAX_PATH_SIZE 255
 #define MAX_FILENAME_SIZE 255
@@ -134,24 +137,26 @@ static inline void free_io(struct store *store, struct io *io)
 static void complete(struct store *store, struct io *io)
 {
        struct xseg_request *req = io->req;
+       xport p;
        req->state |= XS_SERVED;
        if (verbose)
                log_io("complete", io);
-       while (xseg_respond(store->xseg, req->portno, req) == Noneidx)
+       while ((p = xseg_respond(store->xseg, req, store->portno, X_ALLOC)) == NoPort)
                ;
-       xseg_signal(store->xseg, req->portno);
+       xseg_signal(store->xseg, p);
        __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
 }
 
 static void fail(struct store *store, struct io *io)
 {
        struct xseg_request *req = io->req;
+       xport p;
        req->state |= XS_FAILED;
        if (verbose)
                log_io("fail", io);
-       while (xseg_respond(store->xseg, req->portno, req) == Noneidx)
+       while ((p = xseg_respond(store->xseg, req, store->portno, X_ALLOC)) == NoPort)
                ;
-       xseg_signal(store->xseg, req->portno);
+       xseg_signal(store->xseg, p);
        if (io->fdcacheidx >= 0) {
                __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
        }
@@ -421,6 +426,90 @@ static void handle_info(struct store *store, struct io *io)
        complete(store, io);
 }
 
+static void handle_copy(struct store *store, struct io *io)
+{
+        struct xseg_request *req = io->req;
+        struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(store->xseg, req);
+        struct stat st;
+        int n, src, dst;
+       char buf[XSEG_MAX_TARGETLEN+1];
+       char *target = xseg_get_target(store->xseg, req);
+
+        dst = dir_open(store, io, target, req->targetlen, 1);
+        if (dst < 0) {
+                fprintf(stderr, "fail in dst\n");
+                fail(store, io);
+                return;
+        }
+
+       strncpy(buf, xcopy->target, xcopy->targetlen);
+       buf[xcopy->targetlen] = 0;
+       src = openat(store->dirfd, buf, O_RDWR);        
+        if (src < 0) {
+               if (errno == ENOENT){
+                       src = openat(store->dirfd, buf, 
+                                       O_RDWR | O_CREAT, 0600);
+                       if (src < 0 ) {
+                               fprintf(stderr, "fail in src\n");
+                               fail(store, io);
+                               return;
+                       }       
+               } else {
+                       fprintf(stderr, "fail in src\n");
+                       fail(store, io);
+                       return;
+               }
+        }
+
+        fstat(src, &st);
+        n = sendfile(dst, src, 0, st.st_size);
+        if (n != st.st_size) {
+                fprintf(stderr, "fail in copy\n");
+                fail(store, io);
+                goto out;
+        }
+
+        if (n < 0) {
+                fprintf(stderr, "fail in cp\n");
+                fail(store, io);
+                goto out;
+        }
+
+        complete(store, io);
+
+out:
+        close(src);
+}
+
+static void handle_delete(struct store *store, struct io *io)
+{
+       struct xseg_request *req = io->req;
+       int fd;
+       char *target = xseg_get_target(store->xseg, req);
+       
+       fd = dir_open(store, io, target, req->targetlen, 0);
+       if (fd < 0) {
+               fprintf(stderr, "fail in dir_open\n");
+               fail(store, io);
+               return;
+       }
+
+       /* 'invalidate' cache entry */
+       if (io->fdcacheidx >= 0) {
+               store->fdcache[io->fdcacheidx].fd = -1;
+       }
+
+       close(fd);
+       char buf[MAX_FILENAME_SIZE + 1];
+       strncpy(buf, target, req->targetlen);
+       buf[req->targetlen] = 0;
+       unlinkat(store->dirfd, buf, 0);
+
+       complete(store, io);
+
+       return;
+}
+
 static void dispatch(struct store *store, struct io *io)
 {
        if (verbose)
@@ -432,6 +521,10 @@ static void dispatch(struct store *store, struct io *io)
                handle_read_write(store, io); break;
        case X_INFO:
                handle_info(store, io); break;
+       case X_DELETE:
+               handle_delete(store, io); break;
+       case X_COPY:
+               handle_copy(store, io); break;
        case X_SYNC:
        default:
                handle_unknown(store, io);
@@ -469,7 +562,7 @@ void *io_loop(void *arg)
 
        for (;;) {
                accepted = NULL;
-               accepted = xseg_accept(xseg, portno);
+               accepted = xseg_accept(xseg, portno, 0);
                if (accepted) {
                        io->req = accepted;
                        wake_up_next_iothread(store);
@@ -613,7 +706,7 @@ malloc_fail:
        if (!store->xseg)
                return -1;
 
-       store->xport = xseg_bind_port(store->xseg, portno);
+       store->xport = xseg_bind_port(store->xseg, portno, NULL);
        if (!store->xport) {
                printf("cannot bind to port %ld\n", portno);
                return -1;
@@ -623,6 +716,11 @@ malloc_fail:
        printf("filed on port %u/%u\n",
                store->portno, store->xseg->config.nr_ports);
 
+       if (xseg_init_local_signal(store->xseg, store->portno) < 0){
+               printf("cannot int local signals\n");
+               return -1;
+       }
+
        for (i = 0; i < nr_ops; i++) {
                pthread_cond_init(&store->fdcache[i].cond, NULL);
                store->fdcache[i].flags = READY;