add improved argument parsing. also add helper messages
[archipelago] / xseg / peers / user / mt-sosd.c
index 757c850..bffc26b 100644 (file)
@@ -2,27 +2,44 @@
 #include <stdlib.h>
 #include <unistd.h>
 #include <xseg/xseg.h>
-#include <mpeer.h>
+#include <peer.h>
 #include <rados/librados.h>
 #include <xseg/protocol.h>
+#include <pthread.h>
 
 #define MAX_POOL_NAME 64
-#define MAX_OBJ_NAME 256
+#define MAX_OBJ_NAME XSEG_MAX_TARGETLEN
+
+void custom_peer_usage()
+{
+       fprintf(stderr, "Custom peer options:\n"
+               "--pool: Rados pool to connect\n"
+               "\n");
+}
 
 enum rados_state {
        ACCEPTED = 0,
-       PENDING = 1
+       PENDING = 1,
+       READING = 2,
+       WRITING = 3
 };
 
 struct radosd {
        rados_t cluster;
        rados_ioctx_t ioctx;
-       char pool[MAX_POOL_NAME];
+       char pool[MAX_POOL_NAME + 1];
 };
 
 struct rados_io{
-       char obj_name[MAX_OBJ_NAME];
+       char obj_name[MAX_OBJ_NAME + 1];
        enum rados_state state;
+       uint64_t size;
+       char *src_name, *buf;
+       uint64_t read;
+       uint64_t watch_handle;
+       pthread_t tid;
+       pthread_cond_t cond;
+       pthread_mutex_t m;
 };
 
 void rados_ack_cb(rados_completion_t c, void *arg)
@@ -32,7 +49,7 @@ void rados_ack_cb(rados_completion_t c, void *arg)
        int ret = rados_aio_get_return_value(c);
        pr->retval = ret;
        rados_aio_release(c);
-       dispatch(peer, pr, pr->req, internal);
+       dispatch(peer, pr, pr->req, dispatch_internal);
 }
 
 void rados_commit_cb(rados_completion_t c, void *arg)
@@ -42,140 +59,188 @@ void rados_commit_cb(rados_completion_t c, void *arg)
        int ret = rados_aio_get_return_value(c);
        pr->retval = ret;
        rados_aio_release(c);
-       dispatch(peer, pr, pr->req, internal);
+       dispatch(peer, pr, pr->req, dispatch_internal);
 }
 
-int do_aio_read(struct peerd *peer, struct peer_req *pr)
+static int do_aio_generic(struct peerd *peer, struct peer_req *pr, uint32_t op,
+               char *target, char *buf, uint64_t size, uint64_t offset)
 {
        struct radosd *rados = (struct radosd *) peer->priv;
-       struct xseg_request *req = pr->req;
        struct rados_io *rio = (struct rados_io *) pr->priv;
-       char *data = xseg_get_data(peer->xseg, pr->req);
        int r;
 
        rados_completion_t rados_compl;
-       r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
-       if (r < 0) 
-               return -1;
-       r = rados_aio_read(rados->ioctx, rio->obj_name, rados_compl, 
-                       data + req->serviced,
-                       req->size - req->serviced,
-                       req->offset + req->serviced);
+       switch (op) {
+               case X_READ:
+                       r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
+                       if (r < 0)
+                               return -1;
+                       r = rados_aio_read(rados->ioctx, target, rados_compl,
+                                       buf, size, offset);
+                       break;
+               case X_WRITE:
+                       r = rados_aio_create_completion(pr, NULL, rados_commit_cb, &rados_compl);
+                       if (r < 0)
+                               return -1;
+                       r = rados_aio_write(rados->ioctx, target, rados_compl,
+                                       buf, size, offset);
+                       break;
+               case X_DELETE:
+                       r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
+                       if (r < 0)
+                               return -1;
+                       r = rados_aio_remove(rados->ioctx, target, rados_compl);
+                       break;
+               case X_INFO:
+                       r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
+                       if (r < 0)
+                               return -1;
+                       r = rados_aio_stat(rados->ioctx, target, rados_compl, &rio->size, NULL); 
+                       break;
+               default:
+                       return -1;
+                       break;
+       }
        if (r < 0) {
                rados_aio_release(rados_compl);
-               return -1;
        }
-       return 0;
+       return r;
 }
 
