add stdout, stderr redirection of peers to logfile
[archipelago] / xseg / peers / user / mt-sosd.c
index 881db94..43b3389 100644 (file)
@@ -2,18 +2,18 @@
 #include <stdlib.h>
 #include <unistd.h>
 #include <xseg/xseg.h>
-#include <mpeer.h>
+#include <peer.h>
 #include <rados/librados.h>
 #include <xseg/protocol.h>
 
 #define MAX_POOL_NAME 64
-#define MAX_OBJ_NAME 256
-
-extern struct log_ctx lc;
+#define MAX_OBJ_NAME XSEG_MAX_TARGETLEN /* name buffers hold MAX_OBJ_NAME + 1 (NUL) */
 
 enum rados_state {
        ACCEPTED = 0,
-       PENDING = 1
+       PENDING = 1,    /* aio queued by handle_delete / handle_info */
+       READING = 2,    /* aio read in flight (X_READ, X_COPY source) */
+       WRITING = 3     /* aio write in flight (X_WRITE, X_COPY target) */
 };
 
 struct radosd {
@@ -25,6 +25,10 @@ struct radosd {
 struct rados_io{
        char obj_name[MAX_OBJ_NAME + 1];
        enum rados_state state;
+       uint64_t size;          /* object size filled in by the aio stat of X_INFO */
+       char *src_name, *buf;   /* X_COPY: heap copy of source name, staging buffer */
+       uint64_t read;          /* X_COPY: bytes of the source read into buf so far */
+
 };
 void rados_ack_cb(rados_completion_t c, void *arg)
@@ -47,50 +51,74 @@ void rados_commit_cb(rados_completion_t c, void *arg)
        dispatch(peer, pr, pr->req, dispatch_internal);
 }
 
-int do_aio_read(struct peerd *peer, struct peer_req *pr)
+/*
+ * Queue one asynchronous librados op (X_READ, X_WRITE, X_DELETE, X_INFO)
+ * on object `target` for pr. X_WRITE completes via rados_commit_cb, the
+ * rest via rados_ack_cb; X_INFO stores the object size in rio->size.
+ * Returns the librados submission result, or -1 for an unknown op or
+ * when no completion could be allocated.
+ */
+static int do_aio_generic(struct peerd *peer, struct peer_req *pr, uint32_t op,
+               char *target, char *buf, uint64_t size, uint64_t offset)
 {
        struct radosd *rados = (struct radosd *) peer->priv;
-       struct xseg_request *req = pr->req;
        struct rados_io *rio = (struct rados_io *) pr->priv;
-       char *data = xseg_get_data(peer->xseg, pr->req);
        int r;
 
        rados_completion_t rados_compl;
-       r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
-       if (r < 0) 
-               return -1;
-       r = rados_aio_read(rados->ioctx, rio->obj_name, rados_compl, 
-                       data + req->serviced,
-                       req->size - req->serviced,
-                       req->offset + req->serviced);
+       rados_callback_t on_ack = rados_ack_cb, on_commit = NULL;
+
+       if (op == X_WRITE) {
+               /* writes are reported once committed, not merely acked */
+               on_ack = NULL;
+               on_commit = rados_commit_cb;
+       } else if (op != X_READ && op != X_DELETE && op != X_INFO) {
+               return -1;
+       }
+
+       if (rados_aio_create_completion(pr, on_ack, on_commit, &rados_compl) < 0)
+               return -1;
+
+       if (op == X_READ)
+               r = rados_aio_read(rados->ioctx, target, rados_compl,
+                               buf, size, offset);
+       else if (op == X_WRITE)
+               r = rados_aio_write(rados->ioctx, target, rados_compl,
+                               buf, size, offset);
+       else if (op == X_DELETE)
+               r = rados_aio_remove(rados->ioctx, target, rados_compl);
+       else
+               r = rados_aio_stat(rados->ioctx, target, rados_compl,
+                               &rio->size, NULL);
        if (r < 0) {
                rados_aio_release(rados_compl);
-               return -1;
        }
-       return 0;
+       return r;
 }
 
-int do_aio_write(struct peerd *peer, struct peer_req *pr)
+/* do_aio_read / do_aio_write: queue aio I/O for the unserviced tail of req */
+static int do_aio_read(struct peerd *peer, struct peer_req *pr)
+{
+       struct xseg_request *req = pr->req;
+       struct rados_io *rio = (struct rados_io *) pr->priv;
+       char *data = xseg_get_data(peer->xseg, pr->req);
+
+       return do_aio_generic(peer, pr, X_READ, rio->obj_name,
+                       data + req->serviced,
+                       req->size - req->serviced,
+                       req->offset + req->serviced);
+}
+
+static int do_aio_write(struct peerd *peer, struct peer_req *pr)
 {
-       struct radosd *rados = (struct radosd *) peer->priv;
        struct xseg_request *req = pr->req;
        struct rados_io *rio = (struct rados_io *) pr->priv;
        char *data = xseg_get_data(peer->xseg, pr->req);
-       int r;
 
-       rados_completion_t rados_compl;
-       r = rados_aio_create_completion(pr, NULL, rados_commit_cb, &rados_compl);
-       if (r < 0) 
-               return -1;
-       r = rados_aio_write(rados->ioctx, rio->obj_name, rados_compl, 
+       return do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
                        data + req->serviced,
                        req->size - req->serviced,
                        req->offset + req->serviced);
-       if (r < 0) {
-               rados_aio_release(rados_compl);
-               return -1;
-       }
-       return 0;
 }
 
 int handle_delete(struct peerd *peer, struct peer_req *pr)
@@ -98,51 +126,63 @@ int handle_delete(struct peerd *peer, struct peer_req *pr)
        int r;
        struct radosd *rados = (struct radosd *) peer->priv;
        struct rados_io *rio = (struct rados_io *) pr->priv;
-       
-       //log_pr("delete start", pr);
-       XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
-       r = rados_remove(rados->ioctx, rio->obj_name);
-       if (r < 0) {
-               pr->retval = r;
-               XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
-               fail(peer, pr);
+
+       if (rio->state == ACCEPTED) {
+               XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
+               rio->state = PENDING;
+               r = do_aio_generic(peer, pr, X_DELETE, rio->obj_name, NULL, 0, 0);
+               if (r < 0) {
+                       XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
+                       fail(peer, pr);
+               }
        }
        else {
-               pr->retval = 0;
-               XSEGLOG2(&lc, E, "Deletion of %s completed", rio->obj_name);
-               complete(peer, pr);
+               if (pr->retval < 0){
+                       XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
+                       fail(peer, pr);
+               }
+               else {
+                       XSEGLOG2(&lc, I, "Deletion of %s completed", rio->obj_name);
+                       complete(peer, pr);
+               }
        }
        return 0;
 }
 
 int handle_info(struct peerd *peer, struct peer_req *pr)
 {
-       uint64_t size;
-       time_t pmtime;
        int r;
        struct xseg_request *req = pr->req;
        struct radosd *rados = (struct radosd *) peer->priv;
        struct rados_io *rio = (struct rados_io *) pr->priv;
        char *req_data = xseg_get_data(peer->xseg, req);
-       struct xseg_reply_info *xinfo = req_data;
+       struct xseg_reply_info *xinfo = (struct xseg_reply_info *)req_data;
 
-       XSEGLOG2(&lc, I, "Getting info of %s", rio->obj_name);  
-       r = rados_stat(rados->ioctx, rio->obj_name, &size, &pmtime);
-       if (r < 0) {
-               pr->retval = r;
-               XSEGLOG2(&lc, I, "Getting info of %s failed", rio->obj_name);   
-               fail(peer, pr);
+       if (rio->state == ACCEPTED) {
+               XSEGLOG2(&lc, I, "Getting info of %s", rio->obj_name);
+               rio->state = PENDING;
+               r = do_aio_generic(peer, pr, X_INFO, rio->obj_name, NULL, 0, 0);
+               if (r < 0) {
+                       XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);   
+                       fail(peer, pr);
+               }
        }
        else {
-               xinfo->size = size;
-               pr->retval = sizeof(uint64_t);
-               XSEGLOG2(&lc, I, "Getting info of %s completed", rio->obj_name);        
-               complete(peer,pr);
+               if (pr->retval < 0){
+                       xinfo->size = 0;
+                       XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);   
+                       fail(peer, pr);
+               }
+               else {
+                       xinfo->size = rio->size;
+                       pr->retval = sizeof(uint64_t);
+                       XSEGLOG2(&lc, I, "Getting info of %s completed", rio->obj_name);        
+                       complete(peer, pr);
+               }
        }
        return 0;
 }
 
