dummy mt-sosd changes commit
[archipelago] / xseg / peers / user / mt-sosd.c
index bffe78b..b6ad780 100644 (file)
@@ -2,27 +2,33 @@
 #include <stdlib.h>
 #include <unistd.h>
 #include <xseg/xseg.h>
-#include <mpeer.h>
+#include <peer.h>
 #include <rados/librados.h>
 #include <xseg/protocol.h>
 
+/*
+ * Longest pool / object name kept, not counting the terminating NUL.
+ * Longer names are truncated to this length; the buffers that hold them
+ * are declared with one extra byte for the NUL.
+ */
 #define MAX_POOL_NAME 64
-#define MAX_OBJ_NAME 256
+#define MAX_OBJ_NAME XSEG_MAX_TARGETLEN
 
 enum rados_state {
        ACCEPTED = 0,
-       PENDING = 1
+       PENDING = 1,
+       READING = 2,
+       WRITING = 3
 };
 
+/* Peer-wide rados handles, stored in peer->priv. */
 struct radosd {
        rados_t cluster;
        rados_ioctx_t ioctx;
-       char pool[MAX_POOL_NAME];
+       /* pool name given with --pool, always NUL-terminated */
+       char pool[MAX_POOL_NAME + 1];
 };
 
+/* Per-request state, stored in pr->priv. */
 struct rados_io{
-       char obj_name[MAX_OBJ_NAME];
+       /* request target name, always NUL-terminated */
+       char obj_name[MAX_OBJ_NAME + 1];
        enum rados_state state;
+       /* info: object size filled in by the asynchronous stat */
+       uint64_t size;
+       /* copy: heap-allocated, NUL-terminated source object name */
+       char *src_name;
+       /* copy: heap-allocated buffer of req->size bytes */
+       char *buf;
+       /* copy: bytes of the source already read into buf */
+       uint64_t read;
 };
 
 void rados_ack_cb(rados_completion_t c, void *arg)
@@ -96,47 +102,113 @@ int handle_delete(struct peerd *peer, struct peer_req *pr)
        int r;
        struct radosd *rados = (struct radosd *) peer->priv;
        struct rados_io *rio = (struct rados_io *) pr->priv;
-       
-       //log_pr("delete start", pr);
+/*     
+       XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
        r = rados_remove(rados->ioctx, rio->obj_name);
        if (r < 0) {
                pr->retval = r;
+               XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
                fail(peer, pr);
        }
        else {
                pr->retval = 0;
+               XSEGLOG2(&lc, E, "Deletion of %s completed", rio->obj_name);
                complete(peer, pr);
        }
+
+       return 0;
+*/
+       if (rio->state == ACCEPTED) {
+               XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
+               rados_completion_t rados_compl;
+               r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
+               if (r < 0){ 
+                       XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
+                       fail(peer, pr);
+                       return 0;
+               }
+               r = rados_aio_remove(rados->ioctx, rio->obj_name, rados_compl); 
+               if (r < 0) {
+                       rados_aio_release(rados_compl);
+                       XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
+                       fail(peer, pr);
+                       return 0;
+               }
+               rio->state = PENDING;
+       }
+       else {
+               if (pr->retval < 0){
+                       XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
+                       fail(peer, pr);
+               } 
+               else {
+                       XSEGLOG2(&lc, E, "Deletion of %s completed", rio->obj_name);
+                       complete(peer, pr);
+               }
+       }
        return 0;
 }
 