-int do_aio_write(struct peerd *peer, struct peer_req *pr)
+static int do_aio_read(struct peerd *peer, struct peer_req *pr)
 {
-       struct radosd *rados = (struct radosd *) peer->priv;
        struct xseg_request *req = pr->req;
        struct rados_io *rio = (struct rados_io *) pr->priv;
        char *data = xseg_get_data(peer->xseg, pr->req);
-       int r;
 
-       rados_completion_t rados_compl;
-       r = rados_aio_create_completion(pr, NULL, rados_commit_cb, &rados_compl);
-       if (r < 0) 
-               return -1;
-       r = rados_aio_write(rados->ioctx, rio->obj_name, rados_compl, 
+       return do_aio_generic(peer, pr, X_READ, rio->obj_name,
+                       data + req->serviced,
+                       req->size - req->serviced,
+                       req->offset + req->serviced);
+}
+
+static int do_aio_write(struct peerd *peer, struct peer_req *pr)
+{
+       struct xseg_request *req = pr->req;
+       struct rados_io *rio = (struct rados_io *) pr->priv;
+       char *data = xseg_get_data(peer->xseg, pr->req);
+
+       return do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
                        data + req->serviced,
                        req->size - req->serviced,
                        req->offset + req->serviced);
-       if (r < 0) {
-               rados_aio_release(rados_compl);
-               return -1;
-       }
-       return 0;
 }
 
 int handle_delete(struct peerd *peer, struct peer_req *pr)
 {
        int r;
-       struct radosd *rados = (struct radosd *) peer->priv;
+       //struct radosd *rados = (struct radosd *) peer->priv;
        struct rados_io *rio = (struct rados_io *) pr->priv;
-       
-       //log_pr("delete start", pr);
-       r = rados_remove(rados->ioctx, rio->obj_name);
-       if (r < 0) {
-               pr->retval = r;
-               fail(peer, pr);
+
+       if (rio->state == ACCEPTED) {
+               XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
+               rio->state = PENDING;
+               r = do_aio_generic(peer, pr, X_DELETE, rio->obj_name, NULL, 0, 0);
+               if (r < 0) {
+                       XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
+                       fail(peer, pr);
+               }
        }
        else {
-               pr->retval = 0;
-               complete(peer, pr);
+               if (pr->retval < 0){
+                       XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
+                       fail(peer, pr);
+               }
+               else {
+                       XSEGLOG2(&lc, I, "Deletion of %s completed", rio->obj_name);
+                       complete(peer, pr);
+               }
        }
        return 0;
 }
 
 int handle_info(struct peerd *peer, struct peer_req *pr)
 {
-       uint64_t size;
-       time_t pmtime;
        int r;
        struct xseg_request *req = pr->req;
-       struct radosd *rados = (struct radosd *) peer->priv;
+       //struct radosd *rados = (struct radosd *) peer->priv;
        struct rados_io *rio = (struct rados_io *) pr->priv;
        char *req_data = xseg_get_data(peer->xseg, req);
-       struct xseg_reply_info *xinfo = req_data;
+       struct xseg_reply_info *xinfo = (struct xseg_reply_info *)req_data;
 
-       log_pr("info start", pr);
-       
-       r = rados_stat(rados->ioctx, rio->obj_name, &size, &pmtime);
-       if (r < 0) {
-               pr->retval = r;
-               fail(peer, pr);
+       if (rio->state == ACCEPTED) {
+               XSEGLOG2(&lc, I, "Getting info of %s", rio->obj_name);
+               rio->state = PENDING;
+               r = do_aio_generic(peer, pr, X_INFO, rio->obj_name, NULL, 0, 0);
+               if (r < 0) {
+                       XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);   
+                       fail(peer, pr);
+               }
        }
        else {
-               xinfo->size = size;
-               pr->retval = sizeof(uint64_t);
-               complete(peer,pr);
+               if (pr->retval < 0){
+                       xinfo->size = 0;
+                       XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);   
+                       fail(peer, pr);
+               }
+               else {
+                       xinfo->size = rio->size;
+                       pr->retval = sizeof(uint64_t);
+                       XSEGLOG2(&lc, I, "Getting info of %s completed", rio->obj_name);        
+                       complete(peer, pr);
+               }
        }
        return 0;
 }
 