-//FIXME req->state no longer apply
 int handle_read(struct peerd *peer, struct peer_req *pr)
 {
        struct rados_io *rio = (struct rados_io *) (pr->priv);
@@ -153,30 +193,32 @@ int handle_read(struct peerd *peer, struct peer_req *pr)
                        complete(peer, pr);
                        return 0;
                }
-               //should we ensure req->op = X_READ ?
-               rio->state = PENDING;
+               rio->state = READING;
                XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
                if (do_aio_read(peer, pr) < 0) {
-                       XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
+                       XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
+                                       rio->obj_name);
                        fail(peer, pr);
                }
        }
-       else if (rio->state == PENDING) {
+       else if (rio->state == READING) {
                XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
                data = xseg_get_data(peer->xseg, pr->req);
-               if (pr->retval > 0) 
+               if (pr->retval > 0)
                        req->serviced += pr->retval;
                else if (pr->retval == 0) {
-                       XSEGLOG2(&lc, I, "Reading of %s reached end of file at %llu bytes. Zeroing out rest", 
-                                               rio->obj_name, (unsigned long long) req->serviced);
+                       XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
+                               "%llu bytes. Zeroing out rest", rio->obj_name,
+                               (unsigned long long) req->serviced);
                        /* reached end of object. zero out rest of data
                         * requested from this object
                         */
                        memset(data + req->serviced, 0, req->datalen - req->serviced);
-                       req->serviced = req->datalen ;
+                       req->serviced = req->datalen;
                }
                else if (pr->retval == -2) {
-                       XSEGLOG2(&lc, I, "Reading of %s return -2. Zeroing out data", rio->obj_name);
+                       XSEGLOG2(&lc, I, "Reading of %s return -2. "
+                                       "Zeroing out data", rio->obj_name);
                        /* object not found. return zeros instead */
                        memset(data, 0, req->datalen);
                        req->serviced = req->datalen;
@@ -198,12 +240,11 @@ int handle_read(struct peerd *peer, struct peer_req *pr)
                        fail(peer, pr);
                        return 0;
                }
-               //TODO assert req->op == X_READ
-               
                /* resubmit */
                XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
                if (do_aio_read(peer, pr) < 0) {
-                       XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read", rio->obj_name);
+                       XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
+                                       rio->obj_name);
                        fail(peer, pr);
                }
        }
