add stdout, stderr redirection of peers to logfile
[archipelago] / xseg / peers / user / mt-sosd.c
index b6ad780..43b3389 100644 (file)
@@ -51,50 +51,74 @@ void rados_commit_cb(rados_completion_t c, void *arg)
        dispatch(peer, pr, pr->req, dispatch_internal);
 }
 
-int do_aio_read(struct peerd *peer, struct peer_req *pr)
+static int do_aio_generic(struct peerd *peer, struct peer_req *pr, uint32_t op,
+               char *target, char *buf, uint64_t size, uint64_t offset)
 {
        struct radosd *rados = (struct radosd *) peer->priv;
-       struct xseg_request *req = pr->req;
        struct rados_io *rio = (struct rados_io *) pr->priv;
-       char *data = xseg_get_data(peer->xseg, pr->req);
        int r;
 
        rados_completion_t rados_compl;
-       r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
-       if (r < 0) 
-               return -1;
-       r = rados_aio_read(rados->ioctx, rio->obj_name, rados_compl, 
-                       data + req->serviced,
-                       req->size - req->serviced,
-                       req->offset + req->serviced);
+       switch (op) {
+               case X_READ:
+                       r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
+                       if (r < 0)
+                               return -1;
+                       r = rados_aio_read(rados->ioctx, target, rados_compl,
+                                       buf, size, offset);
+                       break;
+               case X_WRITE:
+                       r = rados_aio_create_completion(pr, NULL, rados_commit_cb, &rados_compl);
+                       if (r < 0)
+                               return -1;
+                       r = rados_aio_write(rados->ioctx, target, rados_compl,
+                                       buf, size, offset);
+                       break;
+               case X_DELETE:
+                       r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
+                       if (r < 0)
+                               return -1;
+                       r = rados_aio_remove(rados->ioctx, target, rados_compl);
+                       break;
+               case X_INFO:
+                       r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
+                       if (r < 0)
+                               return -1;
+                       r = rados_aio_stat(rados->ioctx, target, rados_compl, &rio->size, NULL); 
+                       break;
+               default:
+                       return -1;
+                       break;
+       }
        if (r < 0) {
                rados_aio_release(rados_compl);
-               return -1;
        }
-       return 0;
+       return r;
 }
 
-int do_aio_write(struct peerd *peer, struct peer_req *pr)
+static int do_aio_read(struct peerd *peer, struct peer_req *pr)
+{
+       struct xseg_request *req = pr->req;
+       struct rados_io *rio = (struct rados_io *) pr->priv;
+       char *data = xseg_get_data(peer->xseg, pr->req);
+
+       return do_aio_generic(peer, pr, X_READ, rio->obj_name,
+                       data + req->serviced,
+                       req->size - req->serviced,
+                       req->offset + req->serviced);
+}
+
+static int do_aio_write(struct peerd *peer, struct peer_req *pr)
 {
-       struct radosd *rados = (struct radosd *) peer->priv;
        struct xseg_request *req = pr->req;
        struct rados_io *rio = (struct rados_io *) pr->priv;
        char *data = xseg_get_data(peer->xseg, pr->req);
        int r;
 
-       rados_completion_t rados_compl;
-       r = rados_aio_create_completion(pr, NULL, rados_commit_cb, &rados_compl);
-       if (r < 0) 
-               return -1;
-       r = rados_aio_write(rados->ioctx, rio->obj_name, rados_compl, 
+       return do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
                        data + req->serviced,
                        req->size - req->serviced,
                        req->offset + req->serviced);
-       if (r < 0) {
-               rados_aio_release(rados_compl);
-               return -1;
-       }
-       return 0;
 }
 
 int handle_delete(struct peerd *peer, struct peer_req *pr)
@@ -102,47 +126,23 @@ int handle_delete(struct peerd *peer, struct peer_req *pr)
        int r;
        struct radosd *rados = (struct radosd *) peer->priv;
        struct rados_io *rio = (struct rados_io *) pr->priv;