-//FIXME req->state no longer apply
 int handle_read(struct peerd *peer, struct peer_req *pr)
 {
+       struct rados_io *rio = (struct rados_io *) (pr->priv);
        struct xseg_request *req = pr->req;
        char *data;
-       if (req->state == XS_ACCEPTED) {
+       if (rio->state == ACCEPTED) {
                if (!req->size) {
                        complete(peer, pr);
                        return 0;
                }
-               //should we ensure req->op = X_READ ?
-               pending(peer, pr);
-               log_pr("read", pr);
+               rio->state = READING;
+               XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
                if (do_aio_read(peer, pr) < 0) {
+                       XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read",
+                                               rio->obj_name);
                        fail(peer, pr);
                }
        }
-       else if (req->state == XS_PENDING) {
+       else if (rio->state == READING) {
+               XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
                data = xseg_get_data(peer->xseg, pr->req);
-               if (pr->retval > 0) 
+               if (pr->retval > 0)
                        req->serviced += pr->retval;
                else if (pr->retval == 0) {
+                       XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
+                               "%llu bytes. Zeroing out rest", rio->obj_name,
+                               (unsigned long long) req->serviced);
                        /* reached end of object. zero out rest of data
                         * requested from this object
                         */
-                       memset(data, 0, req->datalen - req->serviced);
-                       req->serviced = req->datalen ;
+                       memset(data + req->serviced, 0, req->datalen - req->serviced);
+                       req->serviced = req->datalen;
                }
                else if (pr->retval == -2) {
+                       XSEGLOG2(&lc, I, "Reading of %s return -2. "
+                                       "Zeroing out data", rio->obj_name);
                        /* object not found. return zeros instead */
                        memset(data, 0, req->datalen);
                        req->serviced = req->datalen;
                }
                else {
+                       XSEGLOG2(&lc, E, "Reading of %s failed", rio->obj_name);
                        /* pr->retval < 0 && pr->retval != -2 */
                        fail(peer, pr);
                        return 0;
                }
                if (req->serviced >= req->datalen) {
-                       log_pr("read complete", pr);
+                       XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
                        complete(peer, pr);
                        return 0;
                }
@@ -185,11 +250,11 @@ int handle_read(struct peerd *peer, struct peer_req *pr)
                        fail(peer, pr);
                        return 0;
                }
-               //TODO assert req->op == X_READ
-               
                /* resubmit */
-               log_pr("read resubmit", pr);
+               XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
                if (do_aio_read(peer, pr) < 0) {
+                       XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
+                                       rio->obj_name);
                        fail(peer, pr);
                }
        }