@@ -232,14 +273,15 @@ int handle_write(struct peerd *peer, struct peer_req *pr)
                        }
                }
-               //should we ensure req->op = X_READ ?
-               rio->state = PENDING;
+               //should we ensure req->op == X_WRITE ?
+               rio->state = WRITING;
                XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
                if (do_aio_write(peer, pr) < 0) {
-                       XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->obj_name);
+                       XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write",
+                                       rio->obj_name);
                        fail(peer, pr);
                }
        }
-       else if (rio->state == PENDING) {
+       else if (rio->state == WRITING) {
                /* rados writes return 0 if write succeeded or < 0 if failed
                 * no resubmission occurs
                 */
@@ -251,7 +293,7 @@ int handle_write(struct peerd *peer, struct peer_req *pr)
                        return 0;
                }
                else {
-                       XSEGLOG2(&lc, I, "Writing of %s failed", rio->obj_name);
+                       XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
                        fail(peer, pr);
                        return 0;
                }
@@ -269,68 +311,148 @@ int handle_copy(struct peerd *peer, struct peer_req *pr)
-       struct radosd *rados = (struct radosd *) peer->priv;
        struct xseg_request *req = pr->req;
        struct rados_io *rio = (struct rados_io *) pr->priv;
-       int r, sum;
-       char *buf, src_name[MAX_OBJ_NAME + 1];
-       struct xseg_request_copy *xcopy = xseg_get_data(peer->xseg, req);
-       unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
+       int r;
+       struct xseg_request_copy *xcopy = (struct xseg_request_copy *)xseg_get_data(peer->xseg, req);
+
+       /* X_COPY state machine: ACCEPTED -> READING (source) -> WRITING (target) */
+       if (rio->state == ACCEPTED){
+               if (!req->size) {
+                       complete(peer, pr); //or fail?
+                       return 0;
+               }
 
-       strncpy(src_name, xcopy->target, end);
-       src_name[end] = 0;
+               rio->src_name = malloc(MAX_OBJ_NAME + 1);
+               if (!rio->src_name){
+                       fail(peer, pr);
+                       return -1;
+               }
+               //NULL terminate or fail if targetlen > MAX_OBJ_NAME ?
+               unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
+               strncpy(rio->src_name, xcopy->target, end);
+               rio->src_name[end] = 0;
+               XSEGLOG2(&lc, I, "Copy of object %s to object %s started",
+                               rio->src_name, rio->obj_name);
 
-       req->serviced = 0;
-       buf = malloc(req->size);
-       if (!buf) {
-               fail(peer, pr);
-               return -1;
+               rio->buf = malloc(req->size);
+               if (!rio->buf) {
+                       r = -1;
+                       goto out_src;
+               }
+
+               rio->state = READING;
+               rio->read = 0;
+               XSEGLOG2(&lc, I, "Reading %s", rio->src_name);
+               if (do_aio_generic(peer, pr, X_READ, rio->src_name, rio->buf + rio->read,
+                       req->size - rio->read, req->offset + rio->read) < 0) {
+                       XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read", rio->src_name);
+                       r = -1;
+                       goto out_buf;
+               }
        }
-       XSEGLOG2(&lc, I, "Copy of object %s to object %s started", src_name, rio->obj_name);
-       sum = 0;
-       do {
-               r = rados_read(rados->ioctx, src_name, buf, req->size, 0);
-               if (r < 0){
-                       XSEGLOG2(&lc, E, "Read of object %s failed", src_name);
-                       goto out_fail;
+       else if (rio->state == READING){
+               XSEGLOG2(&lc, I, "Reading of %s callback", rio->src_name);
+               if (pr->retval > 0)
+                       rio->read += pr->retval;
+               else if (pr->retval == 0) {
+                       XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
+                               "%llu bytes. Zeroing out rest", rio->src_name,
+                               (unsigned long long) rio->read);
+                       memset(rio->buf + rio->read, 0, req->size - rio->read);
+                       rio->read = req->size;
                }
-               else if (r == 0) {
-                       memset(buf+r, 0, req->size - r);
-                       sum = req->size;
-               } else 
-                       sum += r;
-       } while (sum < req->size);
-       XSEGLOG2(&lc, D, "Read of object %s Completed", src_name);
-
-       r = rados_write_full(rados->ioctx, rio->obj_name, buf, req->size);
-       if (r < 0){
-               XSEGLOG2(&lc, E, "Read of object %s failed", rio->obj_name);
-               goto out_fail;
+               else {
+                       XSEGLOG2(&lc, E, "Reading of %s failed", rio->src_name);
+                       r = -1;
+                       goto out_buf;
+               }
+
+               if (rio->read >= req->size) {
+                       XSEGLOG2(&lc, I, "Reading of %s completed", rio->src_name);
+                       //do_aio_write
+                       rio->state = WRITING;
+                       XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
+                       if (do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
+                                       rio->buf, req->size, req->offset) < 0) {
+                               XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->obj_name);
+                               r = -1;
+                               goto out_buf;
+                       }
+                       return 0;
+               }
+
+               XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->src_name);
+               if (do_aio_generic(peer, pr, X_READ, rio->src_name, rio->buf + rio->read,
+                       req->size - rio->read, req->offset + rio->read) < 0) {
+                       XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
+                                       rio->src_name);
+                       r = -1;
+                       goto out_buf;
+               }
+       }
+       else if (rio->state == WRITING){
+               XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
+               if (pr->retval == 0) {
+                       XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
+                       XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", rio->src_name, rio->obj_name);
+                       req->serviced = req->size;
+                       r = 0;
+                       goto out_buf;
+               }
+               else {
+                       XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
+                       XSEGLOG2(&lc, E, "Copy of object %s to object %s failed", rio->src_name, rio->obj_name);
+                       r = -1;
+                       goto out_buf;
+               }
+       }
+       else {
+               XSEGLOG2(&lc, E, "Unknown state");
+               fail(peer, pr);
        }