+/*
+ * Info handler: report the size of the target object rio->obj_name.
+ * Meant to run twice per request. In state ACCEPTED it submits an
+ * asynchronous stat that stores the size in rio->size. On the next call
+ * (any other state) it copies that size into the xseg_reply_info held in
+ * the request data, sets pr->retval to sizeof(uint64_t) and completes, or
+ * fails the request if the stat reported an error.
+ * NOTE(review): relies on rados_ack_cb setting pr->retval and handing pr
+ * back to this handler; its body is not part of this change - confirm.
+ */
 int handle_info(struct peerd *peer, struct peer_req *pr)
 {
-       uint64_t size;
-       time_t pmtime;
        int r;
        struct xseg_request *req = pr->req;
        struct radosd *rados = (struct radosd *) peer->priv;
        struct rados_io *rio = (struct rados_io *) pr->priv;
        char *req_data = xseg_get_data(peer->xseg, req);
-       struct xseg_reply_info *xinfo = req_data;
-
-       log_pr("info start", pr);
-       
-       r = rados_stat(rados->ioctx, rio->obj_name, &size, &pmtime);
-       if (r < 0) {
-               pr->retval = r;
-               fail(peer, pr);
-       }
-       else {
-               xinfo->size = size;
-               pr->retval = sizeof(uint64_t);
-               complete(peer,pr);
-       }
+       struct xseg_reply_info *xinfo = (struct xseg_reply_info *)req_data;
+
+       if (rio->state == ACCEPTED) {
+               rados_completion_t rados_compl;
+
+               XSEGLOG2(&lc, I, "Getting info of %s", rio->obj_name);
+               r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
+               if (r < 0) {
+                       /* keep the librados error code for the failed request */
+                       pr->retval = r;
+                       XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);
+                       fail(peer, pr);
+                       return 0;
+               }
+               /*
+                * Mark the request in flight before submitting, so a callback
+                * that fires before rados_aio_stat() returns already sees
+                * PENDING and does not resubmit the stat.
+                */
+               rio->state = PENDING;
+               /* size lands in rio->size; the mtime is not needed */
+               r = rados_aio_stat(rados->ioctx, rio->obj_name, rados_compl, &rio->size, NULL);
+               if (r < 0) {
+                       rados_aio_release(rados_compl);
+                       pr->retval = r;
+                       XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);
+                       fail(peer, pr);
+                       return 0;
+               }
+       }
+       else {
+               /* second pass: the stat has finished */
+               if (pr->retval < 0) {
+                       xinfo->size = 0;
+                       XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);
+                       fail(peer, pr);
+               }
+               else {
+                       xinfo->size = rio->size;
+                       /* the reply payload is a single uint64_t size */
+                       pr->retval = sizeof(uint64_t);
+                       XSEGLOG2(&lc, I, "Getting info of %s completed", rio->obj_name);
+                       complete(peer, pr);
+               }
+       }
        return 0;
 }
 
-//FIXME req->state no longer apply
 int handle_read(struct peerd *peer, struct peer_req *pr)
 {
        struct rados_io *rio = (struct rados_io *) (pr->priv);
@@ -149,34 +221,40 @@ int handle_read(struct peerd *peer, struct peer_req *pr)
                }
                //should we ensure req->op = X_READ ?
                rio->state = PENDING;
-               log_pr("read", pr);
+               XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
                if (do_aio_read(peer, pr) < 0) {
+                       XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
                        fail(peer, pr);
                }
        }
        else if (rio->state == PENDING) {
+               XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
                data = xseg_get_data(peer->xseg, pr->req);
                if (pr->retval > 0) 
                        req->serviced += pr->retval;
                else if (pr->retval == 0) {
+                       XSEGLOG2(&lc, I, "Reading of %s reached end of file at %llu bytes. Zeroing out rest", 
+                                               rio->obj_name, (unsigned long long) req->serviced);
                        /* reached end of object. zero out rest of data
                         * requested from this object
                         */
-                       memset(data, 0, req->datalen - req->serviced);
+                       memset(data + req->serviced, 0, req->datalen - req->serviced);
                        req->serviced = req->datalen ;
                }
                else if (pr->retval == -2) {
+                       XSEGLOG2(&lc, I, "Reading of %s return -2. Zeroing out data", rio->obj_name);
                        /* object not found. return zeros instead */
                        memset(data, 0, req->datalen);
                        req->serviced = req->datalen;
                }
                else {
+                       XSEGLOG2(&lc, E, "Reading of %s failed", rio->obj_name);
                        /* pr->retval < 0 && pr->retval != -2 */
                        fail(peer, pr);
                        return 0;
                }
                if (req->serviced >= req->datalen) {
-                       log_pr("read complete", pr);
+                       XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
                        complete(peer, pr);
                        return 0;
                }
@@ -189,8 +267,9 @@ int handle_read(struct peerd *peer, struct peer_req *pr)
                //TODO assert req->op == X_READ
                
                /* resubmit */
-               log_pr("read resubmit", pr);
+               XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
                if (do_aio_read(peer, pr) < 0) {
+                       XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read", rio->obj_name);
                        fail(peer, pr);
                }
        }
@@ -220,8 +299,9 @@ int handle_write(struct peerd *peer, struct peer_req *pr)
                }
                //should we ensure req->op = X_READ ?
                rio->state = PENDING;
-               //log_pr("write", pr);
+               XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
                if (do_aio_write(peer, pr) < 0) {
+                       XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->obj_name);
                        fail(peer, pr);
                }
        }