@@ -203,8 +268,9 @@ int handle_read(struct peerd *peer, struct peer_req *pr)
 
 int handle_write(struct peerd *peer, struct peer_req *pr)
 {
+       struct rados_io *rio = (struct rados_io *) (pr->priv);
        struct xseg_request *req = pr->req;
-       if (req->state == XS_ACCEPTED) {
+       if (rio->state == ACCEPTED) {
                if (!req->size) {
                        // for future use
                        if (req->flags & XF_FLUSH) {
@@ -217,23 +283,27 @@ int handle_write(struct peerd *peer, struct peer_req *pr)
                        }
                }
                //should we ensure req->op = X_READ ?
-               pending(peer, pr);
-               //log_pr("write", pr);
+               rio->state = WRITING;
+               XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
                if (do_aio_write(peer, pr) < 0) {
+                       XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write",
+                                       rio->obj_name);
                        fail(peer, pr);
                }
        }
-       else if (req->state == XS_PENDING) {
+       else if (rio->state == WRITING) {
                /* rados writes return 0 if write succeeded or < 0 if failed
                 * no resubmission occurs
                 */
-               //log_pr("write complete", pr);
+               XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
                if (pr->retval == 0) {
+                       XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
                        req->serviced = req->datalen;
                        complete(peer, pr);
                        return 0;
                }
                else {
+                       XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
                        fail(peer, pr);
                        return 0;
                }
@@ -248,50 +318,238 @@ int handle_write(struct peerd *peer, struct peer_req *pr)
 
 int handle_copy(struct peerd *peer, struct peer_req *pr)
 {
-       struct radosd *rados = (struct radosd *) peer->priv;
+       //struct radosd *rados = (struct radosd *) peer->priv;
        struct xseg_request *req = pr->req;
        struct rados_io *rio = (struct rados_io *) pr->priv;
-       int r, sum;
-       char *buf, src_name[MAX_OBJ_NAME];
-       struct xseg_request_copy *xcopy = xseg_get_data(peer->xseg, req);
-       unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME -1 )? MAX_OBJ_NAME - 1 : xcopy->targetlen;
+       int r;
+       struct xseg_request_copy *xcopy = (struct xseg_request_copy *)xseg_get_data(peer->xseg, req);
 
-       strncpy(src_name, xcopy->target, end);
-       src_name[end] = 0;
+       if (rio->state == ACCEPTED){
+               XSEGLOG2(&lc, I, "Copy of object %s to object %s started",
+                               rio->src_name, rio->obj_name);
+               if (!req->size) {
+                       complete(peer, pr); //or fail?
+                       return 0;
+               }
 
-       req->serviced = 0;
-       buf = malloc(req->size);
-       if (!buf) {
-               fail(peer, pr);
-               return -1;
+               rio->src_name = malloc(MAX_OBJ_NAME + 1);
+               if (!rio->src_name){
+                       fail(peer, pr);
+                       return -1;
+               }
+               //NULL terminate or fail if targetlen > MAX_OBJ_NAME ?
+               unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
+               strncpy(rio->src_name, xcopy->target, end);
+               rio->src_name[end] = 0;
+
+               rio->buf = malloc(req->size);
+               if (!rio->buf) {
+                       r = -1;
+                       goto out_src;
+               }
+
+               rio->state = READING;
+               rio->read = 0;
+               XSEGLOG2(&lc, I, "Reading %s", rio->src_name);
+               if (do_aio_generic(peer, pr, X_READ, rio->src_name, rio->buf + rio->read,
+                       req->size - rio->read, req->offset + rio->read) < 0) {
+                       XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
+                       fail(peer, pr);
+                       r = -1;
+                       goto out_buf;
+               }
+       }
+       else if (rio->state == READING){
+               XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
+               if (pr->retval > 0)
+                       rio->read += pr->retval;
+               else if (pr->retval == 0) {
+                       XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
+                               "%llu bytes. Zeroing out rest", rio->obj_name,
+                               (unsigned long long) req->serviced);
+                       memset(rio->buf + rio->read, 0, req->size - rio->read);
+                       rio->read = req->size ;
+               }
+               else {
+                       XSEGLOG2(&lc, E, "Reading of %s failed", rio->src_name);
+                       r = -1;
+                       goto out_buf;
+               }
+
+               if (rio->read >= req->size) {
+                       XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
+                       //do_aio_write
+                       rio->state = WRITING;
+                       XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
+                       if (do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
+                                       rio->buf, req->size, req->offset) < 0) {
+                               XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->obj_name);
+                               r = -1;
+                               goto out_buf;
+                       }
+                       return 0;
+               }
+
+               XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
+               if (do_aio_generic(peer, pr, X_READ, rio->src_name, rio->buf + rio->read,
+                       req->size - rio->read, req->offset + rio->read) < 0) {
+                       XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
+                                       rio->obj_name);
+                       r = -1;
+                       goto out_buf;
+               }
        }
-       sum = 0;
-       do {
-               r = rados_read(rados->ioctx, rio->obj_name, buf, req->size, 0);
-               if (r < 0) 
-                       goto out_fail;
-               else if (r == 0) {
-                       memset(buf+r, 0, req->size - r);
-                       sum = req->size;
-               } else 
-                       sum += r;
-       } while (sum < req->size);
-
-       r = rados_write_full(rados->ioctx, rio->obj_name, buf, req->size);
+       else if (rio->state == WRITING){
+               XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
+               if (pr->retval == 0) {
+                       XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
+                       XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", rio->src_name, rio->obj_name);
+                       req->serviced = req->size;
+                       r = 0;
+                       goto out_buf;
+               }
+               else {
+                       XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
+                       XSEGLOG2(&lc, E, "Copy of object %s to object %s failed", rio->src_name, rio->obj_name);
+                       r = -1;
+                       goto out_buf;
+               }
+       }
+       else {
+               XSEGLOG2(&lc, E, "Unknown state");
+       }
+       return 0;
+
+
+out_buf:
+       free(rio->buf);
+out_src:
+       free(rio->src_name);
+
+       rio->buf = NULL;
+       rio->src_name = NULL;
+       rio->read = 0;
+
        if (r < 0)