-       
-       free(buf);
-       req->serviced = req->size;
-       XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", src_name, rio->obj_name);
-       complete(peer, pr);
        return 0;
 
-out_fail:
-       free(buf);
-       pr->retval = -1;
-       XSEGLOG2(&lc, E, "Copy of object %s to object %s failed", src_name, rio->obj_name);
-       fail(peer, pr);
+/* common exit: drop the copy state, then fail or complete the request once */
+out_buf:
+       free(rio->buf);
+out_src:
+       free(rio->src_name);
+
+       rio->buf = NULL;
+       rio->src_name = NULL;
+       rio->read = 0;
+
+       if (r < 0)
+               fail(peer, pr);
+       else
+               complete(peer, pr);
        return 0;
 }
 
 int handle_open(struct peerd *peer, struct peer_req *pr)
 {
-       /* FIXME to be implemented */
-       complete(peer, pr);
+       struct radosd *rados = (struct radosd *) peer->priv;
+       struct rados_io *rio = (struct rados_io *) (pr->priv);
+       int r = rados_lock(rados->ioctx, rio->obj_name);
+       if (r < 0){
+               fail(peer, pr);
+       }
+       else {
+               complete(peer, pr);
+       }
        return 0;
 }
 
 
 int handle_close(struct peerd *peer, struct peer_req *pr)
 {
-       /* FIXME to be implemented */
-       complete(peer, pr);
+       struct radosd *rados = (struct radosd *) peer->priv;
+       struct rados_io *rio = (struct rados_io *) (pr->priv);
+       int r = rados_unlock(rados->ioctx, rio->obj_name);
+       if (r < 0){
+               fail(peer, pr);
+       }
+       else {
+               complete(peer, pr);
+       }
        return 0;
 }
 