-/*     
-       XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
-       r = rados_remove(rados->ioctx, rio->obj_name);
-       if (r < 0) {
-               pr->retval = r;
-               XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
-               fail(peer, pr);
-       }
-       else {
-               pr->retval = 0;
-               XSEGLOG2(&lc, E, "Deletion of %s completed", rio->obj_name);
-               complete(peer, pr);
-       }
 
-       return 0;
-*/
        if (rio->state == ACCEPTED) {
                XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
-               rados_completion_t rados_compl;
-               r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
-               if (r < 0){ 
-                       XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
-                       fail(peer, pr);
-                       return 0;
-               }
-               r = rados_aio_remove(rados->ioctx, rio->obj_name, rados_compl); 
+               rio->state = PENDING;
+               r = do_aio_generic(peer, pr, X_DELETE, rio->obj_name, NULL, 0, 0);
                if (r < 0) {
-                       rados_aio_release(rados_compl);
                        XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
                        fail(peer, pr);
-                       return 0;
                }
-               rio->state = PENDING;
        }
        else {
                if (pr->retval < 0){
                        XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
                        fail(peer, pr);
-               } 
+               }
                else {
-                       XSEGLOG2(&lc, E, "Deletion of %s completed", rio->obj_name);
+                       XSEGLOG2(&lc, I, "Deletion of %s completed", rio->obj_name);
                        complete(peer, pr);
                }
        }
@@ -157,48 +157,22 @@ int handle_info(struct peerd *peer, struct peer_req *pr)
        struct rados_io *rio = (struct rados_io *) pr->priv;
        char *req_data = xseg_get_data(peer->xseg, req);
        struct xseg_reply_info *xinfo = (struct xseg_reply_info *)req_data;
-/*
-       uint64_t size;
-       time_t pmtime;
-       XSEGLOG2(&lc, I, "Getting info of %s", rio->obj_name);  
-       r = rados_stat(rados->ioctx, rio->obj_name, &size, &pmtime);
-       if (r < 0) {
-               pr->retval = r;
-               XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);   
-               fail(peer, pr);
-       }
-       else {
-               xinfo->size = size;
-               pr->retval = sizeof(uint64_t);
-               XSEGLOG2(&lc, I, "Getting info of %s completed", rio->obj_name);        
-               complete(peer,pr);
-       }
-       return 0;
-*/     
+
        if (rio->state == ACCEPTED) {
-               XSEGLOG2(&lc, I, "Getting info of %s", rio->obj_name);  
-               rados_completion_t rados_compl;
-               r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
-               if (r < 0){ 
-                       XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);   
-                       fail(peer, pr);
-                       return 0;
-               }
-               r = rados_aio_stat(rados->ioctx, rio->obj_name, rados_compl, &rio->size, NULL); 
+               XSEGLOG2(&lc, I, "Getting info of %s", rio->obj_name);
+               rio->state = PENDING;
+               r = do_aio_generic(peer, pr, X_INFO, rio->obj_name, NULL, 0, 0);
                if (r < 0) {
-                       rados_aio_release(rados_compl);
                        XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);   
                        fail(peer, pr);
-                       return 0;
                }
-               rio->state = PENDING;
        }
        else {
                if (pr->retval < 0){
                        xinfo->size = 0;
                        XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);   
                        fail(peer, pr);
-               } 
+               }
                else {
                        xinfo->size = rio->size;
                        pr->retval = sizeof(uint64_t);
@@ -219,30 +193,32 @@ int handle_read(struct peerd *peer, struct peer_req *pr)
                        complete(peer, pr);
                        return 0;
                }
-               //should we ensure req->op = X_READ ?
-               rio->state = PENDING;
+               rio->state = READING;
                XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
                if (do_aio_read(peer, pr) < 0) {
-                       XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
+                       XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read",
+                                               rio->obj_name);
                        fail(peer, pr);
                }
        }
-       else if (rio->state == PENDING) {
+       else if (rio->state == READING) {
                XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
                data = xseg_get_data(peer->xseg, pr->req);
-               if (pr->retval > 0) 
+               if (pr->retval > 0)
                        req->serviced += pr->retval;
                else if (pr->retval == 0) {
-                       XSEGLOG2(&lc, I, "Reading of %s reached end of file at %llu bytes. Zeroing out rest", 
-                                               rio->obj_name, (unsigned long long) req->serviced);
+                       XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
+                               "%llu bytes. Zeroing out rest", rio->obj_name,
+                               (unsigned long long) req->serviced);
                        /* reached end of object. zero out rest of data
                         * requested from this object
                         */
                        memset(data + req->serviced, 0, req->datalen - req->serviced);
-                       req->serviced = req->datalen ;
+                       req->serviced = req->datalen;
                }
                else if (pr->retval == -2) {
-                       XSEGLOG2(&lc, I, "Reading of %s return -2. Zeroing out data", rio->obj_name);
+                       XSEGLOG2(&lc, I, "Reading of %s return -2. "
+                                       "Zeroing out data", rio->obj_name);
                        /* object not found. return zeros instead */
                        memset(data, 0, req->datalen);
                        req->serviced = req->datalen;
@@ -264,12 +240,11 @@ int handle_read(struct peerd *peer, struct peer_req *pr)
                        fail(peer, pr);
                        return 0;
                }
-               //TODO assert req->op == X_READ
-               
                /* resubmit */
                XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
                if (do_aio_read(peer, pr) < 0) {
-                       XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read", rio->obj_name);
+                       XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
+                                       rio->obj_name);
                        fail(peer, pr);
                }
        }