-               goto out_fail;
-       
-       req->serviced = block_size;
+               fail(peer ,pr);
+       else
+               complete(peer, pr);
        return 0;
+}
+
+int spawnthread(struct peerd *peer, struct peer_req *pr,
+                       void *(*func)(void *arg))
+{
+       //struct radosd *rados = (struct radosd *) peer->priv;
+       struct rados_io *rio = (struct rados_io *) (pr->priv);
+
+       pthread_attr_t attr;
+       pthread_attr_init(&attr);
+       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+
+       return (pthread_create(&rio->tid, &attr, func, (void *) pr));
+}
+
+void watch_cb(uint8_t opcode, uint64_t ver, void *arg)
+{
+       //assert pr valid
+       struct peer_req *pr = (struct peer_req *)arg;
+       //struct radosd *rados = (struct radosd *) pr->peer->priv;
+       struct rados_io *rio = (struct rados_io *) (pr->priv);
 
-out_fail:
-       free(buf);
-       pr->retval = -1;
-       fail(peer, pr);
+       if (pr->req->op == X_OPEN){
+               XSEGLOG2(&lc, I, "watch cb signaling rio of %s", rio->obj_name);
+               pthread_cond_signal(&rio->cond);
+       }
+       else
+               XSEGLOG2(&lc, E, "Invalid req op in watch_cb");
+}
+
+void * lock_op(void *arg)
+{
+       struct peer_req *pr = (struct peer_req *)arg;
+       struct radosd *rados = (struct radosd *) pr->peer->priv;
+       struct rados_io *rio = (struct rados_io *) (pr->priv);
+
+       XSEGLOG2(&lc, I, "Starting lock op for %s", rio->obj_name);
+       if (!(pr->req->flags & XF_NOSYNC)){
+               if (rados_watch(rados->ioctx, rio->obj_name, 0,
+                               &rio->watch_handle, watch_cb, pr) < 0){
+                       XSEGLOG2(&lc, E, "Rados watch failed for %s",
+                                       rio->obj_name);
+                       fail(pr->peer, pr);
+                       return NULL;
+               }
+       }
+
+       while(rados_lock(rados->ioctx, rio->obj_name) < 0){
+               if (pr->req->flags & XF_NOSYNC){
+                       XSEGLOG2(&lc, E, "Rados lock failed for %s",
+                                       rio->obj_name);
+                       fail(pr->peer, pr);
+                       return NULL;
+               }
+               else{
+                       XSEGLOG2(&lc, D, "rados lock for %s sleeping",
+                                       rio->obj_name);
+                       pthread_mutex_lock(&rio->m);
+                       pthread_cond_wait(&rio->cond, &rio->m);
+                       pthread_mutex_unlock(&rio->m);
+                       XSEGLOG2(&lc, D, "rados lock for %s woke up",
+                                       rio->obj_name);
+               }
+       }
+       if (!(pr->req->flags & XF_NOSYNC)){
+               if (rados_unwatch(rados->ioctx, rio->obj_name,
+                                       rio->watch_handle) < 0){
+                       XSEGLOG2(&lc, E, "Rados unwatch failed");
+               }
+       }
+       XSEGLOG2(&lc, I, "Successfull lock op for %s", rio->obj_name);
+       complete(pr->peer, pr);
+       return NULL;
+}
+
+void * unlock_op(void *arg)
+{
+       struct peer_req *pr = (struct peer_req *)arg;
+       struct radosd *rados = (struct radosd *) pr->peer->priv;
+       struct rados_io *rio = (struct rados_io *) (pr->priv);
+       int r;
+       XSEGLOG2(&lc, I, "Starting unlock op for %s", rio->obj_name);
+       r = rados_unlock(rados->ioctx, rio->obj_name);
+       if (r < 0){
+               XSEGLOG2(&lc, E, "Rados unlock failed for %s", rio->obj_name);
+               fail(pr->peer, pr);
+       }
+       else {
+               if (rados_notify(rados->ioctx, rio->obj_name, 
+                                       0, NULL, 0) < 0) {
+                       XSEGLOG2(&lc, E, "rados notify failed");
+               }
+               XSEGLOG2(&lc, I, "Successfull unlock op for %s", rio->obj_name);
+               complete(pr->peer, pr);
+       }
+       return NULL;
+}
+
+int handle_open(struct peerd *peer, struct peer_req *pr)
+{
+       int r = spawnthread(peer, pr, lock_op);
+       if (r < 0)
+               fail(pr->peer, pr);
        return 0;
 }
 
 