@@ -353,36 +475,37 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
                }
        }
        if (!rados->pool[0]){
+               XSEGLOG2(&lc, E , "Pool must be provided");
                free(rados);
                return -1;
        }
 
        if (rados_create(&rados->cluster, NULL) < 0) {
-               printf("Rados create failed!\n");
+               XSEGLOG2(&lc, E, "Rados create failed!");
                return -1;
        }
        if (rados_conf_read_file(rados->cluster, NULL) < 0){
-               printf("Error reading rados conf files!\n");
+               XSEGLOG2(&lc, E, "Error reading rados conf files!");
                return -1;
        }
        if (rados_connect(rados->cluster) < 0) {
-               printf("Rados connect failed!\n");
+               XSEGLOG2(&lc, E, "Rados connect failed!");
                rados_shutdown(rados->cluster);
                free(rados);
                return 0;
        }
        if (rados_pool_lookup(rados->cluster, rados->pool) < 0) {
-               printf( "Pool does not exists. I will try to create it\n");
+               XSEGLOG2(&lc, I, "Pool does not exists. I will try to create it");
                if (rados_pool_create(rados->cluster, rados->pool) < 0){
-                       printf("Couldn't create pool!\n");
+                       XSEGLOG2(&lc, E, "Couldn't create pool %s", rados->pool);
                        rados_shutdown(rados->cluster);
                        free(rados);
                        return -1;
                }
-               printf("Pool created.\n");
+               XSEGLOG2(&lc, I, "Pool created.");
        }
-       if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0) {
-               printf("ioctx create problem.\n");
+       if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0){
+               XSEGLOG2(&lc, E, "ioctx create problem.");
                rados_shutdown(rados->cluster);
                free(rados);
                return -1;
@@ -392,6 +515,7 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
                rio = malloc(sizeof(struct rados_io));
                if (!rio) {
                        //ugly
+                       //is this really necessary?
                        for (j = 0; j < i; j++) {
                                free(peer->peer_reqs[j].priv);
                        }