@@ -229,13 +309,15 @@ int handle_write(struct peerd *peer, struct peer_req *pr)
                /* rados writes return 0 if write succeeded or < 0 if failed
                 * no resubmission occurs
                 */
-               //log_pr("write complete", pr);
+               XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
                if (pr->retval == 0) {
+                       XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
                        req->serviced = req->datalen;
                        complete(peer, pr);
                        return 0;
                }
                else {
+                       XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
                        fail(peer, pr);
                        return 0;
                }
@@ -250,13 +332,14 @@ int handle_write(struct peerd *peer, struct peer_req *pr)
 
+/*
+ * Submit an asynchronous read of len bytes of object `name`, starting at
+ * byte offset off, into buf. The completion calls rados_ack_cb with pr as
+ * its argument. Returns 0 if the read was submitted, < 0 otherwise.
+ */
+static int copy_aio_read(struct peerd *peer, struct peer_req *pr,
+               const char *name, char *buf, size_t len, uint64_t off)
+{
+       struct radosd *rados = (struct radosd *) peer->priv;
+       rados_completion_t rados_compl;
+       int r;
+
+       r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
+       if (r < 0)
+               return r;
+       r = rados_aio_read(rados->ioctx, name, rados_compl, buf, len, off);
+       if (r < 0)
+               rados_aio_release(rados_compl);
+       return r;
+}
+
+/*
+ * Submit an asynchronous write that replaces the whole content of object
+ * `name` with the len bytes at buf. The completion calls rados_ack_cb with
+ * pr as its argument. Returns 0 if the write was submitted, < 0 otherwise.
+ */
+static int copy_aio_write_full(struct peerd *peer, struct peer_req *pr,
+               const char *name, const char *buf, size_t len)
+{
+       struct radosd *rados = (struct radosd *) peer->priv;
+       rados_completion_t rados_compl;
+       int r;
+
+       r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
+       if (r < 0)
+               return r;
+       r = rados_aio_write_full(rados->ioctx, name, rados_compl, buf, len);
+       if (r < 0)
+               rados_aio_release(rados_compl);
+       return r;
+}
+
+/*
+ * Copy handler: copy req->size bytes of the source object named in the
+ * request's xseg_request_copy (read from offset 0) into the target object
+ * rio->obj_name, replacing its content.
+ * State machine: ACCEPTED -> READING (resubmitted after short reads) ->
+ * WRITING -> complete/fail. Every exit after allocation releases rio->buf
+ * and rio->src_name. A read returning 0 bytes zero-fills the remainder; a
+ * negative read result (including a missing source object) fails the copy.
+ * NOTE(review): assumes rados_ack_cb stores the operation result in
+ * pr->retval (bytes read / 0 for the write) and hands pr back here.
+ */
 int handle_copy(struct peerd *peer, struct peer_req *pr)
 {
+       /*
        struct radosd *rados = (struct radosd *) peer->priv;
        struct xseg_request *req = pr->req;
        struct rados_io *rio = (struct rados_io *) pr->priv;
        int r, sum;
-       char *buf, src_name[MAX_OBJ_NAME];
+       char *buf, src_name[MAX_OBJ_NAME + 1];
        struct xseg_request_copy *xcopy = xseg_get_data(peer->xseg, req);
-       unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME -1 )? MAX_OBJ_NAME - 1 : xcopy->targetlen;
+       unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
 
        strncpy(src_name, xcopy->target, end);
        src_name[end] = 0;
@@ -267,32 +350,185 @@ int handle_copy(struct peerd *peer, struct peer_req *pr)
                fail(peer, pr);
                return -1;
        }
+       XSEGLOG2(&lc, I, "Copy of object %s to object %s started", src_name, rio->obj_name);
        sum = 0;
        do {
-               r = rados_read(rados->ioctx, rio->obj_name, buf, req->size, 0);
-               if (r < 0) 
+               r = rados_read(rados->ioctx, src_name, buf, req->size, 0);
+               if (r < 0){
+                       XSEGLOG2(&lc, E, "Read of object %s failed", src_name);
                        goto out_fail;
+               }
                else if (r == 0) {
                        memset(buf+r, 0, req->size - r);
                        sum = req->size;
                } else 
                        sum += r;
        } while (sum < req->size);
+       XSEGLOG2(&lc, D, "Read of object %s Completed", src_name);
 
        r = rados_write_full(rados->ioctx, rio->obj_name, buf, req->size);
-       if (r < 0)
+       if (r < 0){
+               XSEGLOG2(&lc, E, "Write of object %s failed", rio->obj_name);
                goto out_fail;
+       }
        
+       free(buf);
        req->serviced = req->size;
+       XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", src_name, rio->obj_name);
+       complete(peer, pr);
        return 0;
 
 out_fail:
        free(buf);
        pr->retval = -1;
+       XSEGLOG2(&lc, E, "Copy of object %s to object %s failed", src_name, rio->obj_name);
        fail(peer, pr);
        return 0;
+       */
+
+       struct xseg_request *req = pr->req;
+       struct rados_io *rio = (struct rados_io *) pr->priv;
+       int r;
+       struct xseg_request_copy *xcopy = (struct xseg_request_copy *)xseg_get_data(peer->xseg, req);
+
+       if (rio->state == ACCEPTED) {
+               if (!req->size) {
+                       /* nothing to copy. NOTE(review): should this fail instead? */
+                       complete(peer, pr);
+                       return 0;
+               }
+               rio->src_name = malloc(MAX_OBJ_NAME + 1);
+               if (!rio->src_name) {
+                       fail(peer, pr);
+                       return -1;
+               }
+               /* copy at most MAX_OBJ_NAME bytes of the source name and terminate it */
+               unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
+               strncpy(rio->src_name, xcopy->target, end);
+               rio->src_name[end] = 0;
+               req->serviced = 0;
+               rio->buf = malloc(req->size);
+               if (!rio->buf) {
+                       r = -1;
+                       goto out_src;
+               }
+               XSEGLOG2(&lc, I, "Copy of object %s to object %s started", rio->src_name, rio->obj_name);
+               /* set before submitting so the callback already sees READING */
+               rio->state = READING;
+               rio->read = 0;
+               XSEGLOG2(&lc, I, "Reading %s", rio->src_name);
+               if (copy_aio_read(peer, pr, rio->src_name, rio->buf, req->size, 0) < 0) {
+                       XSEGLOG2(&lc, E, "Reading of %s failed on aio submission", rio->src_name);
+                       /* out_buf fails the request exactly once */
+                       r = -1;
+                       goto out_buf;
+               }
+               return 0;
+       }
+       else if (rio->state == READING) {
+               XSEGLOG2(&lc, I, "Reading of %s callback", rio->src_name);
+               if (pr->retval > 0) {
+                       rio->read += pr->retval;
+               }
+               else if (pr->retval == 0) {
+                       /* end of the source object: zero-fill the rest of buf */
+                       XSEGLOG2(&lc, I, "Reading of %s reached end of file at %llu bytes. Zeroing out rest",
+                                       rio->src_name, (unsigned long long) rio->read);
+                       memset(rio->buf + rio->read, 0, req->size - rio->read);
+                       rio->read = req->size;
+               }
+               else {
+                       XSEGLOG2(&lc, E, "Reading of %s failed", rio->src_name);
+                       r = -1;
+                       goto out_buf;
+               }
+
+               if (rio->read < req->size) {
+                       /* short read: fetch the remainder from where it stopped */
+                       XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->src_name);
+                       if (copy_aio_read(peer, pr, rio->src_name, rio->buf + rio->read,
+                                               req->size - rio->read, rio->read) < 0) {
+                               XSEGLOG2(&lc, E, "Reading of %s failed on aio submission", rio->src_name);
+                               r = -1;
+                               goto out_buf;
+                       }
+                       return 0;
+               }
+
+               XSEGLOG2(&lc, I, "Reading of %s completed", rio->src_name);
+               /* set before submitting so the callback already sees WRITING */
+               rio->state = WRITING;
+               XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
+               if (copy_aio_write_full(peer, pr, rio->obj_name, rio->buf, req->size) < 0) {
+                       XSEGLOG2(&lc, E, "Writing of %s failed on aio submission", rio->obj_name);
+                       r = -1;
+                       goto out_buf;
+               }
+               return 0;
+       }
+       else if (rio->state == WRITING) {
+               XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
+               if (pr->retval == 0) {
+                       XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
+                       XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", rio->src_name, rio->obj_name);
+                       req->serviced = req->size;
+                       r = 0;
+               }
+               else {
+                       XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
+                       XSEGLOG2(&lc, E, "Copy of object %s to object %s failed", rio->src_name, rio->obj_name);
+                       r = -1;
+               }
+               goto out_buf;
+       }
+       else {
+               /* should not happen; fail rather than leave the request hanging */
+               XSEGLOG2(&lc, E, "Unknown state");
+               fail(peer, pr);
+               return 0;
+       }
+
+out_buf:
+       free(rio->buf);
+out_src:
+       free(rio->src_name);
+       rio->buf = NULL;
+       rio->src_name = NULL;
+       rio->read = 0;
+
+       if (r < 0)
+               fail(peer, pr);
+       else
+               complete(peer, pr);
+       return 0;
 }
 