+int handle_close(struct peerd *peer, struct peer_req *pr)
+{
+       int r = spawnthread(peer, pr, unlock_op);
+       if (r < 0)
+               fail(pr->peer, pr);
+       return 0;
+}
+
 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
 {
        int i, j;
@@ -301,34 +559,47 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
                perror("malloc");
                return -1;
        }
-       //TODO this should be a parameter. maybe -r (from rados)?
-       strncpy(rados->pool, "xseg", MAX_POOL_NAME);
+       rados->pool[0] = 0;
+       for (i = 0; i < argc; i++) {
+               if (!strcmp(argv[i], "--pool") && (i+1) < argc){
+                       strncpy(rados->pool, argv[i+1], MAX_POOL_NAME);
+                       rados->pool[MAX_POOL_NAME] = 0;
+                       i += 1;
+                       continue;
+               }
+       }
+       if (!rados->pool[0]){
+               XSEGLOG2(&lc, E , "Pool must be provided");
+               free(rados);
+               return -1;
+       }
+
        if (rados_create(&rados->cluster, NULL) < 0) {
-               printf("Rados create failed!\n");
+               XSEGLOG2(&lc, E, "Rados create failed!");
                return -1;
        }
        if (rados_conf_read_file(rados->cluster, NULL) < 0){
-               printf("Error reading rados conf files!\n");
+               XSEGLOG2(&lc, E, "Error reading rados conf files!");
                return -1;
        }
        if (rados_connect(rados->cluster) < 0) {
-               printf("Rados connect failed!\n");
+               XSEGLOG2(&lc, E, "Rados connect failed!");
                rados_shutdown(rados->cluster);
                free(rados);
                return 0;
        }
        if (rados_pool_lookup(rados->cluster, rados->pool) < 0) {
-               printf( "Pool does not exists. I will try to create it\n");
+               XSEGLOG2(&lc, I, "Pool does not exists. I will try to create it");
                if (rados_pool_create(rados->cluster, rados->pool) < 0){
-                       printf("Couldn't create pool!\n");
+                       XSEGLOG2(&lc, E, "Couldn't create pool %s", rados->pool);
                        rados_shutdown(rados->cluster);
                        free(rados);
                        return -1;
                }
-               printf("Pool created.\n");
+               XSEGLOG2(&lc, I, "Pool created.");
        }
-       if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0) {
-               printf("ioctx create problem.\n");
+       if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0){
+               XSEGLOG2(&lc, E, "ioctx create problem.");
                rados_shutdown(rados->cluster);
                free(rados);
                return -1;
@@ -338,6 +609,7 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
                rio = malloc(sizeof(struct rados_io));
                if (!rio) {
                        //ugly
+                       //is this really necessary?
                        for (j = 0; j < i; j++) {
                                free(peer->peer_reqs[j].priv);
                        }
@@ -345,6 +617,13 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
                        perror("malloc");
                        return -1;
                }
+               rio->buf = 0;
+               rio->read = 0;
+               rio->size = 0;
+               rio->src_name = 0;
+               rio->watch_handle = 0;
+               pthread_cond_init(&rio->cond, NULL);
+               pthread_mutex_init(&rio->m, NULL);
                peer->peer_reqs[i].priv = (void *) rio;
        }
        return 0;
@@ -361,11 +640,11 @@ int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
 {
        struct rados_io *rio = (struct rados_io *) (pr->priv);
        char *target = xseg_get_target(peer->xseg, pr->req);
-       unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME -1 )? MAX_OBJ_NAME - 1 : pr->req->targetlen;
+       unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : pr->req->targetlen;
        strncpy(rio->obj_name, target, end);
        rio->obj_name[end] = 0;
        //log_pr("dispatch", pr);
-       if (reason == accept)
+       if (reason == dispatch_accept)
                rio->state = ACCEPTED;
 
        switch (pr->req->op){
@@ -391,6 +670,11 @@ int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
                        else
                                handle_copy(peer, pr);
                        break;
+               case X_OPEN:
+                       handle_open(peer, pr); break;
+               case X_CLOSE:
+                       handle_close(peer, pr); break;
+
                default:
                        fail(peer, pr);
        }