@@ -298,14 +273,15 @@ int handle_write(struct peerd *peer, struct peer_req *pr)
                        }
                }
                //should we ensure req->op = X_READ ?
-               rio->state = PENDING;
+               rio->state = WRITING;
                XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
                if (do_aio_write(peer, pr) < 0) {
-                       XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->obj_name);
+                       XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write",
+                                       rio->obj_name);
                        fail(peer, pr);
                }
        }
-       else if (rio->state == PENDING) {
+       else if (rio->state == WRITING) {
                /* rados writes return 0 if write succeeded or < 0 if failed
                 * no resubmission occurs
                 */
@@ -332,61 +308,6 @@ int handle_write(struct peerd *peer, struct peer_req *pr)
 
 int handle_copy(struct peerd *peer, struct peer_req *pr)
 {
-       /*
-       struct radosd *rados = (struct radosd *) peer->priv;
-       struct xseg_request *req = pr->req;
-       struct rados_io *rio = (struct rados_io *) pr->priv;
-       int r, sum;
-       char *buf, src_name[MAX_OBJ_NAME + 1];
-       struct xseg_request_copy *xcopy = xseg_get_data(peer->xseg, req);
-       unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
-
-       strncpy(src_name, xcopy->target, end);
-       src_name[end] = 0;
-
-       req->serviced = 0;
-       buf = malloc(req->size);
-       if (!buf) {
-               fail(peer, pr);
-               return -1;
-       }
-       XSEGLOG2(&lc, I, "Copy of object %s to object %s started", src_name, rio->obj_name);
-       sum = 0;
-       do {
-               r = rados_read(rados->ioctx, src_name, buf, req->size, 0);
-               if (r < 0){
-                       XSEGLOG2(&lc, E, "Read of object %s failed", src_name);
-                       goto out_fail;
-               }
-               else if (r == 0) {
-                       memset(buf+r, 0, req->size - r);
-                       sum = req->size;
-               } else 
-                       sum += r;
-       } while (sum < req->size);
-       XSEGLOG2(&lc, D, "Read of object %s Completed", src_name);
-
-       r = rados_write_full(rados->ioctx, rio->obj_name, buf, req->size);
-       if (r < 0){
-               XSEGLOG2(&lc, E, "Write of object %s failed", rio->obj_name);
-               goto out_fail;
-       }
-       
-       free(buf);
-       req->serviced = req->size;
-       XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", src_name, rio->obj_name);
-       complete(peer, pr);
-       return 0;
-
-out_fail:
-       free(buf);
-       pr->retval = -1;
-       XSEGLOG2(&lc, E, "Copy of object %s to object %s failed", src_name, rio->obj_name);
-       fail(peer, pr);
-       return 0;
-       */
-
-
        struct radosd *rados = (struct radosd *) peer->priv;
        struct xseg_request *req = pr->req;
        struct rados_io *rio = (struct rados_io *) pr->priv;
@@ -394,30 +315,34 @@ out_fail:
        struct xseg_request_copy *xcopy = (struct xseg_request_copy *)xseg_get_data(peer->xseg, req);
 
        if (rio->state == ACCEPTED){
-               XSEGLOG2(&lc, I, "Copy of object %s to object %s started", rio->src_name, rio->obj_name);
+               XSEGLOG2(&lc, I, "Copy of object %s to object %s started",
+                               rio->src_name, rio->obj_name);
                if (!req->size) {
                        complete(peer, pr); //or fail?
                        return 0;
                }
+
                rio->src_name = malloc(MAX_OBJ_NAME + 1);
                if (!rio->src_name){
                        fail(peer, pr);
                        return -1;
                }
+               //TODO: truncate and NUL-terminate, or fail, if targetlen > MAX_OBJ_NAME?
                unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
                strncpy(rio->src_name, xcopy->target, end);
                rio->src_name[end] = 0;
-               req->serviced = 0;
+
                rio->buf = malloc(req->size);
                if (!rio->buf) {
                        r = -1;
                        goto out_src;
                }
-               XSEGLOG2(&lc, I, "Copy of object %s to object %s started", rio->src_name, rio->obj_name);
+
                rio->state = READING;
                rio->read = 0;
                XSEGLOG2(&lc, I, "Reading %s", rio->src_name);
-               if (do_aio_read(peer, pr, rio->src_name, rio->buf) < 0) {
+               if (do_aio_generic(peer, pr, X_READ, rio->src_name, rio->buf + rio->read,
+                       req->size - rio->read, req->offset + rio->read) < 0) {
                        XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
                        fail(peer, pr);
                        r = -1;
@@ -426,25 +351,28 @@ out_fail:
        }
        else if (rio->state == READING){
                XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
-               if (pr->retval > 0) 
+               if (pr->retval > 0)
                        rio->read += pr->retval;
                else if (pr->retval == 0) {
-                       XSEGLOG2(&lc, I, "Reading of %s reached end of file at %llu bytes. Zeroing out rest", 
-                                               rio->obj_name, (unsigned long long) req->serviced);
+                       XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
+                               "%llu bytes. Zeroing out rest", rio->obj_name,
+                               (unsigned long long) req->serviced);
                        memset(rio->buf + rio->read, 0, req->size - rio->read);
                        rio->read = req->size ;
                }
                else {
-                       XSEGLOG2(&lc, E, "Reading of %s failed", rio->obj_name);
+                       XSEGLOG2(&lc, E, "Reading of %s failed", rio->src_name);
                        r = -1;
                        goto out_buf;
                }
+
                if (rio->read >= req->size) {
                        XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
                        //do_aio_write
                        rio->state = WRITING;
                        XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
-                       if (do_aio_write(peer, pr, rio->obj_name, rio->buf) < 0) {
+                       if (do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
+                                       rio->buf, req->size, req->offset) < 0) {
                                XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->obj_name);
                                r = -1;
                                goto out_buf;
@@ -452,13 +380,11 @@ out_fail:
                        return 0;
                }
 
-               if (!req->size) {
-                       r = -1;
-                       goto out_buf;
-               }
                XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
-               if (do_aio_read(peer, pr, rio->src_name, rio->buf + rio->read, req->size - rio->read) < 0) {
-                       XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read", rio->obj_name);
+               if (do_aio_generic(peer, pr, X_READ, rio->src_name, rio->buf + rio->read,
+                       req->size - rio->read, req->offset + rio->read) < 0) {
+                       XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
+                                       rio->obj_name);
                        r = -1;
                        goto out_buf;
                }
@@ -466,8 +392,8 @@ out_fail:
        else if (rio->state == WRITING){
                XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
                if (pr->retval == 0) {
-                       XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", rio->src_name, rio->obj_name);
                        XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
+                       XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", rio->src_name, rio->obj_name);
                        req->serviced = req->size;
                        r = 0;
                        goto out_buf;
@@ -496,7 +422,7 @@ out_src:
 
        if (r < 0)
                fail(peer ,pr);
-       else 
+       else
                complete(peer, pr);
        return 0;
 }
@@ -549,36 +475,37 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
                }
        }
        if (!rados->pool[0]){
+               XSEGLOG2(&lc, E , "Pool must be provided");
                free(rados);
                return -1;
        }
 
        if (rados_create(&rados->cluster, NULL) < 0) {
-               printf("Rados create failed!\n");
+               XSEGLOG2(&lc, E, "Rados create failed!");
                return -1;
        }
        if (rados_conf_read_file(rados->cluster, NULL) < 0){
-               printf("Error reading rados conf files!\n");
+               XSEGLOG2(&lc, E, "Error reading rados conf files!");
                return -1;
        }
        if (rados_connect(rados->cluster) < 0) {
-               printf("Rados connect failed!\n");
+               XSEGLOG2(&lc, E, "Rados connect failed!");
                rados_shutdown(rados->cluster);
                free(rados);
                return 0;
        }
        if (rados_pool_lookup(rados->cluster, rados->pool) < 0) {
-               printf( "Pool does not exists. I will try to create it\n");
+               XSEGLOG2(&lc, I, "Pool does not exists. I will try to create it");
                if (rados_pool_create(rados->cluster, rados->pool) < 0){
-                       printf("Couldn't create pool!\n");
+                       XSEGLOG2(&lc, E, "Couldn't create pool %s", rados->pool);
                        rados_shutdown(rados->cluster);
                        free(rados);
                        return -1;
                }
-               printf("Pool created.\n");
+               XSEGLOG2(&lc, I, "Pool created.");
        }
-       if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0) {
-               printf("ioctx create problem.\n");
+       if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0){
+               XSEGLOG2(&lc, E, "ioctx create problem.");
                rados_shutdown(rados->cluster);
                free(rados);
                return -1;
@@ -588,6 +515,7 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
                rio = malloc(sizeof(struct rados_io));
                if (!rio) {
                        //ugly
+                       //is freeing the already allocated privs really necessary?
                        for (j = 0; j < i; j++) {
                                free(peer->peer_reqs[j].priv);
                        }