+/*
+ * Open handler: lock the target object rio->obj_name with rados_lock() and
+ * complete the request, or fail it with pr->retval set to the librados
+ * error code if locking fails.
+ * NOTE(review): stock librados offers rados_lock_exclusive() /
+ * rados_lock_shared() (taking a lock name and cookie) and a four-argument
+ * rados_unlock(); confirm which librados build provides this two-argument
+ * rados_lock().
+ */
+int handle_open(struct peerd *peer, struct peer_req *pr)
+{
+       struct radosd *rados = (struct radosd *) peer->priv;
+       struct rados_io *rio = (struct rados_io *) (pr->priv);
+       int r;
+
+       XSEGLOG2(&lc, I, "Locking %s", rio->obj_name);
+       r = rados_lock(rados->ioctx, rio->obj_name);
+       if (r < 0) {
+               pr->retval = r;
+               XSEGLOG2(&lc, E, "Locking of %s failed", rio->obj_name);
+               fail(peer, pr);
+       }
+       else {
+               XSEGLOG2(&lc, I, "Locking of %s completed", rio->obj_name);
+               complete(peer, pr);
+       }
+       return 0;
+}
+
+
+/*
+ * Close handler: release the lock on the target object rio->obj_name with
+ * rados_unlock() and complete the request, or fail it with pr->retval set
+ * to the librados error code if unlocking fails.
+ * NOTE(review): stock librados declares rados_unlock(io, oid, name, cookie);
+ * confirm which librados build provides this two-argument form.
+ */
+int handle_close(struct peerd *peer, struct peer_req *pr)
+{
+       struct radosd *rados = (struct radosd *) peer->priv;
+       struct rados_io *rio = (struct rados_io *) (pr->priv);
+       int r;
+
+       XSEGLOG2(&lc, I, "Unlocking %s", rio->obj_name);
+       r = rados_unlock(rados->ioctx, rio->obj_name);
+       if (r < 0) {
+               pr->retval = r;
+               XSEGLOG2(&lc, E, "Unlocking of %s failed", rio->obj_name);
+               fail(peer, pr);
+       }
+       else {
+               XSEGLOG2(&lc, I, "Unlocking of %s completed", rio->obj_name);
+               complete(peer, pr);
+       }
+       return 0;
+}
 
 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
 {
@@ -303,8 +539,20 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
                perror("malloc");
                return -1;
        }
-       //TODO this should be a parameter. maybe -r (from rados)?
-       strncpy(rados->pool, "xseg", MAX_POOL_NAME);
+       rados->pool[0] = 0;
+       for (i = 0; i < argc; i++) {
+               if (!strcmp(argv[i], "--pool") && (i+1) < argc){
+                       strncpy(rados->pool, argv[i+1], MAX_POOL_NAME);
+                       rados->pool[MAX_POOL_NAME] = 0;
+                       i += 1;
+                       continue;
+               }
+       }
+       if (!rados->pool[0]){
+               free(rados);
+               return -1;
+       }
+
        if (rados_create(&rados->cluster, NULL) < 0) {
                printf("Rados create failed!\n");
                return -1;
@@ -363,7 +611,7 @@ int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
 {
        struct rados_io *rio = (struct rados_io *) (pr->priv);
        char *target = xseg_get_target(peer->xseg, pr->req);
-       unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME -1 )? MAX_OBJ_NAME - 1 : pr->req->targetlen;
+       unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : pr->req->targetlen;
        strncpy(rio->obj_name, target, end);
        rio->obj_name[end] = 0;
        //log_pr("dispatch", pr);
@@ -393,6 +641,11 @@ int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
                        else
                                handle_copy(peer, pr);
                        break;
+               case X_OPEN:
+                       handle_open(peer, pr); break;
+               case X_CLOSE:
+                       handle_close(peer, pr); break;
+
                default:
                        fail(peer, pr);